Skip to content

Commit

Permalink
Adds rate limiting actor
Browse files Browse the repository at this point in the history
  • Loading branch information
Luca Venturi committed Apr 7, 2023
1 parent a04a071 commit 747af80
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
37 changes: 37 additions & 0 deletions src/main/java/eu/lucaventuri/fibry/Stereotypes.java
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,43 @@ public Actor<HttpUrlDownload<byte[]>, Void, Void> binaryDownloader() {
return binaryDownloader(new Scheduler(), 30, true);
}

public <T> BaseActor<T, Void, Void> rateLimited(Consumer<T> actorLogic, int msBetweenCalls) {
NamedStateActorCreator<Void> config = anonymous().initialState(null);

Consumer<T> rateLimitedLogic = value -> {
long start = System.currentTimeMillis();

actorLogic.accept(value);

long end = System.currentTimeMillis() - start;

if (msBetweenCalls > (end - start))
SystemUtils.sleep(msBetweenCalls - (end - start));
};

return config.newActor(rateLimitedLogic);
}

public <T, R> BaseActor<T, R, Void> rateLimitedReturn(Function<T, R> actorLogic, int msBetweenCalls) {
NamedStateActorCreator<Void> config = anonymous().initialState(null);

Function<T, R> rateLimitedLogic = value -> {
long start = System.currentTimeMillis();

R result = actorLogic.apply(value);

long end = System.currentTimeMillis();

if (msBetweenCalls > (end - start)) {
SystemUtils.sleep(msBetweenCalls - (end - start));
}

return result;
};

return config.newActorWithReturn(rateLimitedLogic);
}

/**
* Process messages in batches, after grouping by the key and counting how many messages are received.
* e.g. ['a', 'a', 'b', 'c', 'a'] would generate ['a':3, 'b':1, 'c':1]
Expand Down
14 changes: 14 additions & 0 deletions src/test/java/eu/lucaventuri/fibry/TestStereotypes.java
Original file line number Diff line number Diff line change
Expand Up @@ -816,4 +816,18 @@ public void testBytesDownloader() throws IOException, InterruptedException {
server.stop(0);
}
}

@Test
public void testRateLimit() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();

try (var actor = Stereotypes.auto().rateLimitedReturn(item -> 0, 20)) {
actor.sendMessageReturn("A");
actor.sendMessageReturn("B");
actor.sendMessageReturn("C").get();
}

var time = System.currentTimeMillis() - start;
assertTrue(time >= 60 );
}
}

0 comments on commit 747af80

Please sign in to comment.