From 022e53599e619480fda5a0230199afc4e6357f01 Mon Sep 17 00:00:00 2001 From: Gaurav Gupta Date: Fri, 8 Mar 2024 20:47:21 +0530 Subject: [PATCH 1/4] FISH-7822 Add Support for Amazon SQS Extended Client Library --- AmazonSQS/AmazonSQSJCAAPI/pom.xml | 10 ++ .../api/inbound/AmazonSQSActivationSpec.java | 39 +++++++- .../amazonsqs/api/inbound/SQSPoller.java | 96 +++++++++++++++---- .../outbound/AmazonSQSManagedConnection.java | 57 ++++++++++- .../AmazonSQSManagedConnectionFactory.java | 44 +++++++++ AmazonSQS/AmazonSQSRAR/pom.xml | 10 ++ 6 files changed, 232 insertions(+), 24 deletions(-) diff --git a/AmazonSQS/AmazonSQSJCAAPI/pom.xml b/AmazonSQS/AmazonSQSJCAAPI/pom.xml index 911afbd1..3fbb79dc 100644 --- a/AmazonSQS/AmazonSQSJCAAPI/pom.xml +++ b/AmazonSQS/AmazonSQSJCAAPI/pom.xml @@ -33,5 +33,15 @@ 1.12.661 jar + + com.amazonaws + aws-java-sdk-s3 + 1.12.661 + + + com.amazonaws + amazon-sqs-java-extended-client-lib + 1.2.5 + diff --git a/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/AmazonSQSActivationSpec.java b/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/AmazonSQSActivationSpec.java index b0fddcd7..d782f846 100644 --- a/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/AmazonSQSActivationSpec.java +++ b/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/AmazonSQSActivationSpec.java @@ -51,6 +51,7 @@ import javax.resource.ResourceException; import javax.resource.spi.Activation; import javax.resource.spi.ActivationSpec; +import javax.resource.spi.ConfigProperty; import javax.resource.spi.InvalidPropertyException; import javax.resource.spi.ResourceAdapter; @@ -78,6 +79,10 @@ public class AmazonSQSActivationSpec implements ActivationSpec, AWSCredentialsPr private String profileName; private String roleArn; private String roleSessionName; + private String s3BucketName; + private Integer s3SizeThreshold = 0; + private String s3KeyPrefix; + private Boolean s3FetchMessage = true; @Override public void validate() throws InvalidPropertyException { @@ -203,7 +208,39 @@ public String getRoleSessionName() { public void setRoleSessionName(String roleSessionName) { this.roleSessionName = roleSessionName; } - + + public String getS3BucketName() { + return s3BucketName; + } + + public void setS3BucketName(String s3BucketName) { + this.s3BucketName = s3BucketName; + } + + public Integer getS3SizeThreshold() { + return s3SizeThreshold; + } + + public void setS3SizeThreshold(Integer s3SizeThreshold) { + this.s3SizeThreshold = s3SizeThreshold; + } + + public String getS3KeyPrefix() { + return s3KeyPrefix; + } + + public void setS3KeyPrefix(String s3KeyPrefix) { + this.s3KeyPrefix = s3KeyPrefix; + } + + public Boolean getS3FetchMessage() { + return s3FetchMessage; + } + + public void setS3FetchMessage(Boolean s3FetchMessage) { + this.s3FetchMessage = s3FetchMessage; + } + @Override public AWSCredentials getCredentials() { diff --git a/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java b/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java index 6ba83d18..1cde8435 100644 --- a/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java +++ b/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java @@ -1,7 +1,7 @@ /* * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. * - * Copyright (c) 2017 Payara Foundation and/or its affiliates. All rights reserved. + * Copyright (c) 2017-2024 Payara Foundation and/or its affiliates. All rights reserved. * * The contents of this file are subject to the terms of either the GNU * General Public License Version 2 only ("GPL") or the Common Development @@ -39,17 +39,29 @@ */ package fish.payara.cloud.connectors.amazonsqs.api.inbound; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; import fish.payara.cloud.connectors.amazonsqs.api.OnSQSMessage; +import java.io.IOException; +import java.io.StringReader; import java.lang.reflect.Method; import java.util.Arrays; import java.util.TimerTask; import java.util.logging.Level; import java.util.logging.Logger; +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonException; +import javax.json.JsonObject; +import javax.json.JsonReader; +import javax.json.JsonValue; import javax.resource.spi.BootstrapContext; import javax.resource.spi.endpoint.MessageEndpointFactory; import javax.resource.spi.work.WorkException; @@ -59,11 +71,15 @@ * @author Steve Millidge (Payara Foundation) */ class SQSPoller extends TimerTask { - - AmazonSQSActivationSpec spec; - BootstrapContext ctx; - MessageEndpointFactory factory; - AmazonSQS client; + + private final AmazonSQSActivationSpec spec; + private final BootstrapContext ctx; + private final MessageEndpointFactory factory; + private final AmazonSQS client; + private AmazonS3 s3; + private static final String S3_BUCKET_NAME_KEY = "s3BucketName"; + private static final String S3_KEY_KEY = "s3Key"; + private static final Logger LOG = Logger.getLogger(SQSPoller.class.getName()); SQSPoller(AmazonSQSActivationSpec sqsSpec, BootstrapContext context, MessageEndpointFactory endpointFactory) { spec = sqsSpec; @@ -78,36 +94,78 @@ public void run() { ReceiveMessageRequest rmr = new ReceiveMessageRequest(spec.getQueueURL()); rmr.setMaxNumberOfMessages(spec.getMaxMessages()); rmr.setVisibilityTimeout(spec.getVisibilityTimeout()); - rmr.setWaitTimeSeconds(spec.getPollInterval()/1000); + rmr.setWaitTimeSeconds(spec.getPollInterval() / 1000); rmr.setAttributeNames(Arrays.asList(spec.getAttributeNames().split(","))); rmr.setMessageAttributeNames(Arrays.asList(spec.getMessageAttributeNames().split(","))); ReceiveMessageResult rmResult = client.receiveMessage(rmr); if (!rmResult.getMessages().isEmpty()) { - Class mdbClass = factory.getEndpointClass(); for (Message message : rmResult.getMessages()) { for (Method m : mdbClass.getMethods()) { - if (m.isAnnotationPresent(OnSQSMessage.class) && m.getParameterCount() == 1 && m.getParameterTypes()[0].equals(Message.class)) { - try { - ctx.getWorkManager().scheduleWork(new SQSWork(client, factory, m, message,spec.getQueueURL())); - } catch (WorkException ex) { - Logger.getLogger(AmazonSQSResourceAdapter.class.getName()).log(Level.SEVERE, null, ex); - } + if (isOnSQSMessageMethod(m, message) && shouldFetchS3Message(message)) { + fetchS3MessageContent(message); + scheduleSQSWork(m, message); } } - } } } catch (IllegalStateException ise) { // Fix #29 ensure Illegal State Exception doesn't blow up the timer - Logger.getLogger(AmazonSQSResourceAdapter.class.getName()).log(Level.WARNING, "Poller caught an Illegal State Exception", ise); - } catch(Exception e) { - Logger.getLogger(AmazonSQSResourceAdapter.class.getName()).log(Level.WARNING, "Poller caught an Unexpected Exception", e); + LOG.log(Level.WARNING, "Poller caught an Illegal State Exception", ise); + } catch (Exception e) { + LOG.log(Level.WARNING, "Poller caught an Unexpected Exception", e); + } + } + + private boolean isOnSQSMessageMethod(Method method, Message message) { + return method.isAnnotationPresent(OnSQSMessage.class) + && method.getParameterCount() == 1 + && method.getParameterTypes()[0].equals(Message.class); + } + + private boolean shouldFetchS3Message(Message message) { + return s3 != null + && Boolean.TRUE.equals(spec.getS3FetchMessage()) + && message.getBody().contains(S3_BUCKET_NAME_KEY); + } + + private void fetchS3MessageContent(Message message) throws IOException { + try (JsonReader jsonReader = Json.createReader(new StringReader(message.getBody()))) { + JsonArray jsonArray = jsonReader.readArray(); + for (JsonValue jsonValue : jsonArray) { + if (jsonValue instanceof JsonObject) { + JsonObject jsonBody = (JsonObject) jsonValue; + String s3BucketName = jsonBody.getString(S3_BUCKET_NAME_KEY); + String s3Key = jsonBody.getString(S3_KEY_KEY); + LOG.log(Level.FINE, "S3 object received, S3 bucket name: {0}, S3 object key:{1}", new Object[]{s3BucketName, s3Key}); + GetObjectRequest getObjectRequest = new GetObjectRequest(s3BucketName, s3Key); + S3Object s3Object = s3.getObject(getObjectRequest); + try (S3ObjectInputStream objectInputStream = s3Object.getObjectContent()) { + byte[] buffer = new byte[1024]; + int bytesRead; + StringBuilder content = new StringBuilder(); + while ((bytesRead = objectInputStream.read(buffer)) != -1) { + content.append(new String(buffer, 0, bytesRead)); + } + message.setBody(content.toString()); + } + } + } + } catch (JsonException e) { + LOG.log(Level.WARNING, "Error parsing S3 message metadata JSON", e); + } + } + + private void scheduleSQSWork(Method method, Message message) { + try { + ctx.getWorkManager().scheduleWork(new SQSWork(client, factory, method, message, spec.getQueueURL())); + } catch (WorkException ex) { + Logger.getLogger(AmazonSQSResourceAdapter.class.getName()).log(Level.SEVERE, null, ex); } } void stop() { client.shutdown(); } - + } diff --git a/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/outbound/AmazonSQSManagedConnection.java b/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/outbound/AmazonSQSManagedConnection.java index 0a80c453..b7b362d6 100644 --- a/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/outbound/AmazonSQSManagedConnection.java +++ b/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/outbound/AmazonSQSManagedConnection.java @@ -39,6 +39,8 @@ */ package fish.payara.cloud.connectors.amazonsqs.api.outbound; +import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; +import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; import com.amazonaws.AmazonWebServiceRequest; import com.amazonaws.ResponseMetadata; import com.amazonaws.auth.AWSCredentials; @@ -47,6 +49,8 @@ import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.AddPermissionRequest; @@ -127,11 +131,25 @@ public class AmazonSQSManagedConnection implements ManagedConnection, AmazonSQSC private final HashSet listeners = new HashSet<>(); private PrintWriter logWriter; private final AmazonSQS sqsClient; + private final AmazonSQSExtendedClient sqsExtClient; AmazonSQSManagedConnection(Subject subject, ConnectionRequestInfo cxRequestInfo, AmazonSQSManagedConnectionFactory aThis) { AWSCredentialsProvider credentialsProvider = getCredentials(aThis); sqsClient = AmazonSQSClientBuilder.standard().withRegion(aThis.getRegion()).withCredentials(credentialsProvider).build(); + + ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration(); + if (aThis.getS3BucketName() != null) { + final AmazonS3 s3 = AmazonS3ClientBuilder.standard().withRegion(aThis.getRegion()).withCredentials(credentialsProvider).build(); + extendedClientConfig = extendedClientConfig.withPayloadSupportEnabled(s3, aThis.getS3BucketName()); + if (aThis.getS3SizeThreshold() != null && aThis.getS3SizeThreshold() > 0) { + extendedClientConfig = extendedClientConfig.withPayloadSizeThreshold(aThis.getS3SizeThreshold()); + } + if (aThis.getS3KeyPrefix() != null) { + extendedClientConfig = extendedClientConfig.withS3KeyPrefix(aThis.getS3KeyPrefix()); + } + } + sqsExtClient = new AmazonSQSExtendedClient(sqsClient, extendedClientConfig); } @Override @@ -224,22 +242,36 @@ void removeHandle(AmazonSQSConnection connection) { @Override public SendMessageResult sendMessage(SendMessageRequest request) { - return sqsClient.sendMessage(request); + if (isLargeMessage(request.getMessageBody())) { + return sqsExtClient.sendMessage(request); + } else { + return sqsClient.sendMessage(request); + } } @Override public SendMessageResult sendMessage(String queueURL, String messageBody) { - return sqsClient.sendMessage(queueURL, messageBody); + if (isLargeMessage(messageBody)) { + return sqsExtClient.sendMessage(queueURL, messageBody); + } else { + return sqsClient.sendMessage(queueURL, messageBody); + } } @Override public SendMessageBatchResult sendMessageBatch(String queueURL, List entries) { - return sqsClient.sendMessageBatch(queueURL, entries); + boolean largeMessageFound = entries.stream().anyMatch(entry -> isLargeMessage(entry.getMessageBody())); + return largeMessageFound ? sqsExtClient.sendMessageBatch(queueURL, entries) : sqsClient.sendMessageBatch(queueURL, entries); } @Override public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest batch) { - return sqsClient.sendMessageBatch(batch); + boolean largeMessageFound = batch.getEntries().stream().anyMatch(entry -> isLargeMessage(entry.getMessageBody())); + return largeMessageFound ? sqsExtClient.sendMessageBatch(batch) : sqsClient.sendMessageBatch(batch); + } + + private boolean isLargeMessage(String messageBody) { + return messageBody.length() > 256 * 1024; // 256KB } @Override @@ -372,58 +404,75 @@ public UntagQueueResult untagQueue(String queueUrl, List tagKeys) { return sqsClient.untagQueue(queueUrl, tagKeys); } + @Override public AddPermissionResult addPermission(AddPermissionRequest addPermissionRequest) { return sqsClient.addPermission(addPermissionRequest); } + @Override public AddPermissionResult addPermission(String queueUrl, String label, List awsAccountIds, List actions) { return sqsClient.addPermission(queueUrl, label, awsAccountIds, actions); } + @Override public CancelMessageMoveTaskResult cancelMessageMoveTask(CancelMessageMoveTaskRequest cancelMessageMoveTaskRequest) { return sqsClient.cancelMessageMoveTask(cancelMessageMoveTaskRequest); } + @Override public ChangeMessageVisibilityResult changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) { return sqsClient.changeMessageVisibility(changeMessageVisibilityRequest); } + @Override public ChangeMessageVisibilityResult changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout) { return sqsClient.changeMessageVisibility(queueUrl, receiptHandle, visibilityTimeout); } + @Override public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) { return sqsClient.changeMessageVisibilityBatch(changeMessageVisibilityBatchRequest); } + @Override public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(String queueUrl, List entries) { return sqsClient.changeMessageVisibilityBatch(queueUrl, entries); } + @Override public ListDeadLetterSourceQueuesResult listDeadLetterSourceQueues(ListDeadLetterSourceQueuesRequest listDeadLetterSourceQueuesRequest) { return sqsClient.listDeadLetterSourceQueues(listDeadLetterSourceQueuesRequest); } + @Override public ListMessageMoveTasksResult listMessageMoveTasks(ListMessageMoveTasksRequest listMessageMoveTasksRequest) { return sqsClient.listMessageMoveTasks(listMessageMoveTasksRequest); } + @Override public RemovePermissionResult removePermission(RemovePermissionRequest removePermissionRequest) { return sqsClient.removePermission(removePermissionRequest); } + @Override public RemovePermissionResult removePermission(String queueUrl, String label) { return sqsClient.removePermission(queueUrl, label); } + @Override public StartMessageMoveTaskResult startMessageMoveTask(StartMessageMoveTaskRequest startMessageMoveTaskRequest) { return sqsClient.startMessageMoveTask(startMessageMoveTaskRequest); } + @Override public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest amazonWebServiceRequest) { return sqsClient.getCachedResponseMetadata(amazonWebServiceRequest); } + public AmazonSQS getSqsClient() { + return sqsClient; + } + @Override public void close() throws Exception { destroy(); diff --git a/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/outbound/AmazonSQSManagedConnectionFactory.java b/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/outbound/AmazonSQSManagedConnectionFactory.java index 099c0bf0..0aa4bbd1 100644 --- a/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/outbound/AmazonSQSManagedConnectionFactory.java +++ b/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/outbound/AmazonSQSManagedConnectionFactory.java @@ -82,6 +82,18 @@ public class AmazonSQSManagedConnectionFactory implements ManagedConnectionFacto @ConfigProperty(description = "AWS Session name", type = String.class) private String roleSessionName; + @ConfigProperty(description = "AWS S3 bucket name", type = String.class) + private String s3BucketName; + + @ConfigProperty(description = "AWS S3 size threshold", type = Integer.class, defaultValue ="0") + private Integer s3SizeThreshold; + + @ConfigProperty(description = "AWS S3 key prefix", type = String.class) + private String s3KeyPrefix; + + @ConfigProperty(description = "AWS S3 fetch message", type = Boolean.class, defaultValue ="true") + private Boolean s3FetchMessage; + private PrintWriter logger; public String getAwsSecretKey() { @@ -132,6 +144,38 @@ public void setRoleSessionName(String roleSessionName) { this.roleSessionName = roleSessionName; } + public String getS3BucketName() { + return s3BucketName; + } + + public void setS3BucketName(String s3BucketName) { + this.s3BucketName = s3BucketName; + } + + public Integer getS3SizeThreshold() { + return s3SizeThreshold; + } + + public void setS3SizeThreshold(Integer s3SizeThreshold) { + this.s3SizeThreshold = s3SizeThreshold; + } + + public String getS3KeyPrefix() { + return s3KeyPrefix; + } + + public void setS3KeyPrefix(String s3KeyPrefix) { + this.s3KeyPrefix = s3KeyPrefix; + } + + public Boolean getS3FetchMessage() { + return s3FetchMessage; + } + + public void setS3FetchMessage(Boolean s3FetchMessage) { + this.s3FetchMessage = s3FetchMessage; + } + @Override public Object createConnectionFactory(ConnectionManager cxManager) throws ResourceException { return new AmazonSQSConnectionFactoryImpl(cxManager, this); diff --git a/AmazonSQS/AmazonSQSRAR/pom.xml b/AmazonSQS/AmazonSQSRAR/pom.xml index a328c614..83fca481 100644 --- a/AmazonSQS/AmazonSQSRAR/pom.xml +++ b/AmazonSQS/AmazonSQSRAR/pom.xml @@ -37,5 +37,15 @@ 1.12.661 jar + + com.amazonaws + aws-java-sdk-s3 + 1.12.661 + + + com.amazonaws + amazon-sqs-java-extended-client-lib + 1.2.5 + \ No newline at end of file From 02432d258f7b355fd196c2118d29e112d226f94d Mon Sep 17 00:00:00 2001 From: Gaurav Gupta Date: Fri, 8 Mar 2024 22:44:42 +0530 Subject: [PATCH 2/4] FISH-7822 Polish --- .../cloud/connectors/amazonsqs/api/inbound/SQSPoller.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java b/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java index 1cde8435..83218e8c 100644 --- a/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java +++ b/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java @@ -40,6 +40,7 @@ package fish.payara.cloud.connectors.amazonsqs.api.inbound; import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectInputStream; @@ -86,6 +87,9 @@ class SQSPoller extends TimerTask { ctx = context; factory = endpointFactory; client = AmazonSQSClientBuilder.standard().withCredentials(spec).withRegion(spec.getRegion()).build(); + if (spec.getS3BucketName() != null) { + s3 = AmazonS3ClientBuilder.standard().withCredentials(spec).withRegion(spec.getRegion()).build(); + } } @Override @@ -102,7 +106,7 @@ public void run() { Class mdbClass = factory.getEndpointClass(); for (Message message : rmResult.getMessages()) { for (Method m : mdbClass.getMethods()) { - if (isOnSQSMessageMethod(m, message) && shouldFetchS3Message(message)) { + if (isOnSQSMessageMethod(m) && shouldFetchS3Message(message)) { fetchS3MessageContent(message); scheduleSQSWork(m, message); } @@ -117,7 +121,7 @@ public void run() { } } - private boolean isOnSQSMessageMethod(Method method, Message message) { + private boolean isOnSQSMessageMethod(Method method) { return method.isAnnotationPresent(OnSQSMessage.class) && method.getParameterCount() == 1 && method.getParameterTypes()[0].equals(Message.class); From 1db21e2c74a1ca79fc71d357902c1cc8255f34ac Mon Sep 17 00:00:00 2001 From: Gaurav Gupta Date: Sat, 9 Mar 2024 13:03:08 +0530 Subject: [PATCH 3/4] Update AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java Co-authored-by: Alfonso Valdez --- .../cloud/connectors/amazonsqs/api/inbound/SQSPoller.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java b/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java index 83218e8c..a8bd8115 100644 --- a/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java +++ b/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java @@ -79,7 +79,7 @@ class SQSPoller extends TimerTask { private final AmazonSQS client; private AmazonS3 s3; private static final String S3_BUCKET_NAME_KEY = "s3BucketName"; - private static final String S3_KEY_KEY = "s3Key"; + private static final String S3_KEY = "s3Key"; private static final Logger LOG = Logger.getLogger(SQSPoller.class.getName()); SQSPoller(AmazonSQSActivationSpec sqsSpec, BootstrapContext context, MessageEndpointFactory endpointFactory) { From 60bfc3e318f67a54a6a36a6d307f343657ba24e2 Mon Sep 17 00:00:00 2001 From: Gaurav Gupta Date: Sat, 9 Mar 2024 13:09:10 +0530 Subject: [PATCH 4/4] FISH-7822 Naming --- .../cloud/connectors/amazonsqs/api/inbound/SQSPoller.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java b/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java index a8bd8115..95de59cb 100644 --- a/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java +++ b/AmazonSQS/AmazonSQSJCAAPI/src/main/java/fish/payara/cloud/connectors/amazonsqs/api/inbound/SQSPoller.java @@ -78,7 +78,7 @@ class SQSPoller extends TimerTask { private final MessageEndpointFactory factory; private final AmazonSQS client; private AmazonS3 s3; - private static final String S3_BUCKET_NAME_KEY = "s3BucketName"; + private static final String S3_BUCKET_NAME = "s3BucketName"; private static final String S3_KEY = "s3Key"; private static final Logger LOG = Logger.getLogger(SQSPoller.class.getName()); @@ -130,7 +130,7 @@ private boolean isOnSQSMessageMethod(Method method) { private boolean shouldFetchS3Message(Message message) { return s3 != null && Boolean.TRUE.equals(spec.getS3FetchMessage()) - && message.getBody().contains(S3_BUCKET_NAME_KEY); + && message.getBody().contains(S3_BUCKET_NAME); } private void fetchS3MessageContent(Message message) throws IOException { @@ -139,8 +139,8 @@ private void fetchS3MessageContent(Message message) throws IOException { for (JsonValue jsonValue : jsonArray) { if (jsonValue instanceof JsonObject) { JsonObject jsonBody = (JsonObject) jsonValue; - String s3BucketName = jsonBody.getString(S3_BUCKET_NAME_KEY); - String s3Key = jsonBody.getString(S3_KEY_KEY); + String s3BucketName = jsonBody.getString(S3_BUCKET_NAME); + String s3Key = jsonBody.getString(S3_KEY); LOG.log(Level.FINE, "S3 object received, S3 bucket name: {0}, S3 object key:{1}", new Object[]{s3BucketName, s3Key}); GetObjectRequest getObjectRequest = new GetObjectRequest(s3BucketName, s3Key); S3Object s3Object = s3.getObject(getObjectRequest);