Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding possibility to send binary messages to Azure Service Bus #11838

Merged
merged 6 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,12 @@ private BiConsumer<Exchange, AsyncCallback> sendMessages() {

Mono<Void> sendMessageAsync;

if (exchange.getMessage().getBody() instanceof Iterable) {
if (exchange.getMessage().getBody() instanceof Iterable<?>) {
sendMessageAsync
= serviceBusSenderOperations.sendMessages(convertBodyToList((Iterable<Object>) inputBody),
= serviceBusSenderOperations.sendMessages(convertBodyToList((Iterable<?>) inputBody),
configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties);
} else {
sendMessageAsync = serviceBusSenderOperations.sendMessages(exchange.getMessage().getBody(String.class),
sendMessageAsync = serviceBusSenderOperations.sendMessages(inputBody,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the input body is a Java class or somethig else. Does ASB know how to handle all kind of java objects ?

Copy link
Contributor Author

@a-mazurok a-mazurok Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I can see both consumers and producers will support only String, byte[] and BinaryData and only those types(besides special AmqpMessageBody and ServiceBusReceivedMessage types) are supported by underlying Azure library.
I think that client developer can always convert any other Java type to String if necessary in proper way(including Camel type converters) than doing it internally and implicitly with toString() method or implicit Camel type converters(as it works so far).

Binary data like Protobuf messages(our case) can't be properly converted to String type and I believe that many other organizations may have met the same limitation already.

I understand that potentially that change can break some potentially existing clients who were relying on such implicit conversion of different Java types to String so such change shouldn't be released as minor version upgrade 4.1.X but I hope it can be included in 4.2.0 release together with documentation update.

Maybe we can add the same change in new Camel 3.X.0 release as well(if any planned).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of other Java types ServiceBusUtils will throw IllegalArgumentException("Make sure your message data is in String, byte[] or BinaryData")(not changed that).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok thanks, but we need to leverage Camels type converter systems, so eg if your body is a java.io.File or InputStream then they can be sent as-is also.

And is BinaryData some kind of azure class ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, BinaryData is Azure special class that is like wrapper that can hold both Strings and bytes inside.
If I correctly understand your thoughts direction then probably some new endpoint configuration property should be added e.g. binary with default value false(backward compatible) that will control if Camel should try to convert any Java class to String or byte[] unless it is already BinaryData type that doesn't need any conversion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that sounds like a good idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added new property "binary".

configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties);
}

Expand All @@ -170,15 +170,15 @@ private BiConsumer<Exchange, AsyncCallback> scheduleMessages() {

Mono<List<Long>> scheduleMessagesAsync;

if (exchange.getMessage().getBody() instanceof Iterable) {
if (exchange.getMessage().getBody() instanceof Iterable<?>) {
scheduleMessagesAsync
= serviceBusSenderOperations.scheduleMessages(convertBodyToList((Iterable<Object>) inputBody),
= serviceBusSenderOperations.scheduleMessages(convertBodyToList((Iterable<?>) inputBody),
configurationOptionsProxy.getScheduledEnqueueTime(exchange),
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
applicationProperties);
} else {
scheduleMessagesAsync
= serviceBusSenderOperations.scheduleMessages(exchange.getMessage().getBody(String.class),
= serviceBusSenderOperations.scheduleMessages(inputBody,
configurationOptionsProxy.getScheduledEnqueueTime(exchange),
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
applicationProperties);
Expand All @@ -189,10 +189,8 @@ private BiConsumer<Exchange, AsyncCallback> scheduleMessages() {
};
}

private List<String> convertBodyToList(final Iterable<Object> inputBody) {
return StreamSupport.stream(inputBody.spliterator(), false)
.map(body -> getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, body))
.toList();
private List<?> convertBodyToList(final Iterable<?> inputBody) {
return StreamSupport.stream(inputBody.spliterator(), false).toList();
}

private <T> void subscribeToMono(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static ServiceBusMessage createServiceBusMessage(
}

public static Iterable<ServiceBusMessage> createServiceBusMessages(
final Iterable<Object> data, final Map<String, Object> applicationProperties) {
final Iterable<?> data, final Map<String, Object> applicationProperties) {
return StreamSupport.stream(data.spliterator(), false)
.map(obj -> createServiceBusMessage(obj, applicationProperties))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public Mono<Void> sendMessages(
final Object data,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties) {
if (data instanceof Iterable) {
return sendMessages((Iterable<Object>) data, context, applicationProperties);
if (data instanceof Iterable<?>) {
return sendMessages((Iterable<?>) data, context, applicationProperties);
}

return sendMessage(data, context, applicationProperties);
Expand All @@ -58,15 +58,15 @@ public Mono<List<Long>> scheduleMessages(
throw new IllegalArgumentException("To schedule a message, you need to set scheduledEnqueueTime.");
}

if (data instanceof Iterable) {
return scheduleMessages((Iterable<Object>) data, scheduledEnqueueTime, context, applicationProperties);
if (data instanceof Iterable<?>) {
return scheduleMessages((Iterable<?>) data, scheduledEnqueueTime, context, applicationProperties);
}

return scheduleMessage(data, scheduledEnqueueTime, context, applicationProperties);
}

private Mono<Void> sendMessages(
final Iterable<Object> data,
final Iterable<?> data,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties) {
final Iterable<ServiceBusMessage> messages = ServiceBusUtils.createServiceBusMessages(data, applicationProperties);
Expand Down Expand Up @@ -108,7 +108,7 @@ private Mono<List<Long>> scheduleMessage(
}

private Mono<List<Long>> scheduleMessages(
final Iterable<Object> data, final OffsetDateTime scheduledEnqueueTime,
final Iterable<?> data, final OffsetDateTime scheduledEnqueueTime,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties) {
final Iterable<ServiceBusMessage> messages = ServiceBusUtils.createServiceBusMessages(data, applicationProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.camel.component.azure.servicebus;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.StreamSupport;
Expand All @@ -38,11 +40,16 @@ void testCreateServiceBusMessage() {
final ServiceBusMessage message2 = ServiceBusUtils.createServiceBusMessage(String.valueOf(12345), null);

assertEquals("12345", message2.getBody().toString());

//test bytes
byte[] testByteBody = "test string".getBytes(StandardCharsets.UTF_8);
final ServiceBusMessage message3 = ServiceBusUtils.createServiceBusMessage(testByteBody, null);
assertArrayEquals(testByteBody, message3.getBody().toBytes());
}

@Test
void testCreateServiceBusMessages() {
final List<Object> inputMessages = new LinkedList<>();
final List<String> inputMessages = new LinkedList<>();
inputMessages.add("test data");
inputMessages.add(String.valueOf(12345));

Expand All @@ -52,5 +59,19 @@ void testCreateServiceBusMessages() {
.anyMatch(record -> record.getBody().toString().equals("test data")));
assertTrue(StreamSupport.stream(busMessages.spliterator(), false)
.anyMatch(record -> record.getBody().toString().equals("12345")));

//Test bytes
final List<byte[]> inputMessages2 = new LinkedList<>();
byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
inputMessages2.add(byteBody1);
inputMessages2.add(byteBody2);

final Iterable<ServiceBusMessage> busMessages2 = ServiceBusUtils.createServiceBusMessages(inputMessages2, null);

assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
.anyMatch(message -> Arrays.equals(message.getBody().toBytes(), byteBody1)));
assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
.anyMatch(message -> Arrays.equals(message.getBody().toBytes(), byteBody2)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.camel.component.azure.servicebus.integration;

import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -49,7 +51,7 @@ class ServiceBusProducerTest extends BaseCamelServiceBusTestSupport {
@Test
public void testSendMessage() throws InterruptedException {
template.send("direct:sendMessage", exchange -> {
exchange.getIn().setBody(123456789);
exchange.getIn().setBody("123456789");
exchange.getIn().setHeader(ServiceBusConstants.APPLICATION_PROPERTIES, Map.of("customKey", "customValue"));
});

Expand All @@ -68,16 +70,38 @@ public void testSendMessage() throws InterruptedException {

assertTrue(batch1Exists, "test message body");
assertTrue(applicationPropertiesPresent, "test message application properties");

//test byte body
byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
template.send("direct:sendMessage", exchange -> {
exchange.getIn().setBody(testByteBody);
exchange.getIn().setHeader(ServiceBusConstants.APPLICATION_PROPERTIES, Map.of("customKey", "customValue"));
});

Thread.sleep(1000);

final List<ServiceBusReceivedMessage> receivedMessages2
= receiverAsyncClient.receiveMessages().toStream().toList();

final boolean batch2Exists = receivedMessages2.stream()
.anyMatch(serviceBusReceivedMessage -> serviceBusReceivedMessage.getBody().toString().equals("123456789"));

final boolean applicationPropertiesPresent2 = receivedMessages2.stream()
.anyMatch(serviceBusReceivedMessage -> serviceBusReceivedMessage.getApplicationProperties()
.containsKey("customKey"));

assertTrue(batch2Exists, "test byte body");
assertTrue(applicationPropertiesPresent2, "test byte message application properties");
}

@Test
public void testSendBatchMessages() throws InterruptedException {
template.send("direct:sendBatchMessages", exchange -> {
final List<Object> inputBatch = new LinkedList<>();
final List<String> inputBatch = new LinkedList<>();
inputBatch.add("test batch 1");
inputBatch.add("test batch 2");
inputBatch.add("test batch 3");
inputBatch.add(123456);
inputBatch.add("123456");

exchange.getIn().setBody(inputBatch);
});
Expand All @@ -104,6 +128,30 @@ public void testSendBatchMessages() throws InterruptedException {
assertTrue(batch2Exists, "test message body 2");
assertTrue(batch3Exists, "test message body 3");
assertTrue(batch4Exists, "test message body 4");

//test bytes
final List<byte[]> inputBatch2 = new LinkedList<>();
byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
inputBatch2.add(byteBody1);
inputBatch2.add(byteBody2);

template.send("direct:sendBatchMessages", exchange -> {
exchange.getIn().setBody(inputBatch2);
});

Thread.sleep(1000);

// let's check our data
final Spliterator<ServiceBusReceivedMessage> receivedMessages2
= receiverAsyncClient.receiveMessages().toIterable().spliterator();
final boolean byteBody1Exists = StreamSupport.stream(receivedMessages2, false)
.anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody1));
final boolean byteBody2Exists = StreamSupport.stream(receivedMessages2, false)
.anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody2));

assertTrue(byteBody1Exists, "test byte body 1");
assertTrue(byteBody2Exists, "test byte body 2");
}

@Test
Expand All @@ -123,6 +171,23 @@ void testScheduleMessage() throws InterruptedException {
.anyMatch(serviceBusReceivedMessage -> serviceBusReceivedMessage.getBody().toString().equals("test message"));

assertTrue(batch1Exists);

//test bytes
byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
template.send("direct:scheduleMessage", exchange -> {
exchange.getIn().setHeader(ServiceBusConstants.SCHEDULED_ENQUEUE_TIME, OffsetDateTime.now());
exchange.getIn().setBody(testByteBody);
});

Thread.sleep(1000);

final Spliterator<ServiceBusReceivedMessage> receivedMessages2
= receiverAsyncClient.receiveMessages().toIterable().spliterator();

final boolean batch2Exists = StreamSupport.stream(receivedMessages2, false)
.anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), testByteBody));

assertTrue(batch2Exists);
}

@Test
Expand All @@ -132,7 +197,7 @@ public void testScheduleBatchMessages() throws InterruptedException {
inputBatch.add("test batch 1");
inputBatch.add("test batch 2");
inputBatch.add("test batch 3");
inputBatch.add(123456);
inputBatch.add("123456");

exchange.getIn().setHeader(ServiceBusConstants.SCHEDULED_ENQUEUE_TIME, OffsetDateTime.now());
exchange.getIn().setBody(inputBatch);
Expand Down Expand Up @@ -160,6 +225,33 @@ public void testScheduleBatchMessages() throws InterruptedException {
assertTrue(batch2Exists, "test message body 2");
assertTrue(batch3Exists, "test message body 3");
assertTrue(batch4Exists, "test message body 4");

//test bytes
final List<byte[]> inputBatch2 = new LinkedList<>();
byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
inputBatch2.add(byteBody1);
inputBatch2.add(byteBody2);

template.send("direct:scheduleBatchMessages", exchange -> {
exchange.getIn().setHeader(ServiceBusConstants.SCHEDULED_ENQUEUE_TIME, OffsetDateTime.now());
exchange.getIn().setBody(inputBatch2);
});

Thread.sleep(1000);

// let's check our data
final Spliterator<ServiceBusReceivedMessage> receivedMessages2
= receiverAsyncClient.receiveMessages().toIterable().spliterator();

final boolean byteBody1Exists = StreamSupport.stream(receivedMessages2, false)
.anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody1));

final boolean byteBody2Exists = StreamSupport.stream(receivedMessages2, false)
.anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody2));

assertTrue(byteBody1Exists, "test byte message body 1");
assertTrue(byteBody2Exists, "test byte message body 2");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.camel.component.azure.servicebus.integration.operations;

import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.*;
import java.util.stream.StreamSupport;
Expand All @@ -37,6 +38,7 @@
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertTrue;

@EnabledIfSystemProperty(named = "connectionString", matches = ".*",
disabledReason = "Make sure to supply azure servicebus connectionString, e.g: mvn verify -DconnectionString=string")
Expand Down Expand Up @@ -93,6 +95,13 @@ void testSendSingleMessage() {

assertTrue(exists, "test message body");

//test bytes
byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
operations.sendMessages(testByteBody, null, Map.of("customKey", "customValue")).block();
final boolean exists2 = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
.anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), testByteBody));
assertTrue(exists2, "test byte body");

// test if we have something other than string or byte[]
assertThrows(IllegalArgumentException.class, () -> {
operations.sendMessages(12345, null, null).block();
Expand Down Expand Up @@ -125,6 +134,26 @@ void testSendingBatchMessages() {
assertTrue(batch1Exists, "test message body 1");
assertTrue(batch2Exists, "test message body 2");
assertTrue(batch3Exists, "test message body 3");

//test bytes
final List<byte[]> inputBatch2 = new LinkedList<>();
byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
inputBatch2.add(byteBody1);
inputBatch2.add(byteBody2);

operations.sendMessages(inputBatch2, null, null).block();
final Spliterator<ServiceBusReceivedMessage> receivedMessages2
= clientReceiverWrapper.receiveMessages().toIterable().spliterator();

final boolean byteBody1Exists = StreamSupport.stream(receivedMessages2, false)
.anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody1));
final boolean byteBody2Exists = StreamSupport.stream(receivedMessages2, false)
.anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody2));

assertTrue(byteBody1Exists, "test byte body 1");
assertTrue(byteBody2Exists, "test byte body 2");

}

@Test
Expand All @@ -139,6 +168,13 @@ void testScheduleMessage() {

assertTrue(exists, "test message body");

//test bytes
byte[] testByteBody = "test data".getBytes(StandardCharsets.UTF_8);
operations.scheduleMessages(testByteBody, OffsetDateTime.now(), null, null).block();
final boolean exists2 = StreamSupport.stream(clientReceiverWrapper.receiveMessages().toIterable().spliterator(), false)
.anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), testByteBody));
assertTrue(exists2, "test byte body");

// test if we have something other than string or byte[]
assertThrows(IllegalArgumentException.class, () -> {
operations.scheduleMessages(12345, OffsetDateTime.now(), null, null).block();
Expand Down Expand Up @@ -174,5 +210,24 @@ void testSchedulingBatchMessages() {
assertTrue(batch1Exists, "test message body 1");
assertTrue(batch2Exists, "test message body 2");
assertTrue(batch3Exists, "test message body 3");

//test bytes
final List<byte[]> inputBatch2 = new LinkedList<>();
byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
inputBatch2.add(byteBody1);
inputBatch2.add(byteBody2);

operations.scheduleMessages(inputBatch2, OffsetDateTime.now(), null, null).block();
final Spliterator<ServiceBusReceivedMessage> receivedMessages2
= clientReceiverWrapper.receiveMessages().toIterable().spliterator();

final boolean byteBody1Exists = StreamSupport.stream(receivedMessages2, false)
.anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody1));
final boolean byteBody2Exists = StreamSupport.stream(receivedMessages2, false)
.anyMatch(serviceBusReceivedMessage -> Arrays.equals(serviceBusReceivedMessage.getBody().toBytes(), byteBody2));

assertTrue(byteBody1Exists, "test byte body 1");
assertTrue(byteBody2Exists, "test byte body 2");
}
}
Loading