From 669b18eee1f7882007606184400b8a3bef96f6ec Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Sun, 24 Mar 2024 23:03:57 +0000 Subject: [PATCH 1/2] Experimental support for using lambdas for scheduling --- .../DefaultInvocationSerializer.java | 65 +++--- .../transactionoutbox/Instantiator.java | 4 + .../transactionoutbox/Invocation.java | 17 ++ .../transactionoutbox/LambdaContext.java | 13 ++ .../transactionoutbox/LambdaRunner.java | 14 ++ .../transactionoutbox/SerializableLambda.java | 38 ++++ .../transactionoutbox/TransactionOutbox.java | 8 + .../TransactionOutboxEntry.java | 30 +-- .../TransactionOutboxImpl.java | 192 +++++++++++------- ...bstractFullyQualifiedNameInstantiator.java | 6 + .../gruelbox/transactionoutbox/spi/Utils.java | 17 ++ .../TestLambdaInvocations.java | 30 +++ .../testing/AbstractAcceptanceTest.java | 39 ++++ 13 files changed, 354 insertions(+), 119 deletions(-) create mode 100644 transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/LambdaContext.java create mode 100644 transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/LambdaRunner.java create mode 100644 transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/SerializableLambda.java create mode 100644 transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestLambdaInvocations.java diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultInvocationSerializer.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultInvocationSerializer.java index 543af4b1..46de1c8f 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultInvocationSerializer.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultInvocationSerializer.java @@ -1,22 +1,11 @@ package com.gruelbox.transactionoutbox; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonArray; -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import com.google.gson.JsonSerializationContext; -import com.google.gson.JsonSerializer; -import com.google.gson.TypeAdapter; +import com.google.gson.*; +import com.google.gson.reflect.TypeToken; import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonToken; import com.google.gson.stream.JsonWriter; -import java.io.IOException; -import java.io.Reader; -import java.io.Writer; +import java.io.*; import java.lang.reflect.Modifier; import java.lang.reflect.Type; import java.math.BigDecimal; @@ -25,15 +14,7 @@ import java.time.*; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; -import java.util.Calendar; -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; -import java.util.UUID; +import java.util.*; import lombok.Builder; import lombok.extern.slf4j.Slf4j; @@ -92,6 +73,17 @@ public final class DefaultInvocationSerializer implements InvocationSerializer { .registerTypeAdapter(Period.class, new PeriodTypeAdapter()) .registerTypeAdapter(Year.class, new YearTypeAdapter()) .registerTypeAdapter(YearMonth.class, new YearMonthAdapter()) + .registerTypeAdapterFactory( + new TypeAdapterFactory() { + @SuppressWarnings("unchecked") + @Override + public TypeAdapter create(Gson gson, TypeToken type) { + if (SerializableLambda.class.isAssignableFrom(type.getRawType())) { + return (TypeAdapter) new SerializableRunnerAdapter(); + } + return null; + } + }) .excludeFieldsWithModifiers(Modifier.TRANSIENT, Modifier.STATIC) .create(); } @@ -298,6 +290,7 @@ public Invocation deserialize( JsonElement argType = arg.getAsJsonObject().get("t"); if (argType != null) { JsonElement argValue = arg.getAsJsonObject().get("v"); + log.info("arg = {}", arg); Class argClass = classForName(argType.getAsString()); try { args[i] = context.deserialize(argValue, argClass); @@ -313,6 +306,9 @@ public Invocation deserialize( } private Class classForName(String name) { + if (name.equals(SerializableLambda.class.getName()) || name.contains("$$Lambda")) { + return SerializableLambda.class; + } var clazz = nameToClass.get(name); if (clazz == null) { throw new IllegalArgumentException("Cannot deserialize class - not found: " + name); @@ -321,6 +317,9 @@ private Class classForName(String name) { } private String nameForClass(Class clazz) { + if (SerializableLambda.class.isAssignableFrom(clazz)) { + return clazz.getName(); + } var name = classToName.get(clazz); if (name == null) { throw new IllegalArgumentException( @@ -696,4 +695,24 @@ private static int parseInt(String value, int beginIndex, int endIndex) return -result; } } + + private static class SerializableRunnerAdapter extends TypeAdapter { + @Override + public void write(JsonWriter out, SerializableLambda value) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + new ObjectOutputStream(bos).writeObject(value); + out.value(Base64.getEncoder().encodeToString(bos.toByteArray())); + } + + @Override + public SerializableLambda read(JsonReader in) throws IOException { + byte[] bytes = Base64.getDecoder().decode(in.nextString()); + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); + try { + return (SerializableLambda) ois.readObject(); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Instantiator.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Instantiator.java index 742695b1..aa4c6674 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Instantiator.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Instantiator.java @@ -65,4 +65,8 @@ static Instantiator using(Function, Object> fn) { * @return An instance of the class. */ Object getInstance(String name); + + default T getInstance(Class clazz) { + throw new UnsupportedOperationException(); + } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Invocation.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Invocation.java index 3cc815d8..03102617 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Invocation.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Invocation.java @@ -1,6 +1,9 @@ package com.gruelbox.transactionoutbox; +import static java.util.stream.Collectors.joining; + import com.google.gson.annotations.SerializedName; +import com.gruelbox.transactionoutbox.spi.Utils; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Arrays; @@ -136,4 +139,18 @@ void invoke(Object instance, TransactionOutboxListener listener) } listener.wrapInvocation(() -> method.invoke(instance, args)); } + + public String getDescription() { + if (getClassName().equals(LambdaRunner.class.getName())) { + return ((SerializableLambda) args[0]).getDescription(); + } else { + return String.format( + "%s.%s(%s)", + className, + methodName, + args == null + ? null + : Arrays.stream(args).map(Utils::stringifyArg).collect(joining(", "))); + } + } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/LambdaContext.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/LambdaContext.java new file mode 100644 index 00000000..5e2a0448 --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/LambdaContext.java @@ -0,0 +1,13 @@ +package com.gruelbox.transactionoutbox; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; + +@AllArgsConstructor(access = AccessLevel.PACKAGE) +public class LambdaContext { + private final Instantiator instantiator; + + public T getInstance(Class clazz) { + return instantiator.getInstance(clazz); + } +} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/LambdaRunner.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/LambdaRunner.java new file mode 100644 index 00000000..9abf873b --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/LambdaRunner.java @@ -0,0 +1,14 @@ +package com.gruelbox.transactionoutbox; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; + +@AllArgsConstructor(access = AccessLevel.PROTECTED) +final class LambdaRunner { + + private final LambdaContext lambdaContext; + + void run(SerializableLambda runnable) { + runnable.run(lambdaContext); + } +} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/SerializableLambda.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/SerializableLambda.java new file mode 100644 index 00000000..7b0bd72f --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/SerializableLambda.java @@ -0,0 +1,38 @@ +package com.gruelbox.transactionoutbox; + +import static java.util.stream.Collectors.joining; + +import com.gruelbox.transactionoutbox.spi.Utils; +import java.io.Serializable; +import java.util.Arrays; + +/** TODO */ +@FunctionalInterface +public interface SerializableLambda extends Serializable { + + void run(LambdaContext lambdaContext); + + default String getDescription() { + var className = getClass().getName(); + var index = className.indexOf("$$Lambda/"); + if (index == -1) { + return ""; + } + var parentClass = className.substring(0, index); + var lambdaId = className.substring(index + 9); + var args = + Arrays.stream(getClass().getDeclaredFields()) + .map( + f -> { + try { + f.setAccessible(true); + return f.get(this); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + }) + .map(Utils::stringifyArg) + .collect(joining(",")); + return String.format("%s.lambda_%s_(%s)", parentClass, lambdaId, args); + } +} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java index 51e0ab59..8c823291 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java @@ -58,6 +58,14 @@ static TransactionOutboxBuilder builder() { */ T schedule(Class clazz); + /** + * TODO + * + * @param runnable + * @return + */ + TransactionOutboxEntry schedule(SerializableLambda runnable); + /** * Starts building a schedule request with parameterization. See {@link * ParameterizedScheduleBuilder#schedule(Class)} for more information. diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java index 01749722..2510277f 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java @@ -1,9 +1,7 @@ package com.gruelbox.transactionoutbox; -import static java.util.stream.Collectors.joining; import java.time.Instant; -import java.util.Arrays; import lombok.*; import lombok.experimental.SuperBuilder; @@ -122,41 +120,21 @@ public String description() { if (!this.initialized) { synchronized (this) { if (!this.initialized) { - String description = + this.description = String.format( - "%s.%s(%s) [%s]%s%s", - invocation.getClassName(), - invocation.getMethodName(), - invocation.getArgs() == null - ? null - : Arrays.stream(invocation.getArgs()) - .map(this::stringify) - .collect(joining(", ")), + "%s [%s]%s%s", + invocation.getDescription(), id, uniqueRequestId == null ? "" : " uid=[" + uniqueRequestId + "]", topic == null ? "" : " seq=[" + topic + "/" + sequence + "]"); - this.description = description; this.initialized = true; - return description; + return this.description; } } } return this.description; } - private String stringify(Object o) { - if (o == null) { - return "null"; - } - if (o.getClass().isArray()) { - return "[" + Arrays.stream((Object[]) o).map(this::stringify).collect(joining(", ")) + "]"; - } - if (o instanceof String) { - return "\"" + o + "\""; - } - return o.toString(); - } - @Override public void validate(Validator validator) { validator.notNull("id", id); diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java index b9e1c981..74c00727 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java @@ -79,7 +79,55 @@ public void initialize() { @Override public T schedule(Class clazz) { - return schedule(clazz, null, null, null); + return schedule(clazz, null); + } + + @Override + public TransactionOutboxEntry schedule(SerializableLambda runnable) { + checkInitialized(); + checkThreadLocalTransactionManager(); + try { + var method = LambdaRunner.class.getDeclaredMethod("run", SerializableLambda.class); + var args = new SerializableLambda[] {runnable}; + var txn = transactionManager.extractTransaction(method, args); + return scheduleLambda(txn.getTransaction(), runnable); + } catch (Exception e) { + throw new IllegalArgumentException(e); + } + } + + private TransactionOutboxEntry schedule(Transaction transaction, SerializableLambda runnable) { + checkInitialized(); + return scheduleLambda(transaction, runnable); + } + + private TransactionOutboxEntry scheduleLambda( + Transaction transaction, SerializableLambda runnable) { + try { + var clazz = LambdaRunner.class; + var method = LambdaRunner.class.getDeclaredMethod("run", SerializableLambda.class); + var args = new SerializableLambda[] {runnable}; + TransactionOutboxEntry entry = + newEntry( + clazz, + "run", + method.getParameterTypes(), + args, + null, + null, + clockProvider.get().instant()); + persistor.save(transaction, entry); + addPostExecutionHook(null, transaction, entry); + return entry; + } catch (Exception e) { + throw new IllegalArgumentException(e); + } + } + + private void checkInitialized() { + if (!initialized.get()) { + throw new IllegalStateException("Not initialized"); + } } @Override @@ -114,9 +162,7 @@ private boolean flushStale(Instant now) { @Override public boolean flush(Executor executor) { - if (!initialized.get()) { - throw new IllegalStateException("Not initialized"); - } + checkInitialized(); Instant now = clockProvider.get().instant(); List> futures = new ArrayList<>(); @@ -168,13 +214,8 @@ private void expireIdempotencyProtection(Instant now) { @Override public boolean unblock(String entryId) { - if (!initialized.get()) { - throw new IllegalStateException("Not initialized"); - } - if (!(transactionManager instanceof ThreadLocalContextTransactionManager)) { - throw new UnsupportedOperationException( - "This method requires a ThreadLocalContextTransactionManager"); - } + checkInitialized(); + checkThreadLocalTransactionManager(); log.info("Unblocking entry {} for retry.", entryId); try { return ((ThreadLocalContextTransactionManager) transactionManager) @@ -184,12 +225,17 @@ public boolean unblock(String entryId) { } } + private void checkThreadLocalTransactionManager() { + if (!(transactionManager instanceof ThreadLocalContextTransactionManager)) { + throw new UnsupportedOperationException( + "This method requires a ThreadLocalContextTransactionManager"); + } + } + @Override @SuppressWarnings({"unchecked", "rawtypes"}) public boolean unblock(String entryId, Object transactionContext) { - if (!initialized.get()) { - throw new IllegalStateException("Not initialized"); - } + checkInitialized(); if (!(transactionManager instanceof ParameterContextTransactionManager)) { throw new UnsupportedOperationException( "This method requires a ParameterContextTransactionManager"); @@ -208,11 +254,8 @@ public boolean unblock(String entryId, Object transactionContext) { } } - private T schedule( - Class clazz, String uniqueRequestId, String topic, Duration delayForAtLeast) { - if (!initialized.get()) { - throw new IllegalStateException("Not initialized"); - } + private T schedule(Class clazz, ParameterizedScheduleBuilderImpl params) { + checkInitialized(); return proxyFactory.createProxy( clazz, (method, args) -> @@ -225,44 +268,44 @@ private T schedule( extracted.getMethodName(), extracted.getParameters(), extracted.getArgs(), - uniqueRequestId, - topic); - if (delayForAtLeast != null) { - entry.setNextAttemptTime(entry.getNextAttemptTime().plus(delayForAtLeast)); - } - validator.validate(entry); - persistor.save(extracted.getTransaction(), entry); - extracted - .getTransaction() - .addPostCommitHook( - () -> { - listener.scheduled(entry); - if (entry.getTopic() != null) { - log.debug("Queued {} in topic {}", entry.description(), topic); - } else if (delayForAtLeast == null) { - submitNow(entry); - log.debug( - "Scheduled {} for post-commit execution", entry.description()); - } else if (delayForAtLeast.compareTo(attemptFrequency) < 0) { - scheduler.schedule( - () -> submitNow(entry), - delayForAtLeast.toMillis(), - TimeUnit.MILLISECONDS); - log.info( - "Scheduled {} for post-commit execution after at least {}", - entry.description(), - delayForAtLeast); - } else { - log.info( - "Queued {} for execution after at least {}", - entry.description(), - delayForAtLeast); - } - }); + params == null ? null : params.uniqueRequestId, + params == null ? null : params.ordered, + params == null + ? clockProvider.get().instant() + : clockProvider.get().instant().plus(params.delayForAtLeast)); + Transaction txn = extracted.getTransaction(); + persistor.save(txn, entry); + addPostExecutionHook(params, txn, entry); return null; })); } + private void addPostExecutionHook( + ParameterizedScheduleBuilderImpl params, Transaction txn, TransactionOutboxEntry entry) { + txn.addPostCommitHook( + () -> { + listener.scheduled(entry); + if (entry.getTopic() != null) { + log.debug("Queued {} in topic {}", entry.description(), entry.getTopic()); + } else if (params == null || params.delayForAtLeast == null) { + submitNow(entry); + log.debug("Scheduled {} for post-commit execution", entry.description()); + } else if (params.delayForAtLeast.compareTo(attemptFrequency) < 0) { + scheduler.schedule( + () -> submitNow(entry), params.delayForAtLeast.toMillis(), TimeUnit.MILLISECONDS); + log.info( + "Scheduled {} for post-commit execution after at least {}", + entry.description(), + params.delayForAtLeast); + } else { + log.info( + "Queued {} for execution after at least {}", + entry.description(), + params.delayForAtLeast); + } + }); + } + private void submitNow(TransactionOutboxEntry entry) { submitter.submit(entry, this::processNow); } @@ -363,7 +406,10 @@ private void processWithExistingLock(Transaction tx, TransactionOutboxEntry entr private void invoke(TransactionOutboxEntry entry, Transaction transaction) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { - Object instance = instantiator.getInstance(entry.getInvocation().getClassName()); + Object instance = + entry.getInvocation().getClassName().equals(LambdaRunner.class.getName()) + ? new LambdaRunner(new LambdaContext(instantiator)) + : instantiator.getInstance(entry.getInvocation().getClassName()); log.debug("Created instance {}", instance); transactionManager .injectTransaction(entry.getInvocation(), transaction) @@ -376,21 +422,27 @@ private TransactionOutboxEntry newEntry( Class[] params, Object[] args, String uniqueRequestId, - String topic) { - return TransactionOutboxEntry.builder() - .id(UUID.randomUUID().toString()) - .invocation( - new Invocation( - instantiator.getName(clazz), - methodName, - params, - args, - serializeMdc && (MDC.getMDCAdapter() != null) ? MDC.getCopyOfContextMap() : null)) - .lastAttemptTime(null) - .nextAttemptTime(clockProvider.get().instant()) - .uniqueRequestId(uniqueRequestId) - .topic(topic) - .build(); + String topic, + Instant nextAttemptTime) { + var entry = + TransactionOutboxEntry.builder() + .id(UUID.randomUUID().toString()) + .invocation( + new Invocation( + instantiator.getName(clazz), + methodName, + params, + args, + serializeMdc && (MDC.getMDCAdapter() != null) + ? MDC.getCopyOfContextMap() + : null)) + .lastAttemptTime(null) + .nextAttemptTime(nextAttemptTime) + .uniqueRequestId(uniqueRequestId) + .topic(topic) + .build(); + validator.validate(entry); + return entry; } private void pushBack(Transaction transaction, TransactionOutboxEntry entry) @@ -488,7 +540,7 @@ public T schedule(Class clazz) { if (uniqueRequestId != null && uniqueRequestId.length() > 250) { throw new IllegalArgumentException("uniqueRequestId may be up to 250 characters"); } - return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered, delayForAtLeast); + return TransactionOutboxImpl.this.schedule(clazz, this); } } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/spi/AbstractFullyQualifiedNameInstantiator.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/spi/AbstractFullyQualifiedNameInstantiator.java index d727e614..befdc945 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/spi/AbstractFullyQualifiedNameInstantiator.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/spi/AbstractFullyQualifiedNameInstantiator.java @@ -29,4 +29,10 @@ public final Object getInstance(String name) { } protected abstract Object createInstance(Class clazz); + + @SuppressWarnings("unchecked") + @Override + public T getInstance(Class clazz) { + return (T) createInstance(clazz); + } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/spi/Utils.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/spi/Utils.java index 6ac31d25..37bd4466 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/spi/Utils.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/spi/Utils.java @@ -1,5 +1,7 @@ package com.gruelbox.transactionoutbox.spi; +import static java.util.stream.Collectors.joining; + import com.gruelbox.transactionoutbox.ThrowingRunnable; import com.gruelbox.transactionoutbox.UncheckedException; import java.util.Arrays; @@ -106,4 +108,19 @@ public static void logAtLevel(Logger logger, Level level, String message, Object break; } } + + public static String stringifyArg(Object o) { + if (o == null) { + return "null"; + } + if (o.getClass().isArray()) { + return "[" + + Arrays.stream((Object[]) o).map(Utils::stringifyArg).collect(joining(", ")) + + "]"; + } + if (o instanceof String) { + return "\"" + o + "\""; + } + return o.toString(); + } } diff --git a/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestLambdaInvocations.java b/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestLambdaInvocations.java new file mode 100644 index 00000000..e6c037e7 --- /dev/null +++ b/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestLambdaInvocations.java @@ -0,0 +1,30 @@ +package com.gruelbox.transactionoutbox; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; + +@Slf4j +public class TestLambdaInvocations { + + @Test + final void lambdaInvocations() { + var txm = new StubThreadLocalTransactionManager(); + var outbox = + (TransactionOutboxImpl) + TransactionOutbox.builder() + .transactionManager(txm) + .persistor(StubPersistor.builder().build()) + .instantiator( + Instantiator.using( + clazz -> { + throw new UnsupportedOperationException(); + })) + .submitter(Submitter.withExecutor(Runnable::run)) + .build(); + + var i = 2; + var j = 3; + // TODO + txm.inTransaction(() -> outbox.schedule(ins -> log.info("Foo {}, {}", i, j))); + } +} diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java index 1eacda39..8a699b49 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java @@ -56,6 +56,45 @@ void afterEachBase() throws InterruptedException { assertTrue(singleThreadPool.awaitTermination(30, SECONDS)); } + interface ResultPoster { + void post(List result); + } + + @Test + final void lambdaInvocations() throws InterruptedException { + TransactionManager transactionManager = txManager(); + CountDownLatch successLatch = new CountDownLatch(1); + List> results = new CopyOnWriteArrayList<>(); + + var outbox = + TransactionOutbox.builder() + .transactionManager(transactionManager) + .persistor(persistor()) + .instantiator( + Instantiator.using( + clazz -> { + if (clazz == ResultPoster.class) { + return (ResultPoster) results::add; + } + throw new UnsupportedOperationException(); + })) + .submitter(Submitter.withExecutor(unreliablePool)) + .listener(new LatchListener(successLatch)) + .build(); + + var i = 2; + var j = 3; + transactionManager.inTransaction( + () -> + outbox.schedule( + ctx -> { + log.info("Received {}, {}", i, j); + ctx.getInstance(ResultPoster.class).post(List.of(i, j)); + })); + assertTrue(successLatch.await(2, SECONDS)); + assertEquals(List.of(List.of(2, 3)), results); + } + @Test final void sequencing() throws Exception { int countPerTopic = 50; From 447c56cd6338852b03c43a0ec79dbb8cc2276c9f Mon Sep 17 00:00:00 2001 From: Graham Crockford Date: Tue, 26 Mar 2024 19:08:23 +0000 Subject: [PATCH 2/2] Tweaks --- .../TransactionOutboxImpl.java | 31 ++++++++++++------- .../TestLambdaInvocations.java | 30 ------------------ 2 files changed, 20 insertions(+), 41 deletions(-) delete mode 100644 transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestLambdaInvocations.java diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java index 74c00727..ff603588 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java @@ -7,7 +7,10 @@ import com.gruelbox.transactionoutbox.spi.ProxyFactory; import com.gruelbox.transactionoutbox.spi.Utils; + +import java.lang.invoke.MethodHandle; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.time.*; import java.util.ArrayList; import java.util.List; @@ -16,6 +19,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; + +import jdk.dynalink.linker.support.Lookup; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; import lombok.Setter; @@ -29,6 +34,16 @@ @RequiredArgsConstructor(access = AccessLevel.PRIVATE) final class TransactionOutboxImpl implements TransactionOutbox, Validatable { + private static final Method RUN_SERIALIZABLE_LAMBDA; + + static { + try { + RUN_SERIALIZABLE_LAMBDA = LambdaRunner.class.getDeclaredMethod("run", SerializableLambda.class); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + } + private final TransactionManager transactionManager; private final Persistor persistor; private final Instantiator instantiator; @@ -87,9 +102,7 @@ public TransactionOutboxEntry schedule(SerializableLambda runnable) { checkInitialized(); checkThreadLocalTransactionManager(); try { - var method = LambdaRunner.class.getDeclaredMethod("run", SerializableLambda.class); - var args = new SerializableLambda[] {runnable}; - var txn = transactionManager.extractTransaction(method, args); + var txn = transactionManager.extractTransaction(RUN_SERIALIZABLE_LAMBDA, new SerializableLambda[] {runnable}); return scheduleLambda(txn.getTransaction(), runnable); } catch (Exception e) { throw new IllegalArgumentException(e); @@ -101,18 +114,14 @@ private TransactionOutboxEntry schedule(Transaction transaction, SerializableLam return scheduleLambda(transaction, runnable); } - private TransactionOutboxEntry scheduleLambda( - Transaction transaction, SerializableLambda runnable) { + private TransactionOutboxEntry scheduleLambda(Transaction transaction, SerializableLambda runnable) { try { - var clazz = LambdaRunner.class; - var method = LambdaRunner.class.getDeclaredMethod("run", SerializableLambda.class); - var args = new SerializableLambda[] {runnable}; TransactionOutboxEntry entry = newEntry( - clazz, + LambdaRunner.class, "run", - method.getParameterTypes(), - args, + new Class[] { SerializableLambda.class }, + new SerializableLambda[] {runnable}, null, null, clockProvider.get().instant()); diff --git a/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestLambdaInvocations.java b/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestLambdaInvocations.java deleted file mode 100644 index e6c037e7..00000000 --- a/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/TestLambdaInvocations.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.gruelbox.transactionoutbox; - -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.Test; - -@Slf4j -public class TestLambdaInvocations { - - @Test - final void lambdaInvocations() { - var txm = new StubThreadLocalTransactionManager(); - var outbox = - (TransactionOutboxImpl) - TransactionOutbox.builder() - .transactionManager(txm) - .persistor(StubPersistor.builder().build()) - .instantiator( - Instantiator.using( - clazz -> { - throw new UnsupportedOperationException(); - })) - .submitter(Submitter.withExecutor(Runnable::run)) - .build(); - - var i = 2; - var j = 3; - // TODO - txm.inTransaction(() -> outbox.schedule(ins -> log.info("Foo {}, {}", i, j))); - } -}