diff --git a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java index 8093f04..3248f08 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java +++ b/src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java @@ -113,7 +113,7 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep if (sqsMessageBody == null || sqsMessageBody.isEmpty()) { throw new JMSException("Message body cannot be null or empty"); } - Map messageAttributes = propertyToMessageAttribute((SQSMessage) message); + Map messageAttributes = propertyToMessageAttribute(message); /** * These will override existing attributes if they exist. Everything that @@ -127,9 +127,7 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep SendMessageRequest sendMessageRequest = new SendMessageRequest(queue.getQueueUrl(), sqsMessageBody); sendMessageRequest.setMessageAttributes(messageAttributes); - if (deliveryDelaySeconds != 0) { - sendMessageRequest.setDelaySeconds(deliveryDelaySeconds); - } + sendMessageRequest.setDelaySeconds(calculateDelaySeconds(message)); //for FIFO queues, we have to specify both MessageGroupId, which we obtain from standard property JMSX_GROUP_ID //and MessageDeduplicationId, which we obtain from a custom provider specific property JMS_SQS_DEDUPLICATION_ID @@ -154,6 +152,19 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep } } + private int calculateDelaySeconds(SQSMessage message) throws JMSException { + try { + long scheduledDelay = message.getLongProperty(SQSMessage.SQS_SCHEDULED_DELAY); + return convertDeliveryDelayToSeconds(scheduledDelay); + } catch (NumberFormatException nfe) { + // Message does not have the delay property, use the producer's delay + return deliveryDelaySeconds; + } catch (IllegalArgumentException ide) { + // Message's delay property is invalid, throw exception + throw new JMSException(ide.getMessage()); + } + } + @Override public Queue getQueue() throws JMSException { return sqsDestination; @@ -511,13 +522,17 @@ public long getTimeToLive() throws JMSException { * in seconds. */ public void setDeliveryDelay(long deliveryDelay) { + this.deliveryDelaySeconds = convertDeliveryDelayToSeconds(deliveryDelay); + } + + private int convertDeliveryDelayToSeconds(long deliveryDelay) { if (deliveryDelay < 0 || deliveryDelay > MAXIMUM_DELIVERY_DELAY_MILLISECONDS) { throw new IllegalArgumentException("Delivery delay must be non-negative and at most 15 minutes: " + deliveryDelay); } if (deliveryDelay % 1000 != 0) { throw new IllegalArgumentException("Delivery delay must be a multiple of 1000: " + deliveryDelay); } - this.deliveryDelaySeconds = (int)(deliveryDelay / 1000); + return (int)(deliveryDelay / 1000); } /** diff --git a/src/main/java/com/amazon/sqs/javamessaging/message/SQSMessage.java b/src/main/java/com/amazon/sqs/javamessaging/message/SQSMessage.java index 04dbf3e..0951200 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/message/SQSMessage.java +++ b/src/main/java/com/amazon/sqs/javamessaging/message/SQSMessage.java @@ -70,7 +70,8 @@ public class SQSMessage implements Message { public static final String JMS_SQS_REPLY_TO_QUEUE_NAME = "JMS_SQSReplyToQueueName"; public static final String JMS_SQS_REPLY_TO_QUEUE_URL = "JMS_SQSReplyToQueueURL"; public static final String JMS_SQS_CORRELATION_ID = "JMS_SQSCorrelationID"; - + public static final String SQS_SCHEDULED_DELAY = "SQSScheduledDelay"; + // Default JMS Message properties private int deliveryMode = Message.DEFAULT_DELIVERY_MODE; private int priority = Message.DEFAULT_PRIORITY; diff --git a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java index 497d144..6acc3dc 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/SQSMessageProducerTest.java @@ -15,17 +15,11 @@ package com.amazon.sqs.javamessaging; -import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper; -import com.amazon.sqs.javamessaging.SQSConnection; -import com.amazon.sqs.javamessaging.SQSMessageProducer; -import com.amazon.sqs.javamessaging.SQSQueueDestination; -import com.amazon.sqs.javamessaging.SQSSession; import com.amazon.sqs.javamessaging.acknowledge.Acknowledger; import com.amazon.sqs.javamessaging.message.SQSBytesMessage; import com.amazon.sqs.javamessaging.message.SQSMessage; import com.amazon.sqs.javamessaging.message.SQSObjectMessage; import com.amazon.sqs.javamessaging.message.SQSTextMessage; -import com.amazonaws.services.sqs.model.DeleteMessageRequest; import com.amazonaws.services.sqs.model.MessageAttributeValue; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; @@ -743,6 +737,60 @@ public void testSetDeliveryDelayInvalidDelays() throws JMSException { // expected } } + + @Test + public void testSetDeliveryDelayWithMessageProperty() throws JMSException { + assertEquals(0, producer.getDeliveryDelay()); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); + when(amazonSQSClient.sendMessage(requestCaptor.capture())) + .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1)); + + SQSTextMessage msg = new SQSTextMessage("Sorry I'm late!"); + msg.setLongProperty(SQSMessage.SQS_SCHEDULED_DELAY, 2000L); + producer.send(msg); + + assertEquals(2, requestCaptor.getValue().getDelaySeconds().intValue()); + } + + + @Test + public void testSetDeliveryDelayWithMessagePropertyHasPriorityOverProducerDelay() throws JMSException { + assertEquals(0, producer.getDeliveryDelay()); + + producer.setDeliveryDelay(1000); + + assertEquals(1000, producer.getDeliveryDelay()); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); + when(amazonSQSClient.sendMessage(requestCaptor.capture())) + .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1)); + + SQSTextMessage msg = new SQSTextMessage("Sorry I'm late!"); + msg.setLongProperty(SQSMessage.SQS_SCHEDULED_DELAY, 2000L); + producer.send(msg); + + assertEquals(2, requestCaptor.getValue().getDelaySeconds().intValue()); + } + + + @Test + public void testSetDeliveryDelayWithMessagePropertyAndInvalidDelays() throws JMSException { + assertEquals(0, producer.getDeliveryDelay()); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class); + when(amazonSQSClient.sendMessage(requestCaptor.capture())) + .thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1)); + + SQSTextMessage msg = new SQSTextMessage("Sorry I'm late!"); + msg.setLongProperty(SQSMessage.SQS_SCHEDULED_DELAY, 123L); + try { + producer.send(msg); + fail(); + } catch (JMSException e) { + // expected + } + } private Map createMessageAttribute(String type) {