Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding possibility to send binary messages to Azure Service Bus #11838

Merged
merged 6 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "amqpTransportType": getOrCreateConfiguration(target).setAmqpTransportType(property(camelContext, com.azure.core.amqp.AmqpTransportType.class, value)); return true;
case "autowiredenabled":
case "autowiredEnabled": target.setAutowiredEnabled(property(camelContext, boolean.class, value)); return true;
case "binary": getOrCreateConfiguration(target).setBinary(property(camelContext, boolean.class, value)); return true;
case "bridgeerrorhandler":
case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
case "clientoptions":
Expand Down Expand Up @@ -95,6 +96,7 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "amqpTransportType": return com.azure.core.amqp.AmqpTransportType.class;
case "autowiredenabled":
case "autowiredEnabled": return boolean.class;
case "binary": return boolean.class;
case "bridgeerrorhandler":
case "bridgeErrorHandler": return boolean.class;
case "clientoptions":
Expand Down Expand Up @@ -152,6 +154,7 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "amqpTransportType": return getOrCreateConfiguration(target).getAmqpTransportType();
case "autowiredenabled":
case "autowiredEnabled": return target.isAutowiredEnabled();
case "binary": return getOrCreateConfiguration(target).isBinary();
case "bridgeerrorhandler":
case "bridgeErrorHandler": return target.isBridgeErrorHandler();
case "clientoptions":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "amqpRetryOptions": target.getConfiguration().setAmqpRetryOptions(property(camelContext, com.azure.core.amqp.AmqpRetryOptions.class, value)); return true;
case "amqptransporttype":
case "amqpTransportType": target.getConfiguration().setAmqpTransportType(property(camelContext, com.azure.core.amqp.AmqpTransportType.class, value)); return true;
case "binary": target.getConfiguration().setBinary(property(camelContext, boolean.class, value)); return true;
case "bridgeerrorhandler":
case "bridgeErrorHandler": target.setBridgeErrorHandler(property(camelContext, boolean.class, value)); return true;
case "clientoptions":
Expand Down Expand Up @@ -87,6 +88,7 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "amqpRetryOptions": return com.azure.core.amqp.AmqpRetryOptions.class;
case "amqptransporttype":
case "amqpTransportType": return com.azure.core.amqp.AmqpTransportType.class;
case "binary": return boolean.class;
case "bridgeerrorhandler":
case "bridgeErrorHandler": return boolean.class;
case "clientoptions":
Expand Down Expand Up @@ -145,6 +147,7 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "amqpRetryOptions": return target.getConfiguration().getAmqpRetryOptions();
case "amqptransporttype":
case "amqpTransportType": return target.getConfiguration().getAmqpTransportType();
case "binary": return target.getConfiguration().isBinary();
case "bridgeerrorhandler":
case "bridgeErrorHandler": return target.isBridgeErrorHandler();
case "clientoptions":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ public class ServiceBusEndpointUriFactory extends org.apache.camel.support.compo
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Set<String> MULTI_VALUE_PREFIXES;
static {
Set<String> props = new HashSet<>(26);
Set<String> props = new HashSet<>(27);
props.add("amqpRetryOptions");
props.add("amqpTransportType");
props.add("binary");
props.add("bridgeErrorHandler");
props.add("clientOptions");
props.add("connectionString");
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ include::partial$component-endpoint-headers.adoc[]
// component headers: END

=== Message Body
In the producer, this component accepts message body of `String` type or `List<String>` to send batch messages.
In the producer, this component accepts message body of `String`, `byte[]` and `BinaryData` types or `List<String>`, `List<byte[]>` and `List<BinaryData>` to send batch messages.

In the consumer, the returned message body will be of type `String.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public class ServiceBusConfiguration implements Cloneable {
private ServiceBusTransactionContext serviceBusTransactionContext;
@UriParam(label = "producer")
private OffsetDateTime scheduledEnqueueTime;
@UriParam(label = "producer", defaultValue = "false")
private boolean binary;

/**
* Selected topic name or the queue name, that is depending on serviceBusType config. For example if
Expand Down Expand Up @@ -340,6 +342,17 @@ public void setPeekNumMaxMessages(Integer peekNumMaxMessages) {
this.peekNumMaxMessages = peekNumMaxMessages;
}

/**
* Set binary mode. If true, message body will be sent as byte[]. By default, it is false.
*/
public boolean isBinary() {
return binary;
}

public void setBinary(boolean binary) {
this.binary = binary;
}

// *************************************************
//
// *************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,23 @@
*/
package org.apache.camel.component.azure.servicebus;

import java.io.File;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

import com.azure.core.util.BinaryData;
import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.azure.servicebus.client.ServiceBusClientFactory;
import org.apache.camel.component.azure.servicebus.client.ServiceBusSenderAsyncClientWrapper;
import org.apache.camel.component.azure.servicebus.operations.ServiceBusSenderOperations;
Expand Down Expand Up @@ -147,12 +152,16 @@ private BiConsumer<Exchange, AsyncCallback> sendMessages() {

Mono<Void> sendMessageAsync;

if (exchange.getMessage().getBody() instanceof Iterable) {
if (inputBody instanceof Iterable<?>) {
sendMessageAsync
= serviceBusSenderOperations.sendMessages(convertBodyToList((Iterable<Object>) inputBody),
= serviceBusSenderOperations.sendMessages(convertBodyToList((Iterable<?>) inputBody),
configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties);
} else {
sendMessageAsync = serviceBusSenderOperations.sendMessages(exchange.getMessage().getBody(String.class),
Object convertedBody = inputBody instanceof BinaryData ? inputBody
: getConfiguration().isBinary() ? convertBodyToBinary(exchange)
: exchange.getMessage().getBody(String.class);

sendMessageAsync = serviceBusSenderOperations.sendMessages(convertedBody,
configurationOptionsProxy.getServiceBusTransactionContext(exchange), applicationProperties);
}

Expand All @@ -170,15 +179,18 @@ private BiConsumer<Exchange, AsyncCallback> scheduleMessages() {

Mono<List<Long>> scheduleMessagesAsync;

if (exchange.getMessage().getBody() instanceof Iterable) {
if (inputBody instanceof Iterable<?>) {
scheduleMessagesAsync
= serviceBusSenderOperations.scheduleMessages(convertBodyToList((Iterable<Object>) inputBody),
= serviceBusSenderOperations.scheduleMessages(convertBodyToList((Iterable<?>) inputBody),
configurationOptionsProxy.getScheduledEnqueueTime(exchange),
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
applicationProperties);
} else {
Object convertedBody = inputBody instanceof BinaryData ? inputBody
: getConfiguration().isBinary() ? convertBodyToBinary(exchange)
: exchange.getMessage().getBody(String.class);
scheduleMessagesAsync
= serviceBusSenderOperations.scheduleMessages(exchange.getMessage().getBody(String.class),
= serviceBusSenderOperations.scheduleMessages(convertedBody,
configurationOptionsProxy.getScheduledEnqueueTime(exchange),
configurationOptionsProxy.getServiceBusTransactionContext(exchange),
applicationProperties);
Expand All @@ -189,12 +201,44 @@ private BiConsumer<Exchange, AsyncCallback> scheduleMessages() {
};
}

private List<String> convertBodyToList(final Iterable<Object> inputBody) {
private List<?> convertBodyToList(final Iterable<?> inputBody) {
return StreamSupport.stream(inputBody.spliterator(), false)
.map(body -> getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, body))
.map(this::convertMessageBody)
.toList();
}

private Object convertBodyToBinary(Exchange exchange) {
Object body = exchange.getMessage().getBody();
if (body instanceof InputStream) {
return BinaryData.fromStream((InputStream) body);
} else if (body instanceof Path) {
return BinaryData.fromFile((Path) body);
} else if (body instanceof File) {
return BinaryData.fromFile(((File) body).toPath());
} else {
return BinaryData.fromBytes(exchange.getMessage().getBody(byte[].class));
}
}

private Object convertMessageBody(Object inputBody) {
TypeConverter typeConverter = getEndpoint().getCamelContext().getTypeConverter();
if (inputBody instanceof BinaryData) {
return inputBody;
} else if (getConfiguration().isBinary()) {
if (inputBody instanceof InputStream) {
return BinaryData.fromStream((InputStream) inputBody);
} else if (inputBody instanceof Path) {
return BinaryData.fromFile((Path) inputBody);
} else if (inputBody instanceof File) {
return BinaryData.fromFile(((File) inputBody).toPath());
} else {
return typeConverter.convertTo(byte[].class, inputBody);
}
} else {
return typeConverter.convertTo(String.class, inputBody);
}
}

private <T> void subscribeToMono(
final Mono<T> inputMono, final Exchange exchange, final Consumer<T> resultsCallback, final AsyncCallback callback) {
inputMono
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static ServiceBusMessage createServiceBusMessage(
}

public static Iterable<ServiceBusMessage> createServiceBusMessages(
final Iterable<Object> data, final Map<String, Object> applicationProperties) {
final Iterable<?> data, final Map<String, Object> applicationProperties) {
return StreamSupport.stream(data.spliterator(), false)
.map(obj -> createServiceBusMessage(obj, applicationProperties))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public Mono<Void> sendMessages(
final Object data,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties) {
if (data instanceof Iterable) {
return sendMessages((Iterable<Object>) data, context, applicationProperties);
if (data instanceof Iterable<?>) {
return sendMessages((Iterable<?>) data, context, applicationProperties);
}

return sendMessage(data, context, applicationProperties);
Expand All @@ -58,15 +58,15 @@ public Mono<List<Long>> scheduleMessages(
throw new IllegalArgumentException("To schedule a message, you need to set scheduledEnqueueTime.");
}

if (data instanceof Iterable) {
return scheduleMessages((Iterable<Object>) data, scheduledEnqueueTime, context, applicationProperties);
if (data instanceof Iterable<?>) {
return scheduleMessages((Iterable<?>) data, scheduledEnqueueTime, context, applicationProperties);
}

return scheduleMessage(data, scheduledEnqueueTime, context, applicationProperties);
}

private Mono<Void> sendMessages(
final Iterable<Object> data,
final Iterable<?> data,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties) {
final Iterable<ServiceBusMessage> messages = ServiceBusUtils.createServiceBusMessages(data, applicationProperties);
Expand Down Expand Up @@ -108,7 +108,7 @@ private Mono<List<Long>> scheduleMessage(
}

private Mono<List<Long>> scheduleMessages(
final Iterable<Object> data, final OffsetDateTime scheduledEnqueueTime,
final Iterable<?> data, final OffsetDateTime scheduledEnqueueTime,
final ServiceBusTransactionContext context,
final Map<String, Object> applicationProperties) {
final Iterable<ServiceBusMessage> messages = ServiceBusUtils.createServiceBusMessages(data, applicationProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ void testCreateEndpointWithConfig() throws Exception {
params.put("serviceBusType", ServiceBusType.topic);
params.put("prefetchCount", 10);
params.put("connectionString", "testString");
params.put("binary", "true");

final ServiceBusEndpoint endpoint
= (ServiceBusEndpoint) context.getComponent("azure-servicebus", ServiceBusComponent.class)
Expand All @@ -61,6 +62,7 @@ void testCreateEndpointWithConfig() throws Exception {
assertEquals("testTopicOrQueue", endpoint.getConfiguration().getTopicOrQueueName());
assertEquals(10, endpoint.getConfiguration().getPrefetchCount());
assertEquals("testString", endpoint.getConfiguration().getConnectionString());
assertEquals(true, endpoint.getConfiguration().isBinary());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.camel.component.azure.servicebus;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.StreamSupport;
Expand All @@ -38,11 +40,16 @@ void testCreateServiceBusMessage() {
final ServiceBusMessage message2 = ServiceBusUtils.createServiceBusMessage(String.valueOf(12345), null);

assertEquals("12345", message2.getBody().toString());

//test bytes
byte[] testByteBody = "test string".getBytes(StandardCharsets.UTF_8);
final ServiceBusMessage message3 = ServiceBusUtils.createServiceBusMessage(testByteBody, null);
assertArrayEquals(testByteBody, message3.getBody().toBytes());
}

@Test
void testCreateServiceBusMessages() {
final List<Object> inputMessages = new LinkedList<>();
final List<String> inputMessages = new LinkedList<>();
inputMessages.add("test data");
inputMessages.add(String.valueOf(12345));

Expand All @@ -52,5 +59,19 @@ void testCreateServiceBusMessages() {
.anyMatch(record -> record.getBody().toString().equals("test data")));
assertTrue(StreamSupport.stream(busMessages.spliterator(), false)
.anyMatch(record -> record.getBody().toString().equals("12345")));

//Test bytes
final List<byte[]> inputMessages2 = new LinkedList<>();
byte[] byteBody1 = "test data".getBytes(StandardCharsets.UTF_8);
byte[] byteBody2 = "test data2".getBytes(StandardCharsets.UTF_8);
inputMessages2.add(byteBody1);
inputMessages2.add(byteBody2);

final Iterable<ServiceBusMessage> busMessages2 = ServiceBusUtils.createServiceBusMessages(inputMessages2, null);

assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
.anyMatch(message -> Arrays.equals(message.getBody().toBytes(), byteBody1)));
assertTrue(StreamSupport.stream(busMessages2.spliterator(), false)
.anyMatch(message -> Arrays.equals(message.getBody().toBytes(), byteBody2)));
}
}
Loading