Skip to content

Commit

Permalink
Merge pull request #764 from jGauravGupta/FISH-7822
Browse files Browse the repository at this point in the history
FISH-7822 Add Support for Amazon SQS Extended Client Library
  • Loading branch information
jGauravGupta authored Mar 9, 2024
2 parents 2cce3ba + 60bfc3e commit ae251b8
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 24 deletions.
10 changes: 10 additions & 0 deletions AmazonSQS/AmazonSQSJCAAPI/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,15 @@
<version>1.12.661</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.12.661</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-sqs-java-extended-client-lib</artifactId>
<version>1.2.5</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import javax.resource.ResourceException;
import javax.resource.spi.Activation;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.ConfigProperty;
import javax.resource.spi.InvalidPropertyException;
import javax.resource.spi.ResourceAdapter;

Expand Down Expand Up @@ -78,6 +79,10 @@ public class AmazonSQSActivationSpec implements ActivationSpec, AWSCredentialsPr
private String profileName;
private String roleArn;
private String roleSessionName;
private String s3BucketName;
private Integer s3SizeThreshold = 0;
private String s3KeyPrefix;
private Boolean s3FetchMessage = true;

@Override
public void validate() throws InvalidPropertyException {
Expand Down Expand Up @@ -203,7 +208,39 @@ public String getRoleSessionName() {
public void setRoleSessionName(String roleSessionName) {
this.roleSessionName = roleSessionName;
}


public String getS3BucketName() {
return s3BucketName;
}

public void setS3BucketName(String s3BucketName) {
this.s3BucketName = s3BucketName;
}

public Integer getS3SizeThreshold() {
return s3SizeThreshold;
}

public void setS3SizeThreshold(Integer s3SizeThreshold) {
this.s3SizeThreshold = s3SizeThreshold;
}

public String getS3KeyPrefix() {
return s3KeyPrefix;
}

public void setS3KeyPrefix(String s3KeyPrefix) {
this.s3KeyPrefix = s3KeyPrefix;
}

public Boolean getS3FetchMessage() {
return s3FetchMessage;
}

public void setS3FetchMessage(Boolean s3FetchMessage) {
this.s3FetchMessage = s3FetchMessage;
}

@Override
public AWSCredentials getCredentials() {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2017 Payara Foundation and/or its affiliates. All rights reserved.
* Copyright (c) 2017-2024 Payara Foundation and/or its affiliates. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
Expand Down Expand Up @@ -39,17 +39,30 @@
*/
package fish.payara.cloud.connectors.amazonsqs.api.inbound;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import fish.payara.cloud.connectors.amazonsqs.api.OnSQSMessage;
import java.io.IOException;
import java.io.StringReader;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.TimerTask;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonException;
import javax.json.JsonObject;
import javax.json.JsonReader;
import javax.json.JsonValue;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.WorkException;
Expand All @@ -59,17 +72,24 @@
* @author Steve Millidge (Payara Foundation)
*/
class SQSPoller extends TimerTask {

AmazonSQSActivationSpec spec;
BootstrapContext ctx;
MessageEndpointFactory factory;
AmazonSQS client;

private final AmazonSQSActivationSpec spec;
private final BootstrapContext ctx;
private final MessageEndpointFactory factory;
private final AmazonSQS client;
private AmazonS3 s3;
private static final String S3_BUCKET_NAME = "s3BucketName";
private static final String S3_KEY = "s3Key";
private static final Logger LOG = Logger.getLogger(SQSPoller.class.getName());

SQSPoller(AmazonSQSActivationSpec sqsSpec, BootstrapContext context, MessageEndpointFactory endpointFactory) {
spec = sqsSpec;
ctx = context;
factory = endpointFactory;
client = AmazonSQSClientBuilder.standard().withCredentials(spec).withRegion(spec.getRegion()).build();
if (spec.getS3BucketName() != null) {
s3 = AmazonS3ClientBuilder.standard().withCredentials(spec).withRegion(spec.getRegion()).build();
}
}

@Override
Expand All @@ -78,36 +98,78 @@ public void run() {
ReceiveMessageRequest rmr = new ReceiveMessageRequest(spec.getQueueURL());
rmr.setMaxNumberOfMessages(spec.getMaxMessages());
rmr.setVisibilityTimeout(spec.getVisibilityTimeout());
rmr.setWaitTimeSeconds(spec.getPollInterval()/1000);
rmr.setWaitTimeSeconds(spec.getPollInterval() / 1000);
rmr.setAttributeNames(Arrays.asList(spec.getAttributeNames().split(",")));
rmr.setMessageAttributeNames(Arrays.asList(spec.getMessageAttributeNames().split(",")));
ReceiveMessageResult rmResult = client.receiveMessage(rmr);
if (!rmResult.getMessages().isEmpty()) {

Class<?> mdbClass = factory.getEndpointClass();
for (Message message : rmResult.getMessages()) {
for (Method m : mdbClass.getMethods()) {
if (m.isAnnotationPresent(OnSQSMessage.class) && m.getParameterCount() == 1 && m.getParameterTypes()[0].equals(Message.class)) {
try {
ctx.getWorkManager().scheduleWork(new SQSWork(client, factory, m, message,spec.getQueueURL()));
} catch (WorkException ex) {
Logger.getLogger(AmazonSQSResourceAdapter.class.getName()).log(Level.SEVERE, null, ex);
}
if (isOnSQSMessageMethod(m) && shouldFetchS3Message(message)) {
fetchS3MessageContent(message);
scheduleSQSWork(m, message);
}
}

}
}
} catch (IllegalStateException ise) {
// Fix #29 ensure Illegal State Exception doesn't blow up the timer
Logger.getLogger(AmazonSQSResourceAdapter.class.getName()).log(Level.WARNING, "Poller caught an Illegal State Exception", ise);
} catch(Exception e) {
Logger.getLogger(AmazonSQSResourceAdapter.class.getName()).log(Level.WARNING, "Poller caught an Unexpected Exception", e);
LOG.log(Level.WARNING, "Poller caught an Illegal State Exception", ise);
} catch (Exception e) {
LOG.log(Level.WARNING, "Poller caught an Unexpected Exception", e);
}
}

private boolean isOnSQSMessageMethod(Method method) {
return method.isAnnotationPresent(OnSQSMessage.class)
&& method.getParameterCount() == 1
&& method.getParameterTypes()[0].equals(Message.class);
}

private boolean shouldFetchS3Message(Message message) {
return s3 != null
&& Boolean.TRUE.equals(spec.getS3FetchMessage())
&& message.getBody().contains(S3_BUCKET_NAME);
}

private void fetchS3MessageContent(Message message) throws IOException {
try (JsonReader jsonReader = Json.createReader(new StringReader(message.getBody()))) {
JsonArray jsonArray = jsonReader.readArray();
for (JsonValue jsonValue : jsonArray) {
if (jsonValue instanceof JsonObject) {
JsonObject jsonBody = (JsonObject) jsonValue;
String s3BucketName = jsonBody.getString(S3_BUCKET_NAME);
String s3Key = jsonBody.getString(S3_KEY);
LOG.log(Level.FINE, "S3 object received, S3 bucket name: {0}, S3 object key:{1}", new Object[]{s3BucketName, s3Key});
GetObjectRequest getObjectRequest = new GetObjectRequest(s3BucketName, s3Key);
S3Object s3Object = s3.getObject(getObjectRequest);
try (S3ObjectInputStream objectInputStream = s3Object.getObjectContent()) {
byte[] buffer = new byte[1024];
int bytesRead;
StringBuilder content = new StringBuilder();
while ((bytesRead = objectInputStream.read(buffer)) != -1) {
content.append(new String(buffer, 0, bytesRead));
}
message.setBody(content.toString());
}
}
}
} catch (JsonException e) {
LOG.log(Level.WARNING, "Error parsing S3 message metadata JSON", e);
}
}

private void scheduleSQSWork(Method method, Message message) {
try {
ctx.getWorkManager().scheduleWork(new SQSWork(client, factory, method, message, spec.getQueueURL()));
} catch (WorkException ex) {
Logger.getLogger(AmazonSQSResourceAdapter.class.getName()).log(Level.SEVERE, null, ex);
}
}

void stop() {
client.shutdown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
*/
package fish.payara.cloud.connectors.amazonsqs.api.outbound;

import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient;
import com.amazon.sqs.javamessaging.ExtendedClientConfiguration;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ResponseMetadata;
import com.amazonaws.auth.AWSCredentials;
Expand All @@ -47,6 +49,8 @@
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.AddPermissionRequest;
Expand Down Expand Up @@ -127,11 +131,25 @@ public class AmazonSQSManagedConnection implements ManagedConnection, AmazonSQSC
private final HashSet<ConnectionEventListener> listeners = new HashSet<>();
private PrintWriter logWriter;
private final AmazonSQS sqsClient;
private final AmazonSQSExtendedClient sqsExtClient;

AmazonSQSManagedConnection(Subject subject, ConnectionRequestInfo cxRequestInfo, AmazonSQSManagedConnectionFactory aThis) {

AWSCredentialsProvider credentialsProvider = getCredentials(aThis);
sqsClient = AmazonSQSClientBuilder.standard().withRegion(aThis.getRegion()).withCredentials(credentialsProvider).build();

ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration();
if (aThis.getS3BucketName() != null) {
final AmazonS3 s3 = AmazonS3ClientBuilder.standard().withRegion(aThis.getRegion()).withCredentials(credentialsProvider).build();
extendedClientConfig = extendedClientConfig.withPayloadSupportEnabled(s3, aThis.getS3BucketName());
if (aThis.getS3SizeThreshold() != null && aThis.getS3SizeThreshold() > 0) {
extendedClientConfig = extendedClientConfig.withPayloadSizeThreshold(aThis.getS3SizeThreshold());
}
if (aThis.getS3KeyPrefix() != null) {
extendedClientConfig = extendedClientConfig.withS3KeyPrefix(aThis.getS3KeyPrefix());
}
}
sqsExtClient = new AmazonSQSExtendedClient(sqsClient, extendedClientConfig);
}

@Override
Expand Down Expand Up @@ -224,22 +242,36 @@ void removeHandle(AmazonSQSConnection connection) {

@Override
public SendMessageResult sendMessage(SendMessageRequest request) {
return sqsClient.sendMessage(request);
if (isLargeMessage(request.getMessageBody())) {
return sqsExtClient.sendMessage(request);
} else {
return sqsClient.sendMessage(request);
}
}

@Override
public SendMessageResult sendMessage(String queueURL, String messageBody) {
return sqsClient.sendMessage(queueURL, messageBody);
if (isLargeMessage(messageBody)) {
return sqsExtClient.sendMessage(queueURL, messageBody);
} else {
return sqsClient.sendMessage(queueURL, messageBody);
}
}

@Override
public SendMessageBatchResult sendMessageBatch(String queueURL, List<SendMessageBatchRequestEntry> entries) {
return sqsClient.sendMessageBatch(queueURL, entries);
boolean largeMessageFound = entries.stream().anyMatch(entry -> isLargeMessage(entry.getMessageBody()));
return largeMessageFound ? sqsExtClient.sendMessageBatch(queueURL, entries) : sqsClient.sendMessageBatch(queueURL, entries);
}

@Override
public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest batch) {
return sqsClient.sendMessageBatch(batch);
boolean largeMessageFound = batch.getEntries().stream().anyMatch(entry -> isLargeMessage(entry.getMessageBody()));
return largeMessageFound ? sqsExtClient.sendMessageBatch(batch) : sqsClient.sendMessageBatch(batch);
}

private boolean isLargeMessage(String messageBody) {
return messageBody.length() > 256 * 1024; // 256KB
}

@Override
Expand Down Expand Up @@ -372,58 +404,75 @@ public UntagQueueResult untagQueue(String queueUrl, List<String> tagKeys) {
return sqsClient.untagQueue(queueUrl, tagKeys);
}

@Override
public AddPermissionResult addPermission(AddPermissionRequest addPermissionRequest) {
return sqsClient.addPermission(addPermissionRequest);
}

@Override
public AddPermissionResult addPermission(String queueUrl, String label, List<String> awsAccountIds, List<String> actions) {
return sqsClient.addPermission(queueUrl, label, awsAccountIds, actions);
}

@Override
public CancelMessageMoveTaskResult cancelMessageMoveTask(CancelMessageMoveTaskRequest cancelMessageMoveTaskRequest) {
return sqsClient.cancelMessageMoveTask(cancelMessageMoveTaskRequest);
}

@Override
public ChangeMessageVisibilityResult changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) {
return sqsClient.changeMessageVisibility(changeMessageVisibilityRequest);
}

@Override
public ChangeMessageVisibilityResult changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout) {
return sqsClient.changeMessageVisibility(queueUrl, receiptHandle, visibilityTimeout);
}

@Override
public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) {
return sqsClient.changeMessageVisibilityBatch(changeMessageVisibilityBatchRequest);
}

@Override
public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(String queueUrl, List<ChangeMessageVisibilityBatchRequestEntry> entries) {
return sqsClient.changeMessageVisibilityBatch(queueUrl, entries);
}

@Override
public ListDeadLetterSourceQueuesResult listDeadLetterSourceQueues(ListDeadLetterSourceQueuesRequest listDeadLetterSourceQueuesRequest) {
return sqsClient.listDeadLetterSourceQueues(listDeadLetterSourceQueuesRequest);
}

@Override
public ListMessageMoveTasksResult listMessageMoveTasks(ListMessageMoveTasksRequest listMessageMoveTasksRequest) {
return sqsClient.listMessageMoveTasks(listMessageMoveTasksRequest);
}

@Override
public RemovePermissionResult removePermission(RemovePermissionRequest removePermissionRequest) {
return sqsClient.removePermission(removePermissionRequest);
}

@Override
public RemovePermissionResult removePermission(String queueUrl, String label) {
return sqsClient.removePermission(queueUrl, label);
}

@Override
public StartMessageMoveTaskResult startMessageMoveTask(StartMessageMoveTaskRequest startMessageMoveTaskRequest) {
return sqsClient.startMessageMoveTask(startMessageMoveTaskRequest);
}

@Override
public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest amazonWebServiceRequest) {
return sqsClient.getCachedResponseMetadata(amazonWebServiceRequest);
}

public AmazonSQS getSqsClient() {
return sqsClient;
}

@Override
public void close() throws Exception {
destroy();
Expand Down
Loading

0 comments on commit ae251b8

Please sign in to comment.