Skip to content

Commit

Permalink
Use Date object in scheduling policies for time config
Browse files Browse the repository at this point in the history
  • Loading branch information
grkvlt committed Apr 5, 2017
1 parent 2088284 commit c910c06
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
*/
package org.apache.brooklyn.policy.action;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalTime;
import java.util.Calendar;
import java.util.Date;
import java.util.Map;
Expand Down Expand Up @@ -58,9 +56,6 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp

private static final Logger LOG = LoggerFactory.getLogger(AbstractScheduledEffectorPolicy.class);

protected static final String TIME_FORMAT = "HH:mm:ss";
protected static DateFormat FORMATTER = new SimpleDateFormat(TIME_FORMAT);

public static final ConfigKey<String> EFFECTOR = ConfigKeys.builder(String.class)
.name("effector")
.description("The effector to be executed by this policy")
Expand All @@ -74,7 +69,7 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp
.defaultValue(ImmutableMap.<String, Object>of())
.build();

public static final ConfigKey<String> TIME = ConfigKeys.builder(String.class)
public static final ConfigKey<Date> TIME = ConfigKeys.builder(Date.class)
.name("time")
.description("An optional time when this policy should be first executed")
.build();
Expand All @@ -85,9 +80,10 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp
.constraint(Predicates.or(Predicates.isNull(), DurationPredicates.positive()))
.build();

protected final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
protected final Object mutex = new Object[0];

protected Effector<?> effector;
protected ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
protected Object mutex = new Object[0];

public AbstractScheduledEffectorPolicy() {
this(MutableMap.<String,Object>of());
Expand Down Expand Up @@ -117,22 +113,15 @@ protected Effector<?> getEffector() {
return effector.get();
}

// TODO move to java.time classes in JDK 8
protected Duration getWaitUntil(String time) {
try {
Calendar now = Calendar.getInstance();
Calendar when = Calendar.getInstance();
Date parsed = FORMATTER.parse(time);
when.setTime(parsed);
when.set(now.get(Calendar.YEAR), now.get(Calendar.MONTH), now.get(Calendar.DATE));
if (when.before(now)) {
when.add(Calendar.DATE, 1);
}
return Duration.millis(Math.max(0, when.getTimeInMillis() - now.getTimeInMillis()));
} catch (ParseException e) {
LOG.warn("The time must be formatted as " + TIME_FORMAT + " for this policy", e);
throw Exceptions.propagate(e);
protected Duration getWaitUntil(Date time) {
Calendar now = Calendar.getInstance();
Calendar when = Calendar.getInstance();
when.setTime(time);
when.set(now.get(Calendar.YEAR), now.get(Calendar.MONTH), now.get(Calendar.DATE));
if (when.before(now)) {
when.add(Calendar.DATE, 1);
}
return Duration.millis(Math.max(0, when.getTimeInMillis() - now.getTimeInMillis()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.brooklyn.policy.action;

import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -30,7 +31,6 @@
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.DurationPredicates;
import org.apache.brooklyn.util.time.Time;
Expand Down Expand Up @@ -89,12 +89,12 @@ public void onEvent(SensorEvent<Object> event) {
synchronized (mutex) {
LOG.debug("{}: Got event {}", PeriodicEffectorPolicy.this, event);
if (event.getSensor().getName().equals(START_SCHEDULER.getName())) {
Boolean start = Boolean.parseBoolean(Strings.toString(event.getValue()));
Boolean start = (Boolean) event.getValue();
if (start && running.compareAndSet(false, true)) {
Duration period = Preconditions.checkNotNull(config().get(PERIOD), "The period must be configured for this policy");
String time = config().get(TIME);
Date time = config().get(TIME);
Duration wait = config().get(WAIT);
if (Strings.isNonBlank(time)) {
if (time != null) {
wait = getWaitUntil(time);
} else if (wait == null) {
wait = period;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.brooklyn.policy.action;

import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;

Expand All @@ -27,7 +28,6 @@
import org.apache.brooklyn.api.sensor.SensorEventListener;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
Expand All @@ -52,7 +52,7 @@ public class ScheduledEffectorPolicy extends AbstractScheduledEffectorPolicy {
private static final Logger LOG = LoggerFactory.getLogger(ScheduledEffectorPolicy.class);

public static final AttributeSensor<Boolean> INVOKE_IMMEDIATELY = Sensors.newBooleanSensor("scheduler.invoke.now", "Invoke the configured effector immediately when this becomes true");
public static final AttributeSensor<String> INVOKE_AT = Sensors.newStringSensor("scheduler.invoke.at", "Invoke the configured effector at this time");
public static final AttributeSensor<Date> INVOKE_AT = Sensors.newSensor(Date.class, "scheduler.invoke.at", "Invoke the configured effector at this time");

public ScheduledEffectorPolicy() {
this(MutableMap.<String,Object>of());
Expand All @@ -69,16 +69,16 @@ public void setEntity(final EntityLocal entity) {
subscriptions().subscribe(entity, INVOKE_IMMEDIATELY, handler);
subscriptions().subscribe(entity, INVOKE_AT, handler);

String time = config().get(TIME);
Date time = config().get(TIME);
Duration wait = config().get(WAIT);
if (Strings.isNonBlank(time)) {
if (time != null) {
scheduleAt(time);
} else if (wait != null) {
executor.schedule(this, wait.toMilliseconds(), TimeUnit.MILLISECONDS);
}
}

protected void scheduleAt(String time) {
protected void scheduleAt(Date time) {
Duration wait = getWaitUntil(time);
LOG.debug("{}: Scheduling {} at {} (in {})", new Object[] { this, effector.getName(), time, Time.fromDurationToTimeStringRounded().apply(wait) });
executor.schedule(this, wait.toMilliseconds(), TimeUnit.MILLISECONDS);
Expand All @@ -90,13 +90,13 @@ public void onEvent(SensorEvent<Object> event) {
synchronized (mutex) {
LOG.debug("{}: Got event {}", ScheduledEffectorPolicy.this, event);
if (event.getSensor().getName().equals(INVOKE_AT.getName())) {
String time = (String) event.getValue();
if (Strings.isNonBlank(time)) {
Date time = (Date) event.getValue();
if (time != null) {
scheduleAt(time);
}
}
if (event.getSensor().getName().equals(INVOKE_IMMEDIATELY.getName())) {
Boolean invoke = Boolean.parseBoolean(Strings.toString(event.getValue()));
Boolean invoke = (Boolean) event.getValue();
if (invoke) {
executor.submit(ScheduledEffectorPolicy.this);
}
Expand Down

0 comments on commit c910c06

Please sign in to comment.