diff --git a/core/src/main/java/org/apache/brooklyn/core/effector/composite/AbstractCompositeEffector.java b/core/src/main/java/org/apache/brooklyn/core/effector/composite/AbstractCompositeEffector.java index a36d816fa0a..db11058f976 100644 --- a/core/src/main/java/org/apache/brooklyn/core/effector/composite/AbstractCompositeEffector.java +++ b/core/src/main/java/org/apache/brooklyn/core/effector/composite/AbstractCompositeEffector.java @@ -23,6 +23,7 @@ import org.apache.brooklyn.api.effector.Effector; import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.core.effector.AddEffector; import org.apache.brooklyn.core.effector.EffectorBody; import org.apache.brooklyn.util.core.config.ConfigBag; @@ -162,15 +163,17 @@ protected String getInputParameter(Object effectorDetails) { return inputParameter; } - protected Object invokeEffectorNamed(Entity target, String effectorName, ConfigBag params) { + protected Task submitEffectorNamed(Entity target, String effectorName, ConfigBag params) { LOG.debug("{} invoking {} with params {}", new Object[] { this, effectorName, params }); Maybe> effector = target.getEntityType().getEffectorByName(effectorName); if (effector.isAbsent()) { throw new IllegalStateException("Cannot find effector " + effectorName); } - return target.invoke(effector.get(), params.getAllConfig()).getUnchecked(); + return target.invoke(effector.get(), params.getAllConfig()); } + protected Object invokeEffectorNamed(Entity target, String effectorName, ConfigBag params) { + return submitEffectorNamed(target, effectorName, params).getUnchecked(); + } } - } diff --git a/core/src/main/java/org/apache/brooklyn/core/effector/composite/LoopEffector.java b/core/src/main/java/org/apache/brooklyn/core/effector/composite/LoopEffector.java index 1209e1a19f3..9d5860867a6 100644 --- a/core/src/main/java/org/apache/brooklyn/core/effector/composite/LoopEffector.java +++ b/core/src/main/java/org/apache/brooklyn/core/effector/composite/LoopEffector.java @@ -24,6 +24,7 @@ import org.apache.brooklyn.api.effector.Effector; import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.effector.AddEffector; @@ -36,6 +37,7 @@ import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; /** @@ -92,8 +94,6 @@ public List call(final ConfigBag params) { } Collection inputCollection = (Collection) inputObject; - List result = Lists.newArrayList(); - String effectorName = getEffectorName(effectorDetails); String inputArgument = getInputArgument(effectorDetails); Entity targetEntity = getTargetEntity(effectorDetails); @@ -103,13 +103,18 @@ public List call(final ConfigBag params) { throw new IllegalArgumentException("Input is not set for this effector: " + effectorDetails); } - for (Object inputEach : inputCollection) { - params.putStringKey(inputArgument, inputEach); + List> tasks = Lists.newArrayList(); + for (Object each : inputCollection) { + params.putStringKey(inputArgument, each); + tasks.add(submitEffectorNamed(targetEntity, effectorName, params)); + } - result.add(invokeEffectorNamed(targetEntity, effectorName, params)); + List result = Lists.newArrayList(); + for (Task each : tasks) { + result.add(each.getUnchecked()); } - LOG.debug("{} effector {} returned {}", new Object[] { this, effector.getName(), result }); + LOG.debug("{} effector {} returned {}", new Object[] { this, effector.getName(), Iterables.toString(result) }); return result; } } diff --git a/core/src/main/java/org/apache/brooklyn/core/effector/composite/ParallelEffector.java b/core/src/main/java/org/apache/brooklyn/core/effector/composite/ParallelEffector.java new file mode 100644 index 00000000000..b1b73e4d074 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/effector/composite/ParallelEffector.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.effector.composite; + +import java.util.List; +import java.util.Map; + +import org.apache.brooklyn.api.effector.Effector; +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.effector.AddEffector; +import org.apache.brooklyn.core.effector.EffectorBody; +import org.apache.brooklyn.core.effector.Effectors.EffectorBuilder; +import org.apache.brooklyn.core.entity.EntityInitializers; +import org.apache.brooklyn.util.core.config.ConfigBag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.reflect.TypeToken; + +/** + * Execute a sequence of effectors with the same input. + * + * @since 0.11.0 + */ +@Beta +public class ParallelEffector extends AbstractCompositeEffector { + + private static final Logger LOG = LoggerFactory.getLogger(ParallelEffector.class); + + public static final ConfigKey> PARALLEL = ConfigKeys.newConfigKey( + new TypeToken>() { }, + "parallel", + "Effector details list for the parallel effector", + ImmutableList.of()); + + public ParallelEffector(ConfigBag params) { + super(newEffectorBuilder(params).build()); + } + + public ParallelEffector(Map params) { + this(ConfigBag.newInstance(params)); + } + + public static EffectorBuilder newEffectorBuilder(ConfigBag params) { + EffectorBuilder eff = AddEffector.newEffectorBuilder(Object.class, params); + EffectorBody body = new Body(eff.buildAbstract(), params); + eff.impl(body); + return eff; + } + + protected static class Body extends AbstractCompositeEffector.Body { + + public Body(Effector eff, ConfigBag config) { + super(eff, config); + Preconditions.checkNotNull(config.getAllConfigRaw().get(PARALLEL.getName()), "Parallel effector names must be supplied when defining this effector"); + } + + @Override + public Object call(final ConfigBag params) { + synchronized (mutex) { + LOG.debug("{} called with config {}, params {}", new Object[] { this, config, params }); + List effectors = EntityInitializers.resolve(config, PARALLEL); + + List> tasks = Lists.newArrayList(); + for (Object effectorDetails : effectors) { + String effectorName = getEffectorName(effectorDetails); + String inputArgument = getInputArgument(effectorDetails); + Entity targetEntity = getTargetEntity(effectorDetails); + LOG.debug("{} executing {}({}) on {}", new Object[] { this, effectorName, inputArgument, targetEntity }); + + if (inputArgument == null) { + throw new IllegalArgumentException("Input is not set for this effector: " + effectorDetails); + } + Object input = params.getStringKey(inputArgument); + params.putStringKey(inputArgument, input); + + tasks.add(submitEffectorNamed(targetEntity, effectorName, params)); + } + + Object result = null; + for (Task each : tasks) { + result = each.getUnchecked(); + } + + LOG.debug("{} effector {} returned {}", new Object[] { this, effector.getName(), result }); + return result; + } + } + } +}