
Commit

Merge branch 'STORM-1152' of https://github.com/sureshms/incubator-storm into STORM-1152
Sriharsha Chintalapani committed Oct 31, 2015
2 parents 3a724ee + bd0e8a0 commit fe08a9f
Showing 11 changed files with 74 additions and 96 deletions.
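Most of the hunks below apply one recurring change: iterate a map's entrySet() instead of its keySet(), so each element needs a single hash lookup rather than a second map.get(key) inside the loop body. A minimal before/after sketch of that pattern, using a hypothetical supervisors map purely for illustration:

import java.util.HashMap;
import java.util.Map;

class EntrySetIterationSketch {
    static void printHosts(Map<String, String> supervisors) {
        // Old shape: the value is fetched with an extra lookup per key.
        for (String nodeId : supervisors.keySet()) {
            String host = supervisors.get(nodeId); // second lookup for the same key
            System.out.println(nodeId + " -> " + host);
        }

        // New shape: key and value come from the same entry, no extra lookup.
        for (Map.Entry<String, String> entry : supervisors.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }

    public static void main(String[] args) {
        Map<String, String> supervisors = new HashMap<String, String>();
        supervisors.put("node-1", "host-a");
        supervisors.put("node-2", "host-b");
        printHosts(supervisors);
    }
}

The remaining hunks in this commit repeat the same shape with their own key and value types.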
64 changes: 20 additions & 44 deletions storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
@@ -74,8 +74,9 @@ public Cluster(INimbus nimbus, Map<String, SupervisorDetails> supervisors, Map<S
this.status = new HashMap<String, String>();
this.resources = new HashMap<String, Double[]>();
this.hostToId = new HashMap<String, List<String>>();
for (String nodeId : supervisors.keySet()) {
SupervisorDetails supervisor = supervisors.get(nodeId);
for (Map.Entry<String, SupervisorDetails> entry : supervisors.entrySet()) {
String nodeId = entry.getKey();
SupervisorDetails supervisor = entry.getValue();
String host = supervisor.getHost();
if (!this.hostToId.containsKey(host)) {
this.hostToId.put(host, new ArrayList<String>());
@@ -114,10 +115,7 @@ public String getHost(String supervisorId) {
}

/**
* Gets all the topologies which needs scheduling.
*
* @param topologies
* @return
* @return all the topologies which needs scheduling.
*/
public List<TopologyDetails> needsSchedulingTopologies(Topologies topologies) {
List<TopologyDetails> ret = new ArrayList<TopologyDetails>();
@@ -142,19 +140,12 @@ public List<TopologyDetails> needsSchedulingTopologies(Topologies topologies) {
public boolean needsScheduling(TopologyDetails topology) {
int desiredNumWorkers = topology.getNumWorkers();
int assignedNumWorkers = this.getAssignedNumWorkers(topology);

if (desiredNumWorkers > assignedNumWorkers) {
return true;
}

return this.getUnassignedExecutors(topology).size() > 0;
return desiredNumWorkers > assignedNumWorkers || this.getUnassignedExecutors(topology).size() > 0;
}

/**
* Gets a executor -> component-id map which needs scheduling in this topology.
*
* @param topology
* @return
* @return a executor -> component-id map which needs scheduling in this topology.
*/
public Map<ExecutorDetails, String> getNeedsSchedulingExecutorToComponents(TopologyDetails topology) {
Collection<ExecutorDetails> allExecutors = new HashSet(topology.getExecutors());
@@ -169,16 +160,15 @@ public Map<ExecutorDetails, String> getNeedsSchedulingExecutorToComponents(Topol
}

/**
* Gets a component-id -> executors map which needs scheduling in this topology.
*
* @param topology
* @return
* @return a component-id -> executors map which needs scheduling in this topology.
*/
public Map<String, List<ExecutorDetails>> getNeedsSchedulingComponentToExecutors(TopologyDetails topology) {
Map<ExecutorDetails, String> executorToComponents = this.getNeedsSchedulingExecutorToComponents(topology);
Map<String, List<ExecutorDetails>> componentToExecutors = new HashMap<String, List<ExecutorDetails>>();
for (ExecutorDetails executor : executorToComponents.keySet()) {
String component = executorToComponents.get(executor);
for (Map.Entry<ExecutorDetails, String> entry : executorToComponents.entrySet()) {
ExecutorDetails executor = entry.getKey();
String component = entry.getValue();
if (!componentToExecutors.containsKey(component)) {
componentToExecutors.put(component, new ArrayList<ExecutorDetails>());
}
@@ -192,9 +182,6 @@ public Map<String, List<ExecutorDetails>> getNeedsSchedulingComponentToExecutors

/**
* Get all the used ports of this supervisor.
*
* @param cluster
* @return
*/
public Set<Integer> getUsedPorts(SupervisorDetails supervisor) {
Map<String, SchedulerAssignment> assignments = this.getAssignments();
@@ -213,9 +200,6 @@ public Set<Integer> getUsedPorts(SupervisorDetails supervisor) {

/**
* Return the available ports of this supervisor.
*
* @param cluster
* @return
*/
public Set<Integer> getAvailablePorts(SupervisorDetails supervisor) {
Set<Integer> usedPorts = this.getUsedPorts(supervisor);
@@ -234,9 +218,6 @@ public Set<Integer> getAssignablePorts(SupervisorDetails supervisor) {

/**
* Return all the available slots on this supervisor.
*
* @param cluster
* @return
*/
public List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor) {
Set<Integer> ports = this.getAvailablePorts(supervisor);
@@ -280,10 +261,8 @@ public Collection<ExecutorDetails> getUnassignedExecutors(TopologyDetails topolo
}

/**
* Gets the number of workers assigned to this topology.
*
* @param topology
* @return
* @return the number of workers assigned to this topology.
*/
public int getAssignedNumWorkers(TopologyDetails topology) {
SchedulerAssignment assignment = this.getAssignmentById(topology.getId());
@@ -323,9 +302,7 @@ public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetail
}

/**
* Gets all the available slots in the cluster.
*
* @return
* @return all the available worker slots in the cluster.
*/
public List<WorkerSlot> getAvailableSlots() {
List<WorkerSlot> slots = new ArrayList<WorkerSlot>();
@@ -373,10 +350,8 @@ public void freeSlots(Collection<WorkerSlot> slots) {
}

/**
* Checks the specified slot is occupied.
*
* @param slot the slot be to checked.
* @return
* @return true if the specified slot is occupied.
*/
public boolean isSlotOccupied(WorkerSlot slot) {
for (SchedulerAssignment assignment : this.assignments.values()) {
@@ -473,13 +448,14 @@ public Map<String, List<String>> getNetworkTopography() {
DNSToSwitchMapping topographyMapper = (DNSToSwitchMapping) Utils.newInstance(clazz);

Map<String, String> resolvedSuperVisors = topographyMapper.resolve(supervisorHostNames);
for (String hostName : resolvedSuperVisors.keySet()) {
String rack = resolvedSuperVisors.get(hostName);

if (!networkTopography.containsKey(rack)) {
networkTopography.put(rack, new ArrayList<String>());
}
for (Map.Entry<String, String> entry : resolvedSuperVisors.entrySet()) {
String hostName = entry.getKey();
String rack = entry.getValue();
List<String> nodesForRack = networkTopography.get(rack);
if (nodesForRack == null) {
nodesForRack = new ArrayList<String>();
networkTopography.put(rack, nodesForRack);
}
nodesForRack.add(hostName);
}
}
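Besides switching to entrySet(), the rewritten getNetworkTopography loop replaces the containsKey()-then-put() check with a single get() followed by a null check when building the rack-to-hosts lists. A standalone sketch of that get-or-create idiom, with made-up rack and host names for illustration:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GetOrCreateSketch {
    public static void main(String[] args) {
        Map<String, String> resolvedHosts = new HashMap<String, String>();
        resolvedHosts.put("host-1", "rack-a");
        resolvedHosts.put("host-2", "rack-a");
        resolvedHosts.put("host-3", "rack-b");

        Map<String, List<String>> networkTopography = new HashMap<String, List<String>>();
        for (Map.Entry<String, String> entry : resolvedHosts.entrySet()) {
            String rack = entry.getValue();
            // One lookup: fetch the list, create and register it only when absent.
            List<String> nodesForRack = networkTopography.get(rack);
            if (nodesForRack == null) {
                nodesForRack = new ArrayList<String>();
                networkTopography.put(rack, nodesForRack);
            }
            nodesForRack.add(entry.getKey());
        }
        System.out.println(networkTopography); // e.g. {rack-a=[host-1, host-2], rack-b=[host-3]}
    }
}

On Java 8 and later, Map.computeIfAbsent expresses the same get-or-create step in one call.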
6 changes: 3 additions & 3 deletions storm-core/src/jvm/backtype/storm/scheduler/Topologies.java
@@ -34,9 +34,9 @@ public Topologies(Map<String, TopologyDetails> topologies) {
this.topologies.putAll(topologies);
this.nameToId = new HashMap<String, String>(topologies.size());

for (String topologyId : topologies.keySet()) {
TopologyDetails topology = topologies.get(topologyId);
this.nameToId.put(topology.getName(), topologyId);
for (Map.Entry<String, TopologyDetails> entry : topologies.entrySet()) {
TopologyDetails topology = entry.getValue();
this.nameToId.put(topology.getName(), entry.getKey());
}
}

@@ -37,7 +37,7 @@
import org.slf4j.LoggerFactory;

public class DRPCSimpleACLAuthorizer extends DRPCAuthorizerBase {
public static Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(DRPCSimpleACLAuthorizer.class);

public static final String CLIENT_USERS_KEY = "client.users";
@@ -75,15 +75,15 @@ protected Map<String,AclFunctionEntry> readAclFromConfig() {
(Map<String,Map<String,?>>)
conf.get(Config.DRPC_AUTHORIZER_ACL);

for (String function : confAcl.keySet()) {
Map<String,?> val = confAcl.get(function);
for (Map.Entry<String, Map<String, ?>> entry : confAcl.entrySet()) {
Map<String,?> val = entry.getValue();
Collection<String> clientUsers =
val.containsKey(CLIENT_USERS_KEY) ?
(Collection<String>) val.get(CLIENT_USERS_KEY) : null;
String invocationUser =
val.containsKey(INVOCATION_USER_KEY) ?
(String) val.get(INVOCATION_USER_KEY) : null;
acl.put(function,
acl.put(entry.getKey(),
new AclFunctionEntry(clientUsers, invocationUser));
}
} else if (!_permitWhenMissingFunctionEntry) {
@@ -43,9 +43,10 @@ public void prepare(Map conf) {
Map<String, Map<String, List<String>>> userToHostAndGroup = (Map<String, Map<String, List<String>>>) conf.get(Config.NIMBUS_IMPERSONATION_ACL);

if (userToHostAndGroup != null) {
for (String user : userToHostAndGroup.keySet()) {
Set<String> groups = ImmutableSet.copyOf(userToHostAndGroup.get(user).get("groups"));
Set<String> hosts = ImmutableSet.copyOf(userToHostAndGroup.get(user).get("hosts"));
for (Map.Entry<String, Map<String, List<String>>> entry : userToHostAndGroup.entrySet()) {
String user = entry.getKey();
Set<String> groups = ImmutableSet.copyOf(entry.getValue().get("groups"));
Set<String> hosts = ImmutableSet.copyOf(entry.getValue().get("hosts"));
userImpersonationACL.put(user, new ImpersonationACL(user, groups, hosts));
}
}
@@ -81,10 +81,10 @@ public static Kryo getKryo(Map conf) {
kryoFactory.preRegister(k, conf);

boolean skipMissing = (Boolean) conf.get(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS);
for(String klassName: registrations.keySet()) {
String serializerClassName = registrations.get(klassName);
for(Map.Entry<String, String> entry: registrations.entrySet()) {
String serializerClassName = entry.getValue();
try {
Class klass = Class.forName(klassName);
Class klass = Class.forName(entry.getKey());
Class serializerClass = null;
if(serializerClassName!=null)
serializerClass = Class.forName(serializerClassName);
@@ -141,11 +141,12 @@ public Map<String, Map<String, Grouping>> getTargets(String componentId) {
Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
for(String otherComponentId: getComponentIds()) {
Map<GlobalStreamId, Grouping> inputs = getComponentCommon(otherComponentId).get_inputs();
for(GlobalStreamId id: inputs.keySet()) {
for(Map.Entry<GlobalStreamId, Grouping> entry: inputs.entrySet()) {
GlobalStreamId id = entry.getKey();
if(id.get_componentId().equals(componentId)) {
Map<String, Grouping> curr = ret.get(id.get_streamId());
if(curr==null) curr = new HashMap<String, Grouping>();
curr.put(otherComponentId, inputs.get(id));
curr.put(otherComponentId, entry.getValue());
ret.put(id.get_streamId(), curr);
}
}
@@ -120,8 +120,9 @@ public void cleanupBefore(BigInteger txid) {
public void commit(TransactionAttempt attempt) {
BigInteger txid = attempt.getTransactionId();
Map<Integer, Object> metas = _cachedMetas.remove(txid);
for(Integer partition: metas.keySet()) {
Object meta = metas.get(partition);
for(Entry<Integer, Object> entry: metas.entrySet()) {
Integer partition = entry.getKey();
Object meta = entry.getValue();
_partitionStates.get(partition).overrideState(txid, meta);
}
}
17 changes: 8 additions & 9 deletions storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
@@ -30,18 +31,19 @@ public class TransferDrainer {
private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();

public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
for (Integer task : taskTupleSetMap.keySet()) {
addListRefToMap(this.bundles, task, taskTupleSetMap.get(task));
for (Map.Entry<Integer, ArrayList<TaskMessage>> entry : taskTupleSetMap.entrySet()) {
addListRefToMap(this.bundles, entry.getKey(), entry.getValue());
}
}

public void send(HashMap<Integer, String> taskToNode, HashMap<String, IConnection> connections) {
HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);

for (String hostPort : bundleMapByDestination.keySet()) {
for (Map.Entry<String, ArrayList<ArrayList<TaskMessage>>> entry : bundleMapByDestination.entrySet()) {
String hostPort = entry.getKey();
IConnection connection = connections.get(hostPort);
if (null != connection) {
ArrayList<ArrayList<TaskMessage>> bundle = bundleMapByDestination.get(hostPort);
ArrayList<ArrayList<TaskMessage>> bundle = entry.getValue();
Iterator<TaskMessage> iter = getBundleIterator(bundle);
if (null != iter && iter.hasNext()) {
connection.send(iter);
@@ -98,15 +100,12 @@ private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMe

@Override
public boolean hasNext() {
if (offset < size) {
return true;
}
return false;
return offset < size;
}

@Override
public TaskMessage next() {
TaskMessage msg = null;
TaskMessage msg;
if (iter.hasNext()) {
msg = iter.next();
} else {
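getBundleIterator, only partially visible above, exposes an ArrayList of ArrayLists of TaskMessage as one flat Iterator; the cleanup also collapses its hasNext() to a plain comparison. A self-contained sketch of that kind of flattening iterator over generic lists — an illustration of the idea, not the Storm class itself:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class FlattenSketch {
    // Walks a list of lists as a single sequence, advancing to the next
    // inner list whenever the current one is exhausted.
    static <T> Iterator<T> flatten(final List<List<T>> lists) {
        return new Iterator<T>() {
            private final Iterator<List<T>> outer = lists.iterator();
            private Iterator<T> inner = Collections.emptyIterator();

            public boolean hasNext() {
                while (!inner.hasNext() && outer.hasNext()) {
                    inner = outer.next().iterator();
                }
                return inner.hasNext();
            }

            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return inner.next();
            }

            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    public static void main(String[] args) {
        List<List<Integer>> bundle = new ArrayList<List<Integer>>();
        bundle.add(Arrays.asList(1, 2));
        bundle.add(Arrays.asList(3));
        bundle.add(Arrays.asList(4, 5));

        Iterator<Integer> flat = flatten(bundle);
        while (flat.hasNext()) {
            System.out.print(flat.next() + " "); // prints: 1 2 3 4 5
        }
    }
}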
19 changes: 9 additions & 10 deletions storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -186,14 +186,14 @@ public static Map<String, Object> fromCompressedJsonConf(byte[] serialized) {

public static <T> String join(Iterable<T> coll, String sep) {
Iterator<T> it = coll.iterator();
String ret = "";
StringBuilder ret = new StringBuilder();
while(it.hasNext()) {
ret = ret + it.next();
ret.append(it.next());
if(it.hasNext()) {
ret = ret + sep;
ret.append(sep);
}
}
return ret;
return ret.toString();
}

public static void sleep(long millis) {
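The join() rewrite above replaces repeated String concatenation, which copies the accumulated string on every iteration (quadratic in the total length), with a single StringBuilder; the call site is unchanged. A small usage sketch against a local copy of the same method:

import java.util.Arrays;
import java.util.Iterator;

class JoinSketch {
    // Same shape as the Utils.join shown above: append each element,
    // then the separator only when another element follows.
    static <T> String join(Iterable<T> coll, String sep) {
        Iterator<T> it = coll.iterator();
        StringBuilder ret = new StringBuilder();
        while (it.hasNext()) {
            ret.append(it.next());
            if (it.hasNext()) {
                ret.append(sep);
            }
        }
        return ret.toString();
    }

    public static void main(String[] args) {
        System.out.println(join(Arrays.asList("spout", "bolt-1", "bolt-2"), ", "));
        // prints: spout, bolt-1, bolt-2
    }
}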
@@ -327,10 +327,9 @@ public static Map readStormConfig() {
private static Object normalizeConf(Object conf) {
if (conf == null) return new HashMap();
if (conf instanceof Map) {
Map confMap = new HashMap((Map) conf);
for (Object key : confMap.keySet()) {
Object val = confMap.get(key);
confMap.put(key, normalizeConf(val));
Map<Object, Object> confMap = new HashMap((Map) conf);
for (Map.Entry<Object, Object> entry : confMap.entrySet()) {
confMap.put(entry.getKey(), normalizeConf(entry.getValue()));
}
return confMap;
} else if (conf instanceof List) {
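The normalizeConf loop above writes the normalized value back with put() while iterating the entrySet. That is safe here because every key already exists (replacing a value is not a structural modification of a HashMap, so no ConcurrentModificationException), though Map.Entry.setValue() states the same intent more directly. A sketch of that variant, using a toy normalizer in place of the recursive one:

import java.util.HashMap;
import java.util.Map;

class SetValueSketch {
    public static void main(String[] args) {
        Map<Object, Object> confMap = new HashMap<Object, Object>();
        confMap.put("topology.workers", " 4 ");
        confMap.put("topology.debug", "false");

        for (Map.Entry<Object, Object> entry : confMap.entrySet()) {
            // Update the current mapping in place: no second lookup, no put().
            entry.setValue(normalize(entry.getValue()));
        }
        System.out.println(confMap);
    }

    // Stand-in for the recursive normalizeConf: trims string values.
    private static Object normalize(Object val) {
        return (val instanceof String) ? ((String) val).trim() : val;
    }
}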
@@ -426,8 +425,8 @@ public static boolean isSystemId(String id) {

public static <K, V> Map<V, K> reverseMap(Map<K, V> map) {
Map<V, K> ret = new HashMap<V, K>();
for (K key : map.keySet()) {
ret.put(map.get(key), key);
for (Map.Entry<K, V> entry : map.entrySet()) {
ret.put(entry.getValue(), entry.getKey());
}
return ret;
}
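reverseMap() now builds the inverted map from entrySet(); as before, when two keys map to the same value the entry iterated last wins. A quick usage sketch showing that behaviour with hypothetical task-to-port data:

import java.util.HashMap;
import java.util.Map;

class ReverseMapSketch {
    // Same shape as the Utils.reverseMap shown above.
    static <K, V> Map<V, K> reverseMap(Map<K, V> map) {
        Map<V, K> ret = new HashMap<V, K>();
        for (Map.Entry<K, V> entry : map.entrySet()) {
            ret.put(entry.getValue(), entry.getKey());
        }
        return ret;
    }

    public static void main(String[] args) {
        Map<String, Integer> taskToPort = new HashMap<String, Integer>();
        taskToPort.put("task-1", 6700);
        taskToPort.put("task-2", 6701);
        taskToPort.put("task-3", 6701); // duplicate value: only one key survives the reversal
        System.out.println(reverseMap(taskToPort)); // e.g. {6700=task-1, 6701=task-3}
    }
}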
@@ -119,8 +119,9 @@ public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentColl
prevCached = new HashMap<>();
}

for(String id: _partitionStates.keySet()) {
EmitterPartitionState s = _partitionStates.get(id);
for(Entry<String, EmitterPartitionState> e: _partitionStates.entrySet()) {
String id = e.getKey();
EmitterPartitionState s = e.getValue();
s.rotatingState.removeState(tx.getTransactionId());
Object lastMeta = prevCached.get(id);
if(lastMeta==null) lastMeta = s.rotatingState.getLastState();
@@ -162,9 +163,8 @@ public void commit(TransactionAttempt attempt) {

Long txid = attempt.getTransactionId();
Map<String, Object> metas = _cachedMetas.remove(txid);
for(String partitionId: metas.keySet()) {
Object meta = metas.get(partitionId);
_partitionStates.get(partitionId).rotatingState.overrideState(txid, meta);
for(Entry<String, Object> entry: metas.entrySet()) {
_partitionStates.get(entry.getKey()).rotatingState.overrideState(txid, entry.getValue());
}
}


