Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add ring buffer bocking queue #38

Merged
merged 7 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ You can pull it from the central Maven repositories:
<dependency>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib-v1</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
</dependency>
```

Expand All @@ -47,7 +47,7 @@ You can pull it from the central Maven repositories:
<dependency>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib-v2</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
</dependency>
```

Expand All @@ -68,12 +68,12 @@ If you want to try a snapshot version, add the following repository:

### For AWS SDK v1
```groovy
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.0.5'
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v1:1.0.6'
```

### For AWS SDK v2
```groovy
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.0.5'
implementation 'com.github.mvallim:amazon-sns-java-messaging-lib-v2:1.0.6'
```

If you want to try a snapshot version, add the following repository:
Expand Down
2 changes: 1 addition & 1 deletion amazon-sns-java-messaging-lib-template/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib</artifactId>
<version>1.0.6-SNAPSHOT</version>
<version>1.0.7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.amazon.sns.messaging.lib.concurrent;

import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;

@SuppressWarnings({ "java:S2274", "unchecked" })
public class RingBufferBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {

private static final int DEFAULT_CAPACITY = 2048;

private final Entry<E>[] buffer;

private final int capacity;

private final AtomicInteger writeSequence = new AtomicInteger(-1);

private final AtomicInteger readSequence = new AtomicInteger(0);

private final ReentrantLock reentrantLock;

private final Condition notEmpty;

private final Condition notFull;

public RingBufferBlockingQueue(final int capacity) {
this.capacity = capacity;
this.buffer = new Entry[capacity];
Arrays.setAll(buffer, p -> new Entry<>());
reentrantLock = new ReentrantLock(true);
notEmpty = reentrantLock.newCondition();
notFull = reentrantLock.newCondition();
}

public RingBufferBlockingQueue() {
this(DEFAULT_CAPACITY);
}

@SneakyThrows
private void enqueue(final E element) {
while (isFull()) {
notFull.await();
}

final int nextWriteSeq = writeSequence.get() + 1;
buffer[wrap(nextWriteSeq)].setValue(element);
writeSequence.incrementAndGet();
notEmpty.signal();
}

@SneakyThrows
private E dequeue() {
while (isEmpty()) {
notEmpty.await();
}

final E nextValue = buffer[wrap(readSequence.get())].getValue();
readSequence.incrementAndGet();
notFull.signal();
return nextValue;
}

private int wrap(final int sequence) {
return sequence % capacity;
}

@Override
public int size() {
return (writeSequence.get() - readSequence.get()) + 1;
}

@Override
public boolean isEmpty() {
return writeSequence.get() < readSequence.get();
}

public boolean isFull() {
return size() >= capacity;
}

public int writeSequence() {
return writeSequence.get();
}

public int readSequence() {
return readSequence.get();
}

@Override
@SneakyThrows
public E peek() {
if (isEmpty()) {
return null;
}

return buffer[wrap(readSequence.get())].getValue();
}

@Override
@SneakyThrows
public void put(final E element) {
try {
reentrantLock.lock();
enqueue(element);
} finally {
reentrantLock.unlock();
}
}

@Override
@SneakyThrows
public E take() {
try {
reentrantLock.lock();
return dequeue();
} finally {
reentrantLock.unlock();
}
}

@Getter
@Setter
static class Entry<E> {

private E value;

}

@Override
public boolean offer(final E e) {
throw new UnsupportedOperationException();
}

@Override
public E poll() {
throw new UnsupportedOperationException();
}

@Override
public Iterator<E> iterator() {
throw new UnsupportedOperationException();
}

@Override
public boolean add(final E e) {
throw new UnsupportedOperationException();
}

@Override
public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public int remainingCapacity() {
throw new UnsupportedOperationException();
}

@Override
public int drainTo(final Collection<? super E> c) {
throw new UnsupportedOperationException();
}

@Override
public int drainTo(final Collection<? super E> c, final int maxElements) {
throw new UnsupportedOperationException();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void shutdown() {
scheduledExecutorService.shutdown();
if (!scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS)) {
LOGGER.warn("Scheduled executor service did not terminate in the specified time.");
final List<Runnable> droppedTasks = executorService.shutdownNow();
final List<Runnable> droppedTasks = scheduledExecutorService.shutdownNow();
LOGGER.warn("Scheduled executor service was abruptly shut down. {} tasks will not be executed.", droppedTasks.size());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.amazon.sns.messaging.lib.concurrent;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;

import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.junit.jupiter.api.Test;

import com.amazon.sns.messaging.lib.model.RequestEntry;

class RingBufferBlockingQueueTest {

private final ExecutorService producer = Executors.newSingleThreadExecutor();

private final ScheduledExecutorService consumer = Executors.newSingleThreadScheduledExecutor();

@Test
void testSuccess() throws InterruptedException {
final List<RequestEntry<Integer>> requestEntriesOut = new LinkedList<>();

final RingBufferBlockingQueue<RequestEntry<Integer>> ringBlockingQueue = new RingBufferBlockingQueue<>(5120);

producer.submit(() -> {
IntStream.range(0, 100_000).forEach(value -> {
ringBlockingQueue.put(RequestEntry.<Integer>builder().withValue(value).build());
});
});

consumer.scheduleAtFixedRate(() -> {
while (!ringBlockingQueue.isEmpty()) {
final List<RequestEntry<Integer>> requestEntries = new LinkedList<>();

while ((requestEntries.size() < 10) && Objects.nonNull(ringBlockingQueue.peek())) {
requestEntries.add(ringBlockingQueue.take());
}

requestEntriesOut.addAll(requestEntries);
}
}, 0, 100L, TimeUnit.MILLISECONDS);

await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.writeSequence() == 99_999);
producer.shutdownNow();

await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.readSequence() == 100_000);
consumer.shutdownNow();

assertThat(ringBlockingQueue.isEmpty(), is(true));

assertThat(requestEntriesOut, hasSize(100_000));
requestEntriesOut.sort((a, b) -> a.getValue() - b.getValue());

for (int i = 0; i < 100_000; i++) {
assertThat(requestEntriesOut.get(i).getValue(), is(i));
}
}

}
2 changes: 1 addition & 1 deletion amazon-sns-java-messaging-lib-v1/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib</artifactId>
<version>1.0.6-SNAPSHOT</version>
<version>1.0.7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue;
import com.amazon.sns.messaging.lib.model.RequestEntry;
import com.amazon.sns.messaging.lib.model.TopicProperty;
import com.amazonaws.services.sns.AmazonSNS;
Expand Down Expand Up @@ -49,7 +49,7 @@ public AmazonSnsTemplate(final AmazonSNS amazonSNS, final TopicProperty topicPro
}

public AmazonSnsTemplate(final AmazonSNS amazonSNS, final TopicProperty topicProperty, final ObjectMapper objectMapper) {
this(amazonSNS, topicProperty, new ConcurrentHashMap<>(), new LinkedBlockingQueue<>(topicProperty.getMaximumPoolSize() * topicProperty.getMaxBatchSize()), objectMapper);
this(amazonSNS, topicProperty, new ConcurrentHashMap<>(), new RingBufferBlockingQueue<>(topicProperty.getMaximumPoolSize() * topicProperty.getMaxBatchSize()), objectMapper);
}

public AmazonSnsTemplate(final AmazonSNS amazonSNS, final TopicProperty topicProperty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

import org.apache.commons.lang3.RandomStringUtils;
Expand All @@ -39,6 +38,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue;
import com.amazon.sns.messaging.lib.core.helper.ConsumerHelper;
import com.amazon.sns.messaging.lib.model.RequestEntry;
import com.amazon.sns.messaging.lib.model.ResponseFailEntry;
Expand Down Expand Up @@ -69,7 +69,7 @@ public void before() throws Exception {
.maximumPoolSize(10)
.topicArn("arn:aws:sns:us-east-2:000000000000:topic")
.build();
snsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty, new LinkedBlockingQueue<>(1024));
snsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty, new RingBufferBlockingQueue<>(1024));
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion amazon-sns-java-messaging-lib-v2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.github.mvallim</groupId>
<artifactId>amazon-sns-java-messaging-lib</artifactId>
<version>1.0.6-SNAPSHOT</version>
<version>1.0.7-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Loading
Loading