Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a way to set individual message delay with message property #79

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep
if (sqsMessageBody == null || sqsMessageBody.isEmpty()) {
throw new JMSException("Message body cannot be null or empty");
}
Map<String, MessageAttributeValue> messageAttributes = propertyToMessageAttribute((SQSMessage) message);
Map<String, MessageAttributeValue> messageAttributes = propertyToMessageAttribute(message);

/**
* These will override existing attributes if they exist. Everything that
Expand All @@ -127,9 +127,7 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep
SendMessageRequest sendMessageRequest = new SendMessageRequest(queue.getQueueUrl(), sqsMessageBody);
sendMessageRequest.setMessageAttributes(messageAttributes);

if (deliveryDelaySeconds != 0) {
sendMessageRequest.setDelaySeconds(deliveryDelaySeconds);
}
sendMessageRequest.setDelaySeconds(calculateDelaySeconds(message));

//for FIFO queues, we have to specify both MessageGroupId, which we obtain from standard property JMSX_GROUP_ID
//and MessageDeduplicationId, which we obtain from a custom provider specific property JMS_SQS_DEDUPLICATION_ID
Expand All @@ -154,6 +152,19 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep
}
}

private int calculateDelaySeconds(SQSMessage message) throws JMSException {
try {
long scheduledDelay = message.getLongProperty(SQSMessage.SQS_SCHEDULED_DELAY);
return convertDeliveryDelayToSeconds(scheduledDelay);
} catch (NumberFormatException nfe) {
// Message does not have the delay property, use the producer's delay
return deliveryDelaySeconds;
} catch (IllegalArgumentException ide) {
// Message's delay property is invalid, throw exception
throw new JMSException(ide.getMessage());
}
}

@Override
public Queue getQueue() throws JMSException {
return sqsDestination;
Expand Down Expand Up @@ -511,13 +522,17 @@ public long getTimeToLive() throws JMSException {
* in seconds.
*/
public void setDeliveryDelay(long deliveryDelay) {
this.deliveryDelaySeconds = convertDeliveryDelayToSeconds(deliveryDelay);
}

private int convertDeliveryDelayToSeconds(long deliveryDelay) {
if (deliveryDelay < 0 || deliveryDelay > MAXIMUM_DELIVERY_DELAY_MILLISECONDS) {
throw new IllegalArgumentException("Delivery delay must be non-negative and at most 15 minutes: " + deliveryDelay);
}
if (deliveryDelay % 1000 != 0) {
throw new IllegalArgumentException("Delivery delay must be a multiple of 1000: " + deliveryDelay);
}
this.deliveryDelaySeconds = (int)(deliveryDelay / 1000);
return (int)(deliveryDelay / 1000);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public class SQSMessage implements Message {
public static final String JMS_SQS_REPLY_TO_QUEUE_NAME = "JMS_SQSReplyToQueueName";
public static final String JMS_SQS_REPLY_TO_QUEUE_URL = "JMS_SQSReplyToQueueURL";
public static final String JMS_SQS_CORRELATION_ID = "JMS_SQSCorrelationID";

public static final String SQS_SCHEDULED_DELAY = "SQSScheduledDelay";

// Default JMS Message properties
private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
private int priority = Message.DEFAULT_PRIORITY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,11 @@
package com.amazon.sqs.javamessaging;


import com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper;
import com.amazon.sqs.javamessaging.SQSConnection;
import com.amazon.sqs.javamessaging.SQSMessageProducer;
import com.amazon.sqs.javamessaging.SQSQueueDestination;
import com.amazon.sqs.javamessaging.SQSSession;
import com.amazon.sqs.javamessaging.acknowledge.Acknowledger;
import com.amazon.sqs.javamessaging.message.SQSBytesMessage;
import com.amazon.sqs.javamessaging.message.SQSMessage;
import com.amazon.sqs.javamessaging.message.SQSObjectMessage;
import com.amazon.sqs.javamessaging.message.SQSTextMessage;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
Expand Down Expand Up @@ -743,6 +737,60 @@ public void testSetDeliveryDelayInvalidDelays() throws JMSException {
// expected
}
}

@Test
public void testSetDeliveryDelayWithMessageProperty() throws JMSException {
assertEquals(0, producer.getDeliveryDelay());

ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
when(amazonSQSClient.sendMessage(requestCaptor.capture()))
.thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1));

SQSTextMessage msg = new SQSTextMessage("Sorry I'm late!");
msg.setLongProperty(SQSMessage.SQS_SCHEDULED_DELAY, 2000L);
producer.send(msg);

assertEquals(2, requestCaptor.getValue().getDelaySeconds().intValue());
}


@Test
public void testSetDeliveryDelayWithMessagePropertyHasPriorityOverProducerDelay() throws JMSException {
assertEquals(0, producer.getDeliveryDelay());

producer.setDeliveryDelay(1000);

assertEquals(1000, producer.getDeliveryDelay());

ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
when(amazonSQSClient.sendMessage(requestCaptor.capture()))
.thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1));

SQSTextMessage msg = new SQSTextMessage("Sorry I'm late!");
msg.setLongProperty(SQSMessage.SQS_SCHEDULED_DELAY, 2000L);
producer.send(msg);

assertEquals(2, requestCaptor.getValue().getDelaySeconds().intValue());
}


@Test
public void testSetDeliveryDelayWithMessagePropertyAndInvalidDelays() throws JMSException {
assertEquals(0, producer.getDeliveryDelay());

ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
when(amazonSQSClient.sendMessage(requestCaptor.capture()))
.thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1));

SQSTextMessage msg = new SQSTextMessage("Sorry I'm late!");
msg.setLongProperty(SQSMessage.SQS_SCHEDULED_DELAY, 123L);
try {
producer.send(msg);
fail();
} catch (JMSException e) {
// expected
}
}


private Map<String, MessageAttributeValue> createMessageAttribute(String type) {
Expand Down