diff --git a/aeron-agent/src/main/java/io/aeron/agent/DriverEventDissector.java b/aeron-agent/src/main/java/io/aeron/agent/DriverEventDissector.java index 658c504e8a..6c1fad6def 100644 --- a/aeron-agent/src/main/java/io/aeron/agent/DriverEventDissector.java +++ b/aeron-agent/src/main/java/io/aeron/agent/DriverEventDissector.java @@ -39,6 +39,7 @@ final class DriverEventDissector private static final RttMeasurementFlyweight RTT_MEASUREMENT = new RttMeasurementFlyweight(); private static final HeaderFlyweight HEADER = new HeaderFlyweight(); private static final ResolutionEntryFlyweight RESOLUTION = new ResolutionEntryFlyweight(); + private static final ResponseSetupFlyweight RSP_SETUP = new ResponseSetupFlyweight(); private static final PublicationMessageFlyweight PUB_MSG = new PublicationMessageFlyweight(); private static final SubscriptionMessageFlyweight SUB_MSG = new SubscriptionMessageFlyweight(); private static final PublicationBuffersReadyFlyweight PUB_READY = new PublicationBuffersReadyFlyweight(); @@ -109,6 +110,11 @@ static void dissectFrame( dissectResFrame(buffer, frameOffset, builder); break; + case HeaderFlyweight.HDR_TYPE_RSP_SETUP: + RSP_SETUP.wrap(buffer, frameOffset, buffer.capacity() - frameOffset); + dissectRspSetupFrame(builder); + break; + default: builder.append("FRAME_UNKNOWN: ").append(frameType); break; @@ -540,6 +546,22 @@ private static void dissectResFrame( } } + private static void dissectRspSetupFrame(final StringBuilder builder) + { + builder.append("RSP_SETUP "); + HeaderFlyweight.appendFlagsAsChars(RSP_SETUP.flags(), builder); + + builder + .append(" len ") + .append(RSP_SETUP.frameLength()) + .append(' ') + .append(RSP_SETUP.sessionId()) + .append(':') + .append(RSP_SETUP.streamId()) + .append(" RSP_SESSION_ID ") + .append(RSP_SETUP.responseSessionId()); + } + private static void dissectResEntry(final StringBuilder builder) { builder diff --git a/aeron-client/src/main/c/protocol/aeron_udp_protocol.h b/aeron-client/src/main/c/protocol/aeron_udp_protocol.h index b0736d3a6c..dda7ea28be 100644 --- a/aeron-client/src/main/c/protocol/aeron_udp_protocol.h +++ b/aeron-client/src/main/c/protocol/aeron_udp_protocol.h @@ -134,6 +134,15 @@ typedef struct aeron_option_header_stct uint16_t type; } aeron_option_header_t; + +typedef struct aeron_response_setup_header_stct +{ + aeron_frame_header_t frame_header; + int32_t session_id; + int32_t stream_id; + int32_t response_session_id; +} +aeron_response_setup_header_t; #pragma pack(pop) int aeron_udp_protocol_group_tag(aeron_status_message_header_t *sm, int64_t *group_tag); @@ -151,6 +160,7 @@ int aeron_udp_protocol_group_tag(aeron_status_message_header_t *sm, int64_t *gro #define AERON_HDR_TYPE_ATS_DATA (0x08) #define AERON_HDR_TYPE_ATS_SETUP (0x09) #define AERON_HDR_TYPE_ATS_SM (0x0A) +#define AERON_HDR_TYPE_RSP_SETUP (0x0B) #define AERON_HDR_TYPE_EXT (0xFFFF) #define AERON_DATA_HEADER_LENGTH (sizeof(aeron_data_header_t)) @@ -166,6 +176,8 @@ int aeron_udp_protocol_group_tag(aeron_status_message_header_t *sm, int64_t *gro #define AERON_STATUS_MESSAGE_HEADER_SEND_SETUP_FLAG (UINT8_C(0x80)) #define AERON_STATUS_MESSAGE_HEADER_EOS_FLAG (UINT8_C(0x40)) +#define AERON_SETUP_HEADER_SEND_RESPONSE_FLAG (UINT8_C(0x80)) + #define AERON_RTTM_HEADER_REPLY_FLAG (UINT8_C(0x80)) #define AERON_RES_HEADER_TYPE_NAME_TO_IP4_MD (0x01) diff --git a/aeron-client/src/main/c/uri/aeron_uri.c b/aeron-client/src/main/c/uri/aeron_uri.c index b1f119280e..3c6ed614fb 100755 --- a/aeron-client/src/main/c/uri/aeron_uri.c +++ b/aeron-client/src/main/c/uri/aeron_uri.c @@ -362,12 +362,12 @@ int aeron_uri_get_int32(aeron_uri_params_t *uri_params, const char *key, int32_t return 1; } -int aeron_uri_get_int64(aeron_uri_params_t *uri_params, const char *key, int64_t *retval) +int aeron_uri_get_int64(aeron_uri_params_t *uri_params, const char *key, int64_t default_val, int64_t *retval) { const char *value_str; if ((value_str = aeron_uri_find_param_value(uri_params, key)) == NULL) { - *retval = 0; + *retval = default_val; return 0; } diff --git a/aeron-client/src/main/c/uri/aeron_uri.h b/aeron-client/src/main/c/uri/aeron_uri.h index 7b2e6ed617..f9eeb75260 100644 --- a/aeron-client/src/main/c/uri/aeron_uri.h +++ b/aeron-client/src/main/c/uri/aeron_uri.h @@ -43,6 +43,7 @@ aeron_uri_params_t; #define AERON_UDP_CHANNEL_CONTROL_MODE_KEY "control-mode" #define AERON_UDP_CHANNEL_CONTROL_MODE_MANUAL_VALUE "manual" #define AERON_UDP_CHANNEL_CONTROL_MODE_DYNAMIC_VALUE "dynamic" +#define AERON_UDP_CHANNEL_CONTROL_MODE_RESPONSE_VALUE "response" #define AERON_URI_INITIAL_TERM_ID_KEY "init-term-id" #define AERON_URI_TERM_ID_KEY "term-id" @@ -70,6 +71,7 @@ aeron_uri_params_t; #define AERON_URI_CHANNEL_RCV_TIMESTAMP_OFFSET_KEY "channel-rcv-ts-offset" #define AERON_URI_CHANNEL_SND_TIMESTAMP_OFFSET_KEY "channel-snd-ts-offset" #define AERON_URI_TIMESTAMP_OFFSET_RESERVED "reserved" +#define AERON_URI_RESPONSE_CORRELATION_ID_KEY "response-correlation-id" #define AERON_URI_INVALID_TAG (-1) typedef struct aeron_udp_channel_params_stct @@ -136,7 +138,7 @@ uint8_t aeron_uri_multicast_ttl(aeron_uri_t *uri); const char *aeron_uri_find_param_value(const aeron_uri_params_t *uri_params, const char *key); int aeron_uri_get_int32(aeron_uri_params_t *uri_params, const char *key, int32_t *retval); -int aeron_uri_get_int64(aeron_uri_params_t *uri_params, const char *key, int64_t *retval); +int aeron_uri_get_int64(aeron_uri_params_t *uri_params, const char *key, int64_t default_val, int64_t *retval); int aeron_uri_get_bool(aeron_uri_params_t *uri_params, const char *key, bool *retval); int aeron_uri_get_ats(aeron_uri_params_t *uri_params, aeron_uri_ats_status_t *uri_ats_status); int aeron_uri_sprint(aeron_uri_t *uri, char *buffer, size_t buffer_len); diff --git a/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java b/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java index 3a235a0f6b..c9a17e0ee0 100644 --- a/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java +++ b/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java @@ -74,6 +74,10 @@ public final class ChannelUriStringBuilder private String mediaReceiveTimestampOffset; private String channelReceiveTimestampOffset; private String channelSendTimestampOffset; + private String responseEndpoint; + private Long responseCorrelationId; + private Boolean isResponseChannel; + private Long responseSubscriptionId; /** * Default constructor @@ -134,6 +138,8 @@ public ChannelUriStringBuilder(final ChannelUri channelUri) mediaReceiveTimestampOffset(channelUri); channelReceiveTimestampOffset(channelUri); channelSendTimestampOffset(channelUri); + responseEndpoint(channelUri); + responseCorrelationId(channelUri); } /** @@ -175,6 +181,10 @@ public ChannelUriStringBuilder clear() mediaReceiveTimestampOffset = null; channelReceiveTimestampOffset = null; channelSendTimestampOffset = null; + responseEndpoint = null; + responseCorrelationId = null; + isResponseChannel = null; + responseSubscriptionId = null; return this; } @@ -428,8 +438,9 @@ public String controlEndpoint() public ChannelUriStringBuilder controlMode(final String controlMode) { if (null != controlMode && - !controlMode.equals(CommonContext.MDC_CONTROL_MODE_MANUAL) && - !controlMode.equals(CommonContext.MDC_CONTROL_MODE_DYNAMIC)) + !controlMode.equals(MDC_CONTROL_MODE_MANUAL) && + !controlMode.equals(MDC_CONTROL_MODE_DYNAMIC) && + !controlMode.equals(CONTROL_MODE_RESPONSE)) { throw new IllegalArgumentException("invalid control mode: " + controlMode); } @@ -1891,6 +1902,80 @@ public String channelSendTimestampOffset() return channelSendTimestampOffset; } + /** + * Set the response endpoint to be used for a response channel subscription or publication. + * + * @param responseEndpoint response endpoint to be used in this channel URI. + * @return this for a fluent API. + * @see CommonContext#RESPONSE_ENDPOINT_PARAM_NAME + */ + public ChannelUriStringBuilder responseEndpoint(final String responseEndpoint) + { + this.responseEndpoint = responseEndpoint; + return this; + } + + /** + * Set the response endpoint to be used for a response channel subscription or publication by extracting it from the + * ChannelUri. + * + * @param channelUri the existing URI to extract the responseEndpoint from. + * @return this for a fluent API. + */ + public ChannelUriStringBuilder responseEndpoint(final ChannelUri channelUri) + { + return responseEndpoint(channelUri.get(RESPONSE_ENDPOINT_PARAM_NAME)); + } + + /** + * The response endpoint to be used for a response channel subscription or publication. + * + * @return response endpoint. + */ + public String responseEndpoint() + { + return this.responseEndpoint; + } + + /** + * Set the correlation id from the image received on the response "server's" subscription to be used by a response + * publication. + * + * @param responseCorrelationId correlation id of an image from the response "server's" subscription. + * @return this for a fluent API. + */ + public ChannelUriStringBuilder responseCorrelationId(final Long responseCorrelationId) + { + this.responseCorrelationId = responseCorrelationId; + return this; + } + + /** + * Set the correlation id from the image received on the response "server's" subscription to be used by a response + * publication extracted from the channelUri. + * + * @param channelUri the existing URI to extract the responseCorrelationId from. + * @return this for a fluent API. + */ + public ChannelUriStringBuilder responseCorrelationId(final ChannelUri channelUri) + { + final String responseCorrelationIdString = channelUri.get(RESPONSE_CORRELATION_ID_PARAM_NAME); + + if (null != responseCorrelationIdString) + { + try + { + responseCorrelationId(Long.valueOf(responseCorrelationIdString)); + } + catch (final NumberFormatException ex) + { + throw new IllegalArgumentException("'response-correlation-id' must be a valid long value", ex); + } + } + + return this; + } + /** * Offset into a message to store the channel send timestamp. May also be the special value 'reserved' which means * to store the timestamp in the reserved value field. @@ -1936,7 +2021,7 @@ public ChannelUriStringBuilder channelSendTimestampOffset(final ChannelUri chann * * @return a channel URI String for the given parameters. */ - @SuppressWarnings("MethodLength") + @SuppressWarnings({ "MethodLength", "DuplicatedCode" }) public String build() { sb.setLength(0); @@ -1983,6 +2068,8 @@ public String build() appendParameter(sb, MEDIA_RCV_TIMESTAMP_OFFSET_PARAM_NAME, mediaReceiveTimestampOffset); appendParameter(sb, CHANNEL_RECEIVE_TIMESTAMP_OFFSET_PARAM_NAME, channelReceiveTimestampOffset); appendParameter(sb, CHANNEL_SEND_TIMESTAMP_OFFSET_PARAM_NAME, channelSendTimestampOffset); + appendParameter(sb, RESPONSE_ENDPOINT_PARAM_NAME, responseEndpoint); + appendParameter(sb, RESPONSE_CORRELATION_ID_PARAM_NAME, responseCorrelationId); final char lastChar = sb.charAt(sb.length() - 1); if (lastChar == '|' || lastChar == '?') diff --git a/aeron-client/src/main/java/io/aeron/CommonContext.java b/aeron-client/src/main/java/io/aeron/CommonContext.java index c2b6b4a0f6..4bb47dad16 100644 --- a/aeron-client/src/main/java/io/aeron/CommonContext.java +++ b/aeron-client/src/main/java/io/aeron/CommonContext.java @@ -219,6 +219,11 @@ public static InferableBoolean parse(final String value) */ public static final String MDC_CONTROL_MODE_DYNAMIC = "dynamic"; + /** + * Valid value for {@link #MDC_CONTROL_MODE_PARAM_NAME} when response control is desired. + */ + public static final String CONTROL_MODE_RESPONSE = "response"; + /** * Key for the session id for a publication or restricted subscription. */ @@ -352,6 +357,18 @@ public static InferableBoolean parse(final String value) */ public static final String RESERVED_OFFSET = "reserved"; + /** + * Parameter name for the field that will be used to specify the response endpoint on a subscription and publication + * used in a response "server". + */ + public static final String RESPONSE_ENDPOINT_PARAM_NAME = "response-endpoint"; + + /** + * Parameter name for the field that will be used to specify the correlation id used on a publication to connect it + * to a subscription's image in order to set up a response stream. + */ + public static final String RESPONSE_CORRELATION_ID_PARAM_NAME = "response-correlation-id"; + /** * Property name for a fallback {@link PrintStream} based logger when it is not possible to use the error logging * callback. Supported values are stdout, stderr, no_op (stderr is the default). diff --git a/aeron-client/src/main/java/io/aeron/protocol/HeaderFlyweight.java b/aeron-client/src/main/java/io/aeron/protocol/HeaderFlyweight.java index bd0b620899..48195109e5 100644 --- a/aeron-client/src/main/java/io/aeron/protocol/HeaderFlyweight.java +++ b/aeron-client/src/main/java/io/aeron/protocol/HeaderFlyweight.java @@ -79,6 +79,26 @@ public class HeaderFlyweight extends UnsafeBuffer */ public static final int HDR_TYPE_RES = 0x07; + /** + * header type ATS Data + */ + public static final int HDR_TYPE_ATS_DATA = 0x08; + + /** + * header type ATS Status Message + */ + public static final int HDR_TYPE_ATS_SM = 0x09; + + /** + * header type ATS Setup + */ + public static final int HDR_TYPE_ATS_SETUP = 0x0A; + + /** + * header type Response Setup + */ + public static final int HDR_TYPE_RSP_SETUP = 0x0B; + /** * header type EXT */ diff --git a/aeron-client/src/main/java/io/aeron/protocol/ResponseSetupFlyweight.java b/aeron-client/src/main/java/io/aeron/protocol/ResponseSetupFlyweight.java new file mode 100644 index 0000000000..c819d7cdf9 --- /dev/null +++ b/aeron-client/src/main/java/io/aeron/protocol/ResponseSetupFlyweight.java @@ -0,0 +1,144 @@ +/* + * Copyright 2014-2023 Real Logic Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.aeron.protocol; + +import org.agrona.concurrent.UnsafeBuffer; + +import java.nio.ByteBuffer; + +import static java.nio.ByteOrder.LITTLE_ENDIAN; + +/** + * HeaderFlyweight for Response Setup Message Frames. + *
+ *
+ * Stream Response Setup wiki page.
+ */
+public class ResponseSetupFlyweight extends HeaderFlyweight
+{
+ public static final int HEADER_LENGTH = 20;
+
+ /**
+ * Offset in the frame at which the session-id field begins.
+ */
+ private static final int SESSION_ID_FIELD_OFFSET = 8;
+
+ /**
+ * Offset in the frame at which the stream-id field begins.
+ */
+ private static final int STREAM_ID_FIELD_OFFSET = 12;
+
+ /**
+ * Offset in the frame at which the response session-id field begins.
+ */
+ private static final int RESPONSE_SESSION_ID_FIELD_OFFSET = 16;
+
+ /**
+ * Default constructor which can later be used to wrap a frame.
+ */
+ public ResponseSetupFlyweight()
+ {
+ }
+
+ /**
+ * Construct the flyweight over a frame.
+ *
+ * @param buffer containing the frame.
+ */
+ public ResponseSetupFlyweight(final UnsafeBuffer buffer)
+ {
+ super(buffer);
+ }
+
+ /**
+ * Construct the flyweight over a frame.
+ *
+ * @param buffer containing the frame.
+ */
+ public ResponseSetupFlyweight(final ByteBuffer buffer)
+ {
+ super(buffer);
+ }
+
+ /**
+ * Get session id field.
+ *
+ * @return session id field.
+ */
+ public int sessionId()
+ {
+ return getInt(SESSION_ID_FIELD_OFFSET, LITTLE_ENDIAN);
+ }
+
+ /**
+ * Set session id field.
+ *
+ * @param sessionId field value.
+ * @return this for a fluent API.
+ */
+ public ResponseSetupFlyweight sessionId(final int sessionId)
+ {
+ putInt(SESSION_ID_FIELD_OFFSET, sessionId, LITTLE_ENDIAN);
+
+ return this;
+ }
+
+ /**
+ * Get stream id field.
+ *
+ * @return stream id field.
+ */
+ public int streamId()
+ {
+ return getInt(STREAM_ID_FIELD_OFFSET, LITTLE_ENDIAN);
+ }
+
+ /**
+ * Set stream id field.
+ *
+ * @param streamId field value.
+ * @return this for a fluent API.
+ */
+ public ResponseSetupFlyweight streamId(final int streamId)
+ {
+ putInt(STREAM_ID_FIELD_OFFSET, streamId, LITTLE_ENDIAN);
+
+ return this;
+ }
+
+ /**
+ * Get response session id field.
+ *
+ * @return response session id field.
+ */
+ public int responseSessionId()
+ {
+ return getInt(RESPONSE_SESSION_ID_FIELD_OFFSET, LITTLE_ENDIAN);
+ }
+
+ /**
+ * Set stream id field.
+ *
+ * @param streamId field value.
+ * @return this for a fluent API.
+ */
+ public ResponseSetupFlyweight responseSessionId(final int streamId)
+ {
+ putInt(RESPONSE_SESSION_ID_FIELD_OFFSET, streamId, LITTLE_ENDIAN);
+
+ return this;
+ }
+}
diff --git a/aeron-client/src/main/java/io/aeron/protocol/SetupFlyweight.java b/aeron-client/src/main/java/io/aeron/protocol/SetupFlyweight.java
index 7ceb9da6b5..3dcce3817b 100644
--- a/aeron-client/src/main/java/io/aeron/protocol/SetupFlyweight.java
+++ b/aeron-client/src/main/java/io/aeron/protocol/SetupFlyweight.java
@@ -34,6 +34,11 @@ public class SetupFlyweight extends HeaderFlyweight
*/
public static final int HEADER_LENGTH = 40;
+ /**
+ * Subscriber should send response channel information in SM.
+ */
+ public static final short SEND_RESPONSE_SETUP_FLAG = 0x80;
+
/**
* Offset in the frame at which the term-offset field begins.
*/
diff --git a/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java b/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java
index 168e8ba5ea..f425caa9ad 100644
--- a/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java
+++ b/aeron-client/src/test/java/io/aeron/ChannelUriStringBuilderTest.java
@@ -210,7 +210,7 @@ void shouldBuildChannelBuilderUsingExistingStringWithAllTheFields()
"term-length=1048576|init-term-id=5|term-offset=64|term-id=4353|session-id=2314234|gtag=3|" +
"linger=100000055000001|sparse=true|eos=true|tether=false|group=false|ssc=true|so-sndbuf=8388608|" +
"so-rcvbuf=2097152|rcv-wnd=1048576|media-rcv-ts-offset=reserved|channel-rcv-ts-offset=0|" +
- "channel-snd-ts-offset=8";
+ "channel-snd-ts-offset=8|response-endpoint=127.0.0.3:0|response-correlation-id=12345";
final ChannelUri fromString = ChannelUri.parse(uri);
final ChannelUri fromBuilder = ChannelUri.parse(new ChannelUriStringBuilder(uri).build());
diff --git a/aeron-driver/src/main/c/aeron_data_packet_dispatcher.c b/aeron-driver/src/main/c/aeron_data_packet_dispatcher.c
index ba6236fdf9..833de0b914 100644
--- a/aeron-driver/src/main/c/aeron_data_packet_dispatcher.c
+++ b/aeron-driver/src/main/c/aeron_data_packet_dispatcher.c
@@ -386,6 +386,7 @@ int aeron_data_packet_dispatcher_create_publication(
header->term_offset,
header->term_length,
header->mtu,
+ header->frame_header.flags,
control_addr,
addr,
endpoint,
diff --git a/aeron-driver/src/main/c/aeron_driver_conductor.c b/aeron-driver/src/main/c/aeron_driver_conductor.c
index 7b98953873..393bea772e 100644
--- a/aeron-driver/src/main/c/aeron_driver_conductor.c
+++ b/aeron-driver/src/main/c/aeron_driver_conductor.c
@@ -78,7 +78,7 @@ static inline bool aeron_subscription_link_matches_allowing_wildcard(
{
return link->endpoint == endpoint &&
link->stream_id == stream_id &&
- (!link->has_session_id || (link->session_id == session_id));
+ ((!link->has_session_id && !link->is_response) || (link->session_id == session_id));
}
static inline bool aeron_driver_conductor_has_network_subscription_interest(
@@ -422,6 +422,33 @@ static inline int aeron_driver_conductor_validate_control_for_subscription(aeron
return 0;
}
+static int aeron_driver_conductor_validate_response_subscription(
+ aeron_driver_conductor_t *conductor,
+ aeron_udp_channel_t *udp_channel,
+ aeron_driver_uri_publication_params_t *param)
+{
+ if (AERON_UDP_CHANNEL_CONTROL_MODE_RESPONSE != udp_channel->control_mode &&
+ AERON_NULL_VALUE != param->response_correlation_id)
+ {
+ for (int last_index = (int)conductor->network_subscriptions.length - 1, i = last_index; i >= 0; i--)
+ {
+ aeron_subscription_link_t *link = &conductor->network_subscriptions.array[i];
+ if (param->response_correlation_id == link->registration_id)
+ {
+ return 0;
+ }
+ }
+
+ AERON_SET_ERR(
+ EINVAL,
+ "unable to find response subscription for response-correlation-id=%" PRId64,
+ param->response_correlation_id);
+ return -1;
+ }
+
+ return 0;
+}
+
static int aeron_time_tracking_name_resolver_resolve(
aeron_name_resolver_t *resolver,
const char *name,
@@ -1033,6 +1060,10 @@ void aeron_driver_conductor_unlink_from_endpoint(aeron_driver_conductor_t *condu
{
aeron_receive_channel_endpoint_decref_to_stream_and_session(endpoint, link->stream_id, link->session_id);
}
+ else if (link->is_response)
+ {
+ aeron_receive_channel_endpoint_decref_to_response_stream(endpoint, link->stream_id);
+ }
else
{
aeron_receive_channel_endpoint_decref_to_stream(endpoint, link->stream_id);
@@ -1753,6 +1784,49 @@ aeron_ipc_publication_t *aeron_driver_conductor_get_or_add_ipc_publication(
return ensure_capacity_result >= 0 ? publication : NULL;
}
+static int aeron_driver_conductor_find_response_publication_image(
+ aeron_driver_conductor_t *conductor,
+ const aeron_udp_channel_t *udp_channel,
+ aeron_driver_uri_publication_params_t *params,
+ aeron_publication_image_t **image)
+{
+ *image = NULL;
+ if (AERON_UDP_CHANNEL_CONTROL_MODE_RESPONSE != udp_channel->control_mode)
+ {
+ return 0;
+ }
+
+ if (AERON_NULL_VALUE == params->response_correlation_id)
+ {
+ AERON_SET_ERR(EINVAL, "%s", "control-mode=response was specified, but no response-correlation-id set");
+ return -1;
+ }
+
+ for (size_t i = 0; i < conductor->publication_images.length; i++)
+ {
+ aeron_publication_image_t *image_entry = conductor->publication_images.array[i].image;
+ if (aeron_publication_image_registration_id(image_entry) == params->response_correlation_id)
+ {
+ if (aeron_publication_image_has_send_response_setup(image_entry))
+ {
+ *image = image_entry;
+ return 0;
+ }
+ else
+ {
+ AERON_SET_ERR(
+ EINVAL,
+ "image.correlationId=%" PRId64 " did not request a response channel",
+ params->response_correlation_id);
+ return -1;
+ }
+ }
+ }
+
+ AERON_SET_ERR(EINVAL, "image.correlationId=%" PRId64 " not found", params->response_correlation_id);
+ return -1;
+}
+
aeron_network_publication_t *aeron_driver_conductor_get_or_add_network_publication(
aeron_driver_conductor_t *conductor,
aeron_client_t *client,
@@ -1760,6 +1834,7 @@ aeron_network_publication_t *aeron_driver_conductor_get_or_add_network_publicati
size_t uri_length,
const char *uri,
aeron_driver_uri_publication_params_t *params,
+ aeron_publication_image_t *response_publication_image,
int64_t registration_id,
int32_t stream_id,
bool is_exclusive)
@@ -1774,6 +1849,7 @@ aeron_network_publication_t *aeron_driver_conductor_get_or_add_network_publicati
bool is_session_id_in_use = false;
+ // TODO: Extract
for (size_t i = 0; i < conductor->network_publications.length; i++)
{
aeron_network_publication_t *pub_entry = conductor->network_publications.array[i].publication;
@@ -1781,7 +1857,9 @@ aeron_network_publication_t *aeron_driver_conductor_get_or_add_network_publicati
if (endpoint == pub_entry->endpoint && stream_id == pub_entry->stream_id)
{
if (AERON_NETWORK_PUBLICATION_STATE_ACTIVE == pub_entry->conductor_fields.state &&
- NULL == publication && !is_exclusive && !pub_entry->is_exclusive)
+ !is_exclusive &&
+ !pub_entry->is_exclusive &&
+ pub_entry->response_correlation_id == params->response_correlation_id)
{
publication = pub_entry;
}
@@ -1974,6 +2052,12 @@ aeron_network_publication_t *aeron_driver_conductor_get_or_add_network_publicati
publication->conductor_fields.managed_resource.incref(
publication->conductor_fields.managed_resource.clientd);
}
+
+ if (NULL != response_publication_image)
+ {
+ aeron_publication_image_set_response_session_id(
+ response_publication_image, (int64_t)publication->session_id);
+ }
}
return ensure_capacity_result >= 0 ? publication : NULL;
@@ -2348,6 +2432,7 @@ aeron_receive_channel_endpoint_t *aeron_driver_conductor_get_or_add_receive_chan
conductor->receive_channel_endpoints.array[conductor->receive_channel_endpoints.length++].endpoint = endpoint;
*status_indicator.value_addr = AERON_COUNTER_CHANNEL_ENDPOINT_STATUS_ACTIVE;
+ aeron_driver_receiver_proxy_on_add_endpoint(endpoint->receiver_proxy, endpoint);
}
else
{
@@ -3359,7 +3444,11 @@ int aeron_driver_conductor_on_add_ipc_publication(
aeron_driver_uri_publication_params_t params;
if (aeron_uri_parse(uri_length, uri, &aeron_uri_params) < 0 ||
- aeron_diver_uri_publication_params(&aeron_uri_params, ¶ms, conductor, is_exclusive) < 0)
+ aeron_diver_uri_publication_params(
+ &aeron_uri_params,
+ ¶ms,
+ conductor,
+ is_exclusive) < 0)
{
AERON_APPEND_ERR("%s", "Failed to parse IPC publication URI");
goto error_cleanup;
@@ -3438,9 +3527,14 @@ int aeron_driver_conductor_on_add_network_publication(
aeron_driver_uri_publication_params_t params;
if (aeron_udp_channel_parse(uri_length, uri, &conductor->name_resolver, &udp_channel, false) < 0 ||
- aeron_diver_uri_publication_params(&udp_channel->uri, ¶ms, conductor, is_exclusive) < 0 ||
+ aeron_diver_uri_publication_params(
+ &udp_channel->uri,
+ ¶ms,
+ conductor,
+ is_exclusive) < 0 ||
aeron_driver_conductor_validate_endpoint_for_publication(udp_channel) < 0 ||
- aeron_driver_conductor_validate_control_for_publication(udp_channel) < 0)
+ aeron_driver_conductor_validate_control_for_publication(udp_channel) < 0 ||
+ aeron_driver_conductor_validate_response_subscription(conductor, udp_channel, ¶ms) < 0)
{
AERON_APPEND_ERR("%s", "");
aeron_udp_channel_delete(udp_channel);
@@ -3455,6 +3549,15 @@ int aeron_driver_conductor_on_add_network_publication(
return -1;
}
+ aeron_publication_image_t *response_publication_image = NULL;
+ if (aeron_driver_conductor_find_response_publication_image(
+ conductor, udp_channel, ¶ms, &response_publication_image) < 0)
+ {
+ AERON_APPEND_ERR("%s", "");
+ aeron_udp_channel_delete(udp_channel);
+ return -1;
+ }
+
aeron_send_channel_endpoint_t *endpoint = aeron_driver_conductor_get_or_add_send_channel_endpoint(
conductor, udp_channel, ¶ms, correlation_id);
if (NULL == endpoint)
@@ -3490,12 +3593,14 @@ int aeron_driver_conductor_on_add_network_publication(
uri_length,
uri,
¶ms,
+ response_publication_image,
correlation_id,
command->stream_id,
is_exclusive);
if (NULL == publication)
{
+ AERON_APPEND_ERR("uri=%.*s", uri_length, uri);
return -1;
}
@@ -3621,6 +3726,7 @@ int aeron_driver_conductor_on_add_ipc_subscription(
link->registration_id = command->correlated.correlation_id;
link->is_reliable = true;
link->is_rejoin = true;
+ link->is_response = false;
link->group = AERON_INFER;
link->is_sparse = params.is_sparse;
link->is_tether = params.is_tether;
@@ -3761,6 +3867,30 @@ int aeron_driver_conductor_on_add_spy_subscription(
return 0;
}
+int aeron_driver_conductor_add_network_subscription_to_receiver(
+ aeron_receive_channel_endpoint_t *endpoint,
+ int32_t stream_id,
+ bool has_session_id,
+ int32_t session_id)
+{
+ if (has_session_id)
+ {
+ if (aeron_receive_channel_endpoint_incref_to_stream_and_session(endpoint, stream_id, session_id) < 0)
+ {
+ return -1;
+ }
+ }
+ else
+ {
+ if (aeron_receive_channel_endpoint_incref_to_stream(endpoint, stream_id) < 0)
+ {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
int aeron_driver_conductor_on_add_network_subscription(
aeron_driver_conductor_t *conductor, aeron_subscription_command_t *command)
{
@@ -3768,6 +3898,7 @@ int aeron_driver_conductor_on_add_network_subscription(
size_t uri_length = (size_t)command->channel_length;
int64_t correlation_id = command->correlated.correlation_id;
const char *uri = (const char *)command + sizeof(aeron_subscription_command_t);
+ aeron_udp_channel_control_mode control_mode = AERON_UDP_CHANNEL_CONTROL_MODE_NONE;
aeron_driver_uri_subscription_params_t params;
if (aeron_udp_channel_parse(uri_length, uri, &conductor->name_resolver, &udp_channel, false) < 0 ||
@@ -3779,6 +3910,8 @@ int aeron_driver_conductor_on_add_network_subscription(
return -1;
}
+ control_mode = udp_channel->control_mode;
+
if (NULL == aeron_driver_conductor_get_or_add_client(conductor, command->correlated.client_id))
{
AERON_APPEND_ERR("%s", "Failed to add client");
@@ -3829,18 +3962,20 @@ int aeron_driver_conductor_on_add_network_subscription(
return -1;
}
- if (params.has_session_id)
+ if (AERON_UDP_CHANNEL_CONTROL_MODE_RESPONSE == control_mode)
{
- if (aeron_receive_channel_endpoint_incref_to_stream_and_session(
- endpoint, command->stream_id, params.session_id) < 0)
+ if (aeron_receive_channel_endpoint_incref_to_response_stream(endpoint, command->stream_id) < 0)
{
+ AERON_APPEND_ERR("%s", "");
return -1;
}
}
else
{
- if (aeron_receive_channel_endpoint_incref_to_stream(endpoint, command->stream_id) < 0)
+ if (aeron_driver_conductor_add_network_subscription_to_receiver(
+ endpoint, command->stream_id, params.has_session_id, params.session_id) < 0)
{
+ AERON_APPEND_ERR("%s", "");
return -1;
}
}
@@ -3865,6 +4000,7 @@ int aeron_driver_conductor_on_add_network_subscription(
link->is_sparse = params.is_sparse;
link->is_tether = params.is_tether;
link->is_rejoin = params.is_rejoin;
+ link->is_response = AERON_UDP_CHANNEL_CONTROL_MODE_RESPONSE == control_mode;
link->group = params.group;
link->subscribable_list.length = 0;
link->subscribable_list.capacity = 0;
@@ -4837,6 +4973,7 @@ void aeron_driver_conductor_on_create_publication_image(void *clientd, void *ite
&command->src_address,
command->term_length,
command->mtu_length,
+ command->flags,
&conductor->loss_reporter,
is_reliable,
is_oldest_subscription_sparse,
@@ -4988,6 +5125,50 @@ void aeron_driver_conductor_on_receive_endpoint_removed(void *clientd, void *ite
}
}
+void aeron_driver_conductor_on_response_setup(void *clientd, void *item)
+{
+ aeron_driver_conductor_t *conductor = clientd;
+ aeron_command_response_setup_t *cmd = item;
+ const int64_t response_correlation_id = cmd->response_correlation_id;
+ const int32_t response_session_id = cmd->response_session_id;
+
+ for (size_t i = 0, length = conductor->network_subscriptions.length; i < length; i++)
+ {
+ aeron_subscription_link_t *subscription_link = &conductor->network_subscriptions.array[i];
+ if (subscription_link->registration_id == response_correlation_id &&
+ !subscription_link->has_session_id)
+ {
+ subscription_link->has_session_id = true;
+ subscription_link->session_id = response_session_id;
+ subscription_link->is_response = false;
+
+ aeron_driver_conductor_add_network_subscription_to_receiver(
+ subscription_link->endpoint,
+ subscription_link->stream_id,
+ subscription_link->has_session_id,
+ subscription_link->session_id);
+ aeron_receive_channel_endpoint_decref_to_response_stream(
+ subscription_link->endpoint, subscription_link->stream_id);
+ }
+ }
+}
+
+void aeron_driver_conductor_on_response_connected(void *clientd, void *item)
+{
+ aeron_driver_conductor_t *conductor = clientd;
+ aeron_command_response_connected_t *cmd = item;
+ const int64_t response_correlation_id = cmd->response_correlation_id;
+
+ for (size_t i = 0, length = conductor->publication_images.length; i < length; i++)
+ {
+ aeron_publication_image_t *publication_image = conductor->publication_images.array[i].image;
+ if (response_correlation_id == aeron_publication_image_registration_id(publication_image))
+ {
+ aeron_publication_image_remove_response_session_id(publication_image);
+ }
+ }
+}
+
void aeron_driver_conductor_on_release_resource(void *clientd, void *item)
{
aeron_command_release_resource_t *cmd = item;
diff --git a/aeron-driver/src/main/c/aeron_driver_conductor.h b/aeron-driver/src/main/c/aeron_driver_conductor.h
index c87d1f4200..3da59080e4 100644
--- a/aeron-driver/src/main/c/aeron_driver_conductor.h
+++ b/aeron-driver/src/main/c/aeron_driver_conductor.h
@@ -93,6 +93,7 @@ typedef struct aeron_subscription_link_stct
bool is_reliable;
bool is_rejoin;
bool has_session_id;
+ bool is_response;
aeron_inferable_boolean_t group;
int32_t stream_id;
int32_t session_id;
@@ -519,6 +520,10 @@ void aeron_driver_conductor_on_re_resolve_control(void *clientd, void *item);
void aeron_driver_conductor_on_receive_endpoint_removed(void *clientd, void *item);
+void aeron_driver_conductor_on_response_setup(void *clientd, void *item);
+
+void aeron_driver_conductor_on_response_connected(void *clientd, void *item);
+
void aeron_driver_conductor_on_release_resource(void *clientd, void *item);
aeron_send_channel_endpoint_t *aeron_driver_conductor_find_send_channel_endpoint_by_tag(
diff --git a/aeron-driver/src/main/c/aeron_driver_conductor_proxy.c b/aeron-driver/src/main/c/aeron_driver_conductor_proxy.c
index 2504bf9349..b6bbae5605 100644
--- a/aeron-driver/src/main/c/aeron_driver_conductor_proxy.c
+++ b/aeron-driver/src/main/c/aeron_driver_conductor_proxy.c
@@ -44,6 +44,7 @@ void aeron_driver_conductor_proxy_on_create_publication_image_cmd(
int32_t term_offset,
int32_t term_length,
int32_t mtu_length,
+ uint8_t flags,
struct sockaddr_storage *control_address,
struct sockaddr_storage *src_address,
void *endpoint,
@@ -59,6 +60,7 @@ void aeron_driver_conductor_proxy_on_create_publication_image_cmd(
.term_offset = term_offset,
.term_length = term_length,
.mtu_length = mtu_length,
+ .flags = flags,
.endpoint = endpoint,
.destination = destination
};
@@ -187,6 +189,52 @@ void aeron_driver_conductor_proxy_on_receive_endpoint_removed(
}
}
+void aeron_driver_conductor_proxy_on_response_setup(
+ aeron_driver_conductor_proxy_t *conductor_proxy,
+ int64_t response_correlation_id,
+ int32_t response_session_id)
+{
+ aeron_command_response_setup_t cmd = {
+ .base = {
+ .func = aeron_driver_conductor_on_response_setup,
+ .item = NULL
+ },
+ .response_correlation_id = response_correlation_id,
+ .response_session_id = response_session_id
+ };
+
+ if (AERON_THREADING_MODE_IS_SHARED_OR_INVOKER(conductor_proxy->threading_mode))
+ {
+ aeron_driver_conductor_on_response_setup(conductor_proxy->conductor, &cmd);
+ }
+ else
+ {
+ aeron_driver_conductor_proxy_offer(conductor_proxy, &cmd, sizeof(cmd));
+ }
+}
+
+void aeron_driver_conductor_proxy_on_response_connected(
+ aeron_driver_conductor_proxy_t *conductor_proxy,
+ int64_t response_correlation_id)
+{
+ aeron_command_response_connected_t cmd = {
+ .base = {
+ .func = aeron_driver_conductor_on_response_connected,
+ .item = NULL
+ },
+ .response_correlation_id = response_correlation_id
+ };
+
+ if (AERON_THREADING_MODE_IS_SHARED_OR_INVOKER(conductor_proxy->threading_mode))
+ {
+ aeron_driver_conductor_on_response_connected(conductor_proxy->conductor, &cmd);
+ }
+ else
+ {
+ aeron_driver_conductor_proxy_offer(conductor_proxy, &cmd, sizeof(cmd));
+ }
+}
+
void aeron_driver_conductor_proxy_on_release_resource(
aeron_driver_conductor_proxy_t *conductor_proxy,
void *managed_resource,
diff --git a/aeron-driver/src/main/c/aeron_driver_conductor_proxy.h b/aeron-driver/src/main/c/aeron_driver_conductor_proxy.h
index de3a1119e3..9a08a7249c 100644
--- a/aeron-driver/src/main/c/aeron_driver_conductor_proxy.h
+++ b/aeron-driver/src/main/c/aeron_driver_conductor_proxy.h
@@ -51,6 +51,7 @@ typedef struct aeron_command_create_publication_image_stct
int32_t term_offset;
int32_t term_length;
int32_t mtu_length;
+ uint8_t flags;
struct sockaddr_storage control_address;
struct sockaddr_storage src_address;
void *endpoint;
@@ -77,6 +78,21 @@ typedef struct aeron_command_delete_destination_stct
}
aeron_command_delete_destination_t;
+struct aeron_command_response_connected_stct
+{
+ aeron_command_base_t base;
+ int64_t response_correlation_id;
+};
+typedef struct aeron_command_response_connected_stct aeron_command_response_connected_t;
+
+struct aeron_command_response_setup_stct
+{
+ aeron_command_base_t base;
+ int64_t response_correlation_id;
+ int32_t response_session_id;
+};
+typedef struct aeron_command_response_setup_stct aeron_command_response_setup_t;
+
struct aeron_command_release_resource_stct
{
aeron_command_base_t base;
@@ -93,6 +109,7 @@ void aeron_driver_conductor_proxy_on_create_publication_image_cmd(
int32_t term_offset,
int32_t term_length,
int32_t mtu_length,
+ uint8_t flags,
struct sockaddr_storage *control_address,
struct sockaddr_storage *src_address,
void *endpoint,
@@ -125,9 +142,19 @@ void aeron_driver_conductor_proxy_on_receive_endpoint_removed(
aeron_driver_conductor_proxy_t *conductor_proxy,
void *endpoint);
+void aeron_driver_conductor_proxy_on_response_setup(
+ aeron_driver_conductor_proxy_t *conductor_proxy,
+ int64_t response_correlation_id,
+ int32_t response_session_id);
+
+void aeron_driver_conductor_proxy_on_response_connected(
+ aeron_driver_conductor_proxy_t *conductor_proxy,
+ int64_t response_correlation_id);
+
void aeron_driver_conductor_proxy_on_release_resource(
aeron_driver_conductor_proxy_t *conductor_proxy,
void *managed_resource,
aeron_driver_conductor_resource_type_t resource_type);
+
#endif //AERON_DRIVER_CONDUCTOR_PROXY_H
diff --git a/aeron-driver/src/main/c/aeron_driver_receiver.c b/aeron-driver/src/main/c/aeron_driver_receiver.c
index aa70526577..bf5723ca80 100644
--- a/aeron-driver/src/main/c/aeron_driver_receiver.c
+++ b/aeron-driver/src/main/c/aeron_driver_receiver.c
@@ -285,7 +285,7 @@ void aeron_driver_receiver_on_add_endpoint(void *clientd, void *command)
return;
}
- aeron_receive_channel_endpoint_add_pending_setup(endpoint, receiver);
+ aeron_receive_channel_endpoint_add_pending_setup(endpoint, receiver, 0, 0);
}
void aeron_driver_receiver_on_remove_endpoint(void *clientd, void *command)
@@ -364,6 +364,14 @@ void aeron_driver_receiver_on_add_subscription_by_session(void *clientd, void *i
{
AERON_APPEND_ERR("%s", "receiver on_add_subscription");
aeron_driver_receiver_log_error(receiver);
+ return;
+ }
+
+ if (aeron_receive_channel_endpoint_add_pending_setup(endpoint, receiver, cmd->session_id, cmd->stream_id) < 0)
+ {
+ AERON_APPEND_ERR("%s", "receiver on_add_subscription");
+ aeron_driver_receiver_log_error(receiver);
+ return;
}
}
@@ -415,7 +423,7 @@ void aeron_driver_receiver_on_add_destination(void *clientd, void *item)
if (destination->conductor_fields.udp_channel->has_explicit_control)
{
- if (aeron_receive_channel_endpoint_add_pending_setup_destination(endpoint, receiver, destination) < 0)
+ if (aeron_receive_channel_endpoint_add_pending_setup_destination(endpoint, receiver, destination, 0, 0) < 0)
{
AERON_APPEND_ERR("%s", "on_add_destination, pending_setup");
aeron_driver_receiver_log_error(receiver);
diff --git a/aeron-driver/src/main/c/aeron_network_publication.c b/aeron-driver/src/main/c/aeron_network_publication.c
index 7c931d348d..c584859a53 100644
--- a/aeron-driver/src/main/c/aeron_network_publication.c
+++ b/aeron-driver/src/main/c/aeron_network_publication.c
@@ -238,6 +238,10 @@ int aeron_network_publication_create(
_pub->conductor_fields.last_snd_pos = aeron_counter_get(_pub->snd_pos_position.value_addr);
_pub->conductor_fields.clean_position = _pub->conductor_fields.last_snd_pos;
+ _pub->endpoint_address.ss_family = AF_UNSPEC;
+ _pub->is_response = AERON_UDP_CHANNEL_CONTROL_MODE_RESPONSE == endpoint->conductor_fields.udp_channel->control_mode;
+ _pub->response_correlation_id = params->response_correlation_id;
+
*publication = _pub;
return 0;
@@ -290,6 +294,30 @@ bool aeron_network_publication_free(aeron_network_publication_t *publication)
return true;
}
+static int aeron_network_publication_do_send(
+ aeron_network_publication_t *publication,
+ struct iovec *iov,
+ size_t iov_length,
+ int64_t *bytes_sent)
+{
+ if (publication->is_response)
+ {
+ if (AF_UNSPEC != publication->endpoint_address.ss_family)
+ {
+ return aeron_send_channel_send_endpoint_address(
+ publication->endpoint, &publication->endpoint_address, iov, iov_length, bytes_sent);
+ }
+ else
+ {
+ return 0;
+ }
+ }
+ else
+ {
+ return aeron_send_channel_send(publication->endpoint, iov, iov_length, bytes_sent);
+ }
+}
+
int aeron_network_publication_setup_message_check(
aeron_network_publication_t *publication, int64_t now_ns, int32_t active_term_id, int32_t term_offset)
{
@@ -306,6 +334,10 @@ int aeron_network_publication_setup_message_check(
setup_header->frame_header.version = AERON_FRAME_HEADER_VERSION;
setup_header->frame_header.flags = 0;
setup_header->frame_header.type = AERON_HDR_TYPE_SETUP;
+ setup_header->frame_header.flags =
+ (!publication->is_response && AERON_NULL_VALUE != publication->response_correlation_id) ?
+ AERON_SETUP_HEADER_SEND_RESPONSE_FLAG : 0;
+
setup_header->term_offset = term_offset;
setup_header->session_id = publication->session_id;
setup_header->stream_id = publication->stream_id;
@@ -330,7 +362,7 @@ int aeron_network_publication_setup_message_check(
*publication->snd_pos_position.value_addr);
}
- if (0 <= (result = aeron_send_channel_send(publication->endpoint, &iov, 1, &bytes_sent)))
+ if (0 <= (result = aeron_network_publication_do_send(publication, &iov, 1, &bytes_sent)))
{
if (bytes_sent < (int64_t)iov.iov_len)
{
@@ -385,7 +417,7 @@ int aeron_network_publication_heartbeat_message_check(
iov.iov_base = heartbeat_buffer;
iov.iov_len = sizeof(aeron_data_header_t);
- if (0 <= (result = aeron_send_channel_send(publication->endpoint, &iov, 1, &bytes_sent)))
+ if (0 <= (result = aeron_network_publication_do_send(publication, &iov, 1, &bytes_sent)))
{
result = (int)bytes_sent;
if (bytes_sent < (int64_t)iov.iov_len)
@@ -453,7 +485,7 @@ int aeron_network_publication_send_data(
if (vlen > 0)
{
- result = aeron_send_channel_send(publication->endpoint, iov, vlen, &bytes_sent);
+ result = aeron_network_publication_do_send(publication, iov, vlen, &bytes_sent);
if (result == vlen) /* assume that a partial send from a broken stack will also move the snd-pos */
{
publication->time_of_last_data_or_heartbeat_ns = now_ns;
@@ -587,7 +619,7 @@ int aeron_network_publication_resend(void *clientd, int32_t term_id, int32_t ter
iov.iov_len = (uint32_t)available;
int64_t msg_bytes_sent = 0;
- int sendmsg_result = aeron_send_channel_send(publication->endpoint, &iov, 1, &msg_bytes_sent);
+ int sendmsg_result = aeron_network_publication_do_send(publication, &iov, 1, &msg_bytes_sent);
if (0 <= sendmsg_result)
{
if (msg_bytes_sent < (int64_t)iov.iov_len)
@@ -650,7 +682,11 @@ inline static void aeron_network_publication_update_connected_status(
}
void aeron_network_publication_on_status_message(
- aeron_network_publication_t *publication, const uint8_t *buffer, size_t length, struct sockaddr_storage *addr)
+ aeron_network_publication_t *publication,
+ aeron_driver_conductor_proxy_t *conductor_proxy,
+ const uint8_t *buffer,
+ size_t length,
+ struct sockaddr_storage *addr)
{
const int64_t time_ns = aeron_clock_cached_nano_time(publication->cached_clock);
const aeron_status_message_header_t *sm = (aeron_status_message_header_t *)buffer;
@@ -659,6 +695,7 @@ void aeron_network_publication_on_status_message(
if (!publication->has_receivers)
{
AERON_PUT_ORDERED(publication->has_receivers, true);
+ aeron_driver_conductor_proxy_on_response_connected(conductor_proxy, publication->response_correlation_id);
}
if (!publication->has_initial_connection)
@@ -715,7 +752,7 @@ void aeron_network_publication_on_rttm(
iov.iov_base = rttm_reply_buffer;
iov.iov_len = sizeof(aeron_rttm_header_t);
- if (0 <= aeron_send_channel_send(publication->endpoint, &iov, 1, &bytes_sent))
+ if (0 <= aeron_network_publication_do_send(publication, &iov, 1, &bytes_sent))
{
if (bytes_sent < (int64_t)iov.iov_len)
{
diff --git a/aeron-driver/src/main/c/aeron_network_publication.h b/aeron-driver/src/main/c/aeron_network_publication.h
index 2927ca92ce..25117f8902 100644
--- a/aeron-driver/src/main/c/aeron_network_publication.h
+++ b/aeron-driver/src/main/c/aeron_network_publication.h
@@ -80,6 +80,8 @@ typedef struct aeron_network_publication_stct
int64_t time_of_last_setup_ns;
uint8_t sender_fields_pad_rhs[AERON_CACHE_LINE_LENGTH];
+ struct sockaddr_storage endpoint_address;
+
char *log_file_name;
int64_t term_buffer_length;
int64_t term_window_length;
@@ -89,6 +91,7 @@ typedef struct aeron_network_publication_stct
int64_t connection_timeout_ns;
int64_t tag;
+ int64_t response_correlation_id;
int32_t session_id;
int32_t stream_id;
int32_t initial_term_id;
@@ -103,6 +106,7 @@ typedef struct aeron_network_publication_stct
bool signal_eos;
bool is_setup_elicited;
bool is_exclusive;
+ bool is_response;
volatile bool has_receivers;
volatile bool has_spies;
volatile bool is_connected;
@@ -162,7 +166,11 @@ void aeron_network_publication_on_nak(
aeron_network_publication_t *publication, int32_t term_id, int32_t term_offset, int32_t length);
void aeron_network_publication_on_status_message(
- aeron_network_publication_t *publication, const uint8_t *buffer, size_t length, struct sockaddr_storage *addr);
+ aeron_network_publication_t *publication,
+ aeron_driver_conductor_proxy_t *conductor_proxy,
+ const uint8_t *buffer,
+ size_t length,
+ struct sockaddr_storage *addr);
void aeron_network_publication_on_rttm(
aeron_network_publication_t *publication, const uint8_t *buffer, size_t length, struct sockaddr_storage *addr);
@@ -249,6 +257,18 @@ inline void aeron_network_publication_trigger_send_setup_frame(
length,
addr,
time_ns);
+
+ if (publication->is_response)
+ {
+ if (AF_INET == addr->ss_family)
+ {
+ memcpy(&publication->endpoint_address, addr, sizeof(struct sockaddr_in));
+ }
+ else if (AF_INET6 == addr->ss_family)
+ {
+ memcpy(&publication->endpoint_address, addr, sizeof(struct sockaddr_in6));
+ }
+ }
}
}
diff --git a/aeron-driver/src/main/c/aeron_publication_image.c b/aeron-driver/src/main/c/aeron_publication_image.c
index b9d6e4c94e..e44be24a1b 100644
--- a/aeron-driver/src/main/c/aeron_publication_image.c
+++ b/aeron-driver/src/main/c/aeron_publication_image.c
@@ -23,6 +23,8 @@
#include "aeron_driver_conductor.h"
#include "concurrent/aeron_term_gap_filler.h"
+#define AERON_PUBLICATION_RESPONSE_NULL_RESPONSE_SESSION_ID INT64_C(0xF000000000000000)
+
static void aeron_publication_image_connection_set_control_address(
aeron_publication_image_connection_t *connection, const struct sockaddr_storage *control_address)
{
@@ -52,6 +54,15 @@ static void aeron_update_active_transport_count(aeron_publication_image_t *image
}
}
+static bool aeron_publication_image_check_and_get_response_session_id(
+ aeron_publication_image_t *image, int32_t *response_session_id)
+{
+ int64_t _response_session_id;
+ AERON_GET_VOLATILE(_response_session_id, image->response_session_id);
+ *response_session_id = (int32_t)_response_session_id;
+ return ((int64_t)INT32_MIN) <= _response_session_id && _response_session_id <= ((int64_t)INT32_MAX);
+}
+
int aeron_publication_image_create(
aeron_publication_image_t **image,
aeron_receive_channel_endpoint_t *endpoint,
@@ -70,6 +81,7 @@ int aeron_publication_image_create(
struct sockaddr_storage *source_address,
int32_t term_buffer_length,
int32_t sender_mtu_length,
+ uint8_t flags,
aeron_loss_reporter_t *loss_reporter,
bool is_reliable,
bool is_sparse,
@@ -177,6 +189,7 @@ int aeron_publication_image_create(
_image->conductor_fields.is_reliable = is_reliable;
_image->conductor_fields.state = AERON_PUBLICATION_IMAGE_STATE_ACTIVE;
_image->conductor_fields.liveness_timeout_ns = (int64_t)context->image_liveness_timeout_ns;
+ _image->conductor_fields.flags = flags;
_image->session_id = session_id;
_image->stream_id = stream_id;
_image->rcv_hwm_position.counter_id = rcv_hwm_position->counter_id;
@@ -242,6 +255,7 @@ int aeron_publication_image_create(
_image->conductor_fields.clean_position = initial_position;
_image->conductor_fields.time_of_last_state_change_ns = now_ns;
+ aeron_publication_image_remove_response_session_id(_image);
aeron_counter_set_ordered(_image->rcv_hwm_position.value_addr, initial_position);
aeron_counter_set_ordered(_image->rcv_pos_position.value_addr, initial_position);
@@ -614,12 +628,44 @@ int aeron_publication_image_on_rttm(
int aeron_publication_image_send_pending_status_message(aeron_publication_image_t *image, int64_t now_ns)
{
int work_count = 0;
-
int64_t change_number;
+ int32_t response_session_id = 0;
AERON_GET_VOLATILE(change_number, image->end_sm_change);
+ const bool has_sm_timed_out = now_ns > (image->time_of_last_sm_ns + image->sm_timeout_ns);
- if (change_number != image->last_sm_change_number ||
- (now_ns > (image->time_of_last_sm_ns + image->sm_timeout_ns)))
+ if (has_sm_timed_out && aeron_publication_image_check_and_get_response_session_id(image, &response_session_id))
+ {
+ for (size_t i = 0, len = image->connections.length; i < len; i++)
+ {
+ aeron_publication_image_connection_t *connection = &image->connections.array[i];
+
+ if (aeron_publication_image_connection_is_alive(connection, now_ns))
+ {
+ int send_response_setup_result = aeron_receive_channel_endpoint_send_response_setup(
+ image->endpoint,
+ connection->destination,
+ connection->control_addr,
+ image->stream_id,
+ image->session_id,
+ response_session_id);
+
+ if (send_response_setup_result < 0)
+ {
+ work_count = send_response_setup_result;
+ break;
+ }
+
+ work_count++;
+ }
+ }
+ }
+
+ if (work_count < 0)
+ {
+ return work_count;
+ }
+
+ if (change_number != image->last_sm_change_number || has_sm_timed_out)
{
const int64_t sm_position = image->next_sm_position;
const int32_t receiver_window_length = image->next_sm_receiver_window_length;
@@ -989,6 +1035,11 @@ void aeron_publication_image_receiver_release(aeron_publication_image_t *image)
AERON_PUT_ORDERED(image->has_receiver_released, true);
}
+void aeron_publication_image_remove_response_session_id(aeron_publication_image_t *image)
+{
+ aeron_publication_image_set_response_session_id(image, AERON_PUBLICATION_RESPONSE_NULL_RESPONSE_SESSION_ID);
+}
+
extern bool aeron_publication_image_is_heartbeat(const uint8_t *buffer, size_t length);
extern bool aeron_publication_image_is_end_of_stream(const uint8_t *buffer, size_t length);
@@ -1016,4 +1067,9 @@ extern const char *aeron_publication_image_log_file_name(aeron_publication_image
extern int64_t aeron_publication_image_registration_id(aeron_publication_image_t *image);
+extern bool aeron_publication_image_has_send_response_setup(aeron_publication_image_t *image);
+
+void aeron_publication_image_set_response_session_id(
+ aeron_publication_image_t *image, int64_t response_session_id);
+
extern int64_t aeron_publication_image_join_position(aeron_publication_image_t *image);
diff --git a/aeron-driver/src/main/c/aeron_publication_image.h b/aeron-driver/src/main/c/aeron_publication_image.h
index a250c7a78b..4853cbbb7c 100644
--- a/aeron-driver/src/main/c/aeron_publication_image.h
+++ b/aeron-driver/src/main/c/aeron_publication_image.h
@@ -62,6 +62,7 @@ typedef struct aeron_publication_image_stct
int64_t liveness_timeout_ns;
int64_t clean_position;
aeron_receive_channel_endpoint_t *endpoint;
+ uint8_t flags;
}
conductor_fields;
@@ -129,6 +130,8 @@ typedef struct aeron_publication_image_stct
int64_t time_of_last_packet_ns;
+ volatile int64_t response_session_id;
+
volatile bool is_end_of_stream;
volatile bool is_sending_eos_sm;
volatile bool has_receiver_released;
@@ -161,6 +164,7 @@ int aeron_publication_image_create(
struct sockaddr_storage *source_address,
int32_t term_buffer_length,
int32_t sender_mtu_length,
+ uint8_t flags,
aeron_loss_reporter_t *loss_reporter,
bool is_reliable,
bool is_sparse,
@@ -309,6 +313,20 @@ inline int64_t aeron_publication_image_registration_id(aeron_publication_image_t
return image->conductor_fields.managed_resource.registration_id;
}
+inline bool aeron_publication_image_has_send_response_setup(aeron_publication_image_t *image)
+{
+ return AERON_SETUP_HEADER_SEND_RESPONSE_FLAG & image->conductor_fields.flags;
+}
+
+// Called from Conductor
+inline void aeron_publication_image_set_response_session_id(
+ aeron_publication_image_t *image, int64_t response_session_id)
+{
+ AERON_PUT_ORDERED(image->response_session_id, response_session_id);
+}
+
+void aeron_publication_image_remove_response_session_id(aeron_publication_image_t *image);
+
inline int64_t aeron_publication_image_join_position(aeron_publication_image_t *image)
{
int64_t position = *image->rcv_pos_position.value_addr;
diff --git a/aeron-driver/src/main/c/agent/aeron_driver_agent.c b/aeron-driver/src/main/c/agent/aeron_driver_agent.c
index 0bb1409c8e..464c252883 100644
--- a/aeron-driver/src/main/c/agent/aeron_driver_agent.c
+++ b/aeron-driver/src/main/c/agent/aeron_driver_agent.c
@@ -1585,6 +1585,9 @@ static const char *dissect_frame_type(int16_t type)
case AERON_HDR_TYPE_ATS_SM:
return "ATS_SM";
+ case AERON_HDR_TYPE_RSP_SETUP:
+ return "RSP_SETUP";
+
default:
return "unknown command";
}
@@ -1669,6 +1672,20 @@ static const char *dissect_frame(const void *message, size_t length)
break;
}
+ case AERON_HDR_TYPE_RSP_SETUP:
+ {
+ aeron_response_setup_header_t *rsp_setup = (aeron_response_setup_header_t *)message;
+
+ snprintf(buffer, sizeof(buffer) - 1, "%s 0x%x len %d %d:%d RSP_SESSION_ID %d",
+ dissect_frame_type(hdr->type),
+ hdr->flags,
+ hdr->frame_length,
+ rsp_setup->session_id,
+ rsp_setup->stream_id,
+ rsp_setup->response_session_id);
+ break;
+ }
+
case AERON_HDR_TYPE_RTTM:
{
aeron_rttm_header_t *rttm = (aeron_rttm_header_t *)message;
diff --git a/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c b/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c
index 7a490a4a29..8023c26a43 100644
--- a/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c
+++ b/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.c
@@ -34,7 +34,7 @@ int aeron_receive_channel_endpoint_set_group_tag(
aeron_driver_context_t *context)
{
int64_t group_tag = 0;
- int rc = aeron_uri_get_int64(&channel->uri.params.udp.additional_params, AERON_URI_GTAG_KEY, &group_tag);
+ int rc = aeron_uri_get_int64(&channel->uri.params.udp.additional_params, AERON_URI_GTAG_KEY, 0, &group_tag);
if (rc < 0)
{
return -1;
@@ -91,6 +91,13 @@ int aeron_receive_channel_endpoint_create(
return -1;
}
+ if (aeron_int64_counter_map_init(
+ &_endpoint->response_stream_id_to_refcnt_map, 0, 16, AERON_MAP_DEFAULT_LOAD_FACTOR) < 0)
+ {
+ AERON_APPEND_ERR("%s", "could not init response_stream_id_to_refcnt_map");
+ return -1;
+ }
+
_endpoint->conductor_fields.managed_resource.clientd = _endpoint;
_endpoint->conductor_fields.managed_resource.registration_id = -1;
_endpoint->conductor_fields.status = AERON_RECEIVE_CHANNEL_ENDPOINT_STATUS_ACTIVE;
@@ -142,6 +149,7 @@ int aeron_receive_channel_endpoint_delete(
aeron_int64_counter_map_delete(&endpoint->stream_id_to_refcnt_map);
aeron_int64_counter_map_delete(&endpoint->stream_and_session_id_to_refcnt_map);
+ aeron_int64_counter_map_delete(&endpoint->response_stream_id_to_refcnt_map);
aeron_data_packet_dispatcher_close(&endpoint->dispatcher);
bool delete_this_channel = false;
@@ -334,6 +342,40 @@ int aeron_receive_channel_endpoint_send_rttm(
return bytes_sent;
}
+int aeron_receive_channel_endpoint_send_response_setup(
+ aeron_receive_channel_endpoint_t *endpoint,
+ aeron_receive_destination_t *destination,
+ struct sockaddr_storage *addr,
+ int32_t stream_id,
+ int32_t session_id,
+ int32_t response_session_id)
+{
+ uint8_t buffer[sizeof(aeron_response_setup_header_t)];
+ aeron_response_setup_header_t *res_setup_header = (aeron_response_setup_header_t *)buffer;
+ struct iovec iov;
+
+ res_setup_header->frame_header.frame_length = sizeof(aeron_response_setup_header_t);
+ res_setup_header->frame_header.version = AERON_FRAME_HEADER_VERSION;
+ res_setup_header->frame_header.flags = UINT8_C(0);
+ res_setup_header->frame_header.type = AERON_HDR_TYPE_RSP_SETUP;
+ res_setup_header->session_id = session_id;
+ res_setup_header->stream_id = stream_id;
+ res_setup_header->response_session_id = response_session_id;
+
+ iov.iov_base = buffer;
+ iov.iov_len = sizeof(aeron_response_setup_header_t);
+ int bytes_sent = aeron_receive_channel_endpoint_send(endpoint, destination, addr, &iov);
+ if (bytes_sent != (int)iov.iov_len)
+ {
+ if (bytes_sent >= 0)
+ {
+ aeron_counter_increment(endpoint->short_sends_counter, 1);
+ }
+ }
+
+ return bytes_sent;
+}
+
void aeron_receive_channel_endpoint_dispatch(
aeron_udp_channel_data_paths_t *data_paths,
aeron_udp_channel_transport_t *transport,
@@ -503,6 +545,7 @@ void aeron_receive_channel_endpoint_try_remove_endpoint(aeron_receive_channel_en
{
if (0 == endpoint->stream_id_to_refcnt_map.size &&
0 == endpoint->stream_and_session_id_to_refcnt_map.size &&
+ 0 == endpoint->response_stream_id_to_refcnt_map.size &&
0 >= endpoint->conductor_fields.image_ref_count)
{
/* mark as CLOSING to be aware not to use again (to be receiver_released and deleted) */
@@ -521,16 +564,6 @@ int aeron_receive_channel_endpoint_incref_to_stream(aeron_receive_channel_endpoi
if (1 == count)
{
- const bool is_first_subscription =
- 1 == endpoint->stream_id_to_refcnt_map.size &&
- 0 == endpoint->stream_and_session_id_to_refcnt_map.size &&
- 0 == endpoint->conductor_fields.image_ref_count;
-
- if (is_first_subscription)
- {
- aeron_driver_receiver_proxy_on_add_endpoint(endpoint->receiver_proxy, endpoint);
- }
-
aeron_driver_receiver_proxy_on_add_subscription(endpoint->receiver_proxy, endpoint, stream_id);
}
@@ -563,6 +596,44 @@ int aeron_receive_channel_endpoint_decref_to_stream(aeron_receive_channel_endpoi
return result;
}
+int aeron_receive_channel_endpoint_incref_to_response_stream(
+ aeron_receive_channel_endpoint_t *endpoint, int32_t stream_id)
+{
+ int64_t count;
+ if (aeron_int64_counter_map_inc_and_get(&endpoint->response_stream_id_to_refcnt_map, stream_id, &count) < 0)
+ {
+ return -1;
+ }
+
+ return 0;
+}
+
+int aeron_receive_channel_endpoint_decref_to_response_stream(
+ aeron_receive_channel_endpoint_t *endpoint, int32_t stream_id)
+{
+ const int64_t count = aeron_int64_counter_map_get(&endpoint->response_stream_id_to_refcnt_map, stream_id);
+
+ if (0 == count)
+ {
+ return 0;
+ }
+
+ int64_t count_after_dec = 0;
+ int result = aeron_int64_counter_map_dec_and_get(
+ &endpoint->response_stream_id_to_refcnt_map, stream_id, &count_after_dec);
+ if (result < 0)
+ {
+ return -1;
+ }
+
+ if (0 == count_after_dec)
+ {
+ aeron_receive_channel_endpoint_try_remove_endpoint(endpoint);
+ }
+
+ return result;
+}
+
int aeron_receive_channel_endpoint_incref_to_stream_and_session(
aeron_receive_channel_endpoint_t *endpoint,
int32_t stream_id,
@@ -579,16 +650,6 @@ int aeron_receive_channel_endpoint_incref_to_stream_and_session(
if (1 == count)
{
- const bool is_first_subscription =
- 0 == endpoint->stream_id_to_refcnt_map.size &&
- 1 == endpoint->stream_and_session_id_to_refcnt_map.size &&
- 0 == endpoint->conductor_fields.image_ref_count;
-
- if (is_first_subscription)
- {
- aeron_driver_receiver_proxy_on_add_endpoint(endpoint->receiver_proxy, endpoint);
- }
-
aeron_driver_receiver_proxy_on_add_subscription_by_session(
endpoint->receiver_proxy, endpoint, stream_id, session_id);
}
@@ -919,21 +980,23 @@ int aeron_receive_channel_endpoint_remove_poll_transports(
int aeron_receive_channel_endpoint_add_pending_setup_destination(
aeron_receive_channel_endpoint_t *endpoint,
aeron_driver_receiver_t *receiver,
- aeron_receive_destination_t *destination)
+ aeron_receive_destination_t *destination,
+ int32_t session_id,
+ int32_t stream_id)
{
aeron_udp_channel_t *udp_channel = destination->conductor_fields.udp_channel;
if (destination->conductor_fields.udp_channel->has_explicit_control)
{
if (aeron_driver_receiver_add_pending_setup(
- receiver, endpoint, destination, 0, 0, &udp_channel->local_control) < 0)
+ receiver, endpoint, destination, session_id, stream_id, &udp_channel->local_control) < 0)
{
AERON_APPEND_ERR("%s", "Failed to add pending setup for receiver");
return -1;
}
if (aeron_receive_channel_endpoint_send_sm(
- endpoint, destination, &destination->current_control_addr, 0, 0, 0, 0, 0,
+ endpoint, destination, &destination->current_control_addr, stream_id, session_id, 0, 0, 0,
AERON_STATUS_MESSAGE_HEADER_SEND_SETUP_FLAG) < 0)
{
AERON_APPEND_ERR("%s", "Failed to send sm for receiver");
@@ -947,12 +1010,16 @@ int aeron_receive_channel_endpoint_add_pending_setup_destination(
}
int aeron_receive_channel_endpoint_add_pending_setup(
- aeron_receive_channel_endpoint_t *endpoint, aeron_driver_receiver_t *receiver)
+ aeron_receive_channel_endpoint_t *endpoint,
+ aeron_driver_receiver_t *receiver,
+ int32_t session_id,
+ int32_t stream_id)
{
for (size_t i = 0, len = endpoint->destinations.length; i < len; i++)
{
aeron_receive_destination_t *destination = endpoint->destinations.array[i].destination;
- if (aeron_receive_channel_endpoint_add_pending_setup_destination(endpoint, receiver, destination) < 0)
+ if (aeron_receive_channel_endpoint_add_pending_setup_destination(
+ endpoint, receiver, destination, session_id, stream_id) < 0)
{
AERON_APPEND_ERR("%s", "");
aeron_driver_receiver_log_error(receiver);
@@ -965,11 +1032,6 @@ int aeron_receive_channel_endpoint_add_pending_setup(
extern void aeron_receive_channel_endpoint_on_remove_pending_setup(
aeron_receive_channel_endpoint_t *endpoint, int32_t session_id, int32_t stream_id);
-extern int aeron_receive_channel_endpoint_on_remove_cool_down(
- aeron_receive_channel_endpoint_t *endpoint, int32_t session_id, int32_t stream_id);
-
-extern size_t aeron_receive_channel_endpoint_stream_count(aeron_receive_channel_endpoint_t *endpoint);
-
extern void aeron_receive_channel_endpoint_receiver_release(aeron_receive_channel_endpoint_t *endpoint);
extern bool aeron_receive_channel_endpoint_has_receiver_released(aeron_receive_channel_endpoint_t *endpoint);
diff --git a/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.h b/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.h
index 072188fb83..c89f97afdb 100644
--- a/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.h
+++ b/aeron-driver/src/main/c/media/aeron_receive_channel_endpoint.h
@@ -62,6 +62,7 @@ typedef struct aeron_receive_channel_endpoint_stct
aeron_data_packet_dispatcher_t dispatcher;
aeron_int64_counter_map_t stream_id_to_refcnt_map;
aeron_int64_counter_map_t stream_and_session_id_to_refcnt_map;
+ aeron_int64_counter_map_t response_stream_id_to_refcnt_map;
aeron_atomic_counter_t channel_status;
aeron_driver_receiver_proxy_t *receiver_proxy;
aeron_udp_channel_transport_bindings_t *transport_bindings;
@@ -131,6 +132,14 @@ int aeron_receive_channel_endpoint_send_rttm(
int64_t reception_delta,
bool is_reply);
+int aeron_receive_channel_endpoint_send_response_setup(
+ aeron_receive_channel_endpoint_t *endpoint,
+ aeron_receive_destination_t *destination,
+ struct sockaddr_storage *addr,
+ int32_t stream_id,
+ int32_t session_id,
+ int32_t response_session_id);
+
void aeron_receive_channel_endpoint_dispatch(
aeron_udp_channel_data_paths_t *data_paths,
aeron_udp_channel_transport_t *transport,
@@ -176,6 +185,12 @@ int aeron_receive_channel_endpoint_incref_to_stream_and_session(
int aeron_receive_channel_endpoint_decref_to_stream_and_session(
aeron_receive_channel_endpoint_t *endpoint, int32_t stream_id, int32_t session_id);
+int aeron_receive_channel_endpoint_incref_to_response_stream(
+ aeron_receive_channel_endpoint_t *endpoint, int32_t stream_id);
+
+int aeron_receive_channel_endpoint_decref_to_response_stream(
+ aeron_receive_channel_endpoint_t *endpoint, int32_t stream_id);
+
int aeron_receive_channel_endpoint_on_add_subscription(
aeron_receive_channel_endpoint_t *endpoint, int32_t stream_id);
int aeron_receive_channel_endpoint_on_remove_subscription(
@@ -223,12 +238,16 @@ int aeron_receive_channel_endpoint_remove_poll_transports(
int aeron_receive_channel_endpoint_add_pending_setup(
aeron_receive_channel_endpoint_t *endpoint,
- aeron_driver_receiver_t *receiver);
+ aeron_driver_receiver_t *receiver,
+ int32_t session_id,
+ int32_t stream_id);
int aeron_receive_channel_endpoint_add_pending_setup_destination(
aeron_receive_channel_endpoint_t *endpoint,
aeron_driver_receiver_t *receiver,
- aeron_receive_destination_t *destination);
+ aeron_receive_destination_t *destination,
+ int32_t session_id,
+ int32_t stream_id);
inline void aeron_receive_channel_endpoint_on_remove_pending_setup(
aeron_receive_channel_endpoint_t *endpoint, int32_t session_id, int32_t stream_id)
diff --git a/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.c b/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.c
index cf5bdb73a9..fd88e5b9af 100644
--- a/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.c
+++ b/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.c
@@ -334,6 +334,19 @@ int aeron_send_channel_send(
return result;
}
+int aeron_send_channel_send_endpoint_address(
+ aeron_send_channel_endpoint_t *endpoint,
+ struct sockaddr_storage* endpoint_address,
+ struct iovec *iov,
+ size_t iov_length,
+ int64_t *bytes_sent)
+{
+ aeron_send_channel_apply_timestamps(endpoint, iov, iov_length);
+
+ return endpoint->data_paths->send_func(
+ endpoint->data_paths, &endpoint->transport, endpoint_address, iov, iov_length, bytes_sent);
+}
+
int aeron_send_channel_endpoint_add_publication(
aeron_send_channel_endpoint_t *endpoint, aeron_network_publication_t *publication)
{
@@ -371,6 +384,7 @@ void aeron_send_channel_endpoint_dispatch(
aeron_driver_sender_t *sender = (aeron_driver_sender_t *)sender_clientd;
aeron_frame_header_t *frame_header = (aeron_frame_header_t *)buffer;
aeron_send_channel_endpoint_t *endpoint = (aeron_send_channel_endpoint_t *)endpoint_clientd;
+ aeron_driver_conductor_proxy_t *conductor_proxy = sender->context->conductor_proxy;
switch (frame_header->type)
{
@@ -389,7 +403,7 @@ void aeron_send_channel_endpoint_dispatch(
case AERON_HDR_TYPE_SM:
if (length >= sizeof(aeron_status_message_header_t) && length >= (size_t)frame_header->frame_length)
{
- aeron_send_channel_endpoint_on_status_message(endpoint, buffer, length, addr);
+ aeron_send_channel_endpoint_on_status_message(endpoint, conductor_proxy, buffer, length, addr);
aeron_counter_ordered_increment(sender->status_messages_received_counter, 1);
}
else
@@ -398,6 +412,17 @@ void aeron_send_channel_endpoint_dispatch(
}
break;
+ case AERON_HDR_TYPE_RSP_SETUP:
+ if (length >= sizeof(aeron_response_setup_header_t))
+ {
+ aeron_send_channel_endpoint_on_response_setup(endpoint, conductor_proxy, buffer, length, addr);
+ }
+ else
+ {
+ aeron_counter_increment(sender->invalid_frames_counter, 1);
+ }
+ break;
+
case AERON_HDR_TYPE_RTTM:
if (length >= sizeof(aeron_rttm_header_t))
{
@@ -429,7 +454,11 @@ void aeron_send_channel_endpoint_on_nak(
}
void aeron_send_channel_endpoint_on_status_message(
- aeron_send_channel_endpoint_t *endpoint, uint8_t *buffer, size_t length, struct sockaddr_storage *addr)
+ aeron_send_channel_endpoint_t *endpoint,
+ aeron_driver_conductor_proxy_t *conductor_proxy,
+ uint8_t *buffer,
+ size_t length,
+ struct sockaddr_storage *addr)
{
aeron_status_message_header_t *sm_header = (aeron_status_message_header_t *)buffer;
int64_t key_value = aeron_map_compound_key(sm_header->stream_id, sm_header->session_id);
@@ -449,7 +478,7 @@ void aeron_send_channel_endpoint_on_status_message(
}
else
{
- aeron_network_publication_on_status_message(publication, buffer, length, addr);
+ aeron_network_publication_on_status_message(publication, conductor_proxy, buffer, length, addr);
}
endpoint->time_of_last_sm_ns = aeron_clock_cached_nano_time(endpoint->cached_clock);
@@ -470,6 +499,30 @@ void aeron_send_channel_endpoint_on_rttm(
}
}
+void aeron_send_channel_endpoint_on_response_setup(
+ aeron_send_channel_endpoint_t *endpoint,
+ aeron_driver_conductor_proxy_t *conductor_proxy,
+ uint8_t *buffer,
+ size_t length,
+ struct sockaddr_storage *addr)
+{
+ aeron_response_setup_header_t *rsp_setup_header = (aeron_response_setup_header_t *)buffer;
+ int64_t key_value = aeron_map_compound_key(rsp_setup_header->stream_id, rsp_setup_header->session_id);
+ aeron_network_publication_t *publication = aeron_int64_to_ptr_hash_map_get(
+ &endpoint->publication_dispatch_map, key_value);
+
+ if (NULL != publication)
+ {
+ const int64_t response_correlation_id = publication->response_correlation_id;
+ if (AERON_NULL_VALUE != response_correlation_id)
+ {
+ aeron_driver_conductor_proxy_on_response_setup(
+ conductor_proxy, response_correlation_id, rsp_setup_header->response_session_id);
+ }
+ }
+}
+
+
int aeron_send_channel_endpoint_check_for_re_resolution(
aeron_send_channel_endpoint_t *endpoint, int64_t now_ns, aeron_driver_conductor_proxy_t *conductor_proxy)
{
diff --git a/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.h b/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.h
index f2b0ba62a0..c220e5a4d4 100644
--- a/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.h
+++ b/aeron-driver/src/main/c/media/aeron_send_channel_endpoint.h
@@ -89,6 +89,13 @@ int aeron_send_channel_send(
size_t iov_length,
int64_t *bytes_sent);
+int aeron_send_channel_send_endpoint_address(
+ aeron_send_channel_endpoint_t *endpoint,
+ struct sockaddr_storage* endpoint_address,
+ struct iovec *iov,
+ size_t iov_length,
+ int64_t *bytes_sent);
+
int aeron_send_channel_endpoint_add_publication(
aeron_send_channel_endpoint_t *endpoint, aeron_network_publication_t *publication);
@@ -110,11 +117,22 @@ void aeron_send_channel_endpoint_on_nak(
aeron_send_channel_endpoint_t *endpoint, uint8_t *buffer, size_t length, struct sockaddr_storage *addr);
void aeron_send_channel_endpoint_on_status_message(
- aeron_send_channel_endpoint_t *endpoint, uint8_t *buffer, size_t length, struct sockaddr_storage *addr);
+ aeron_send_channel_endpoint_t *endpoint,
+ aeron_driver_conductor_proxy_t *conductor_proxy,
+ uint8_t *buffer,
+ size_t length,
+ struct sockaddr_storage *addr);
void aeron_send_channel_endpoint_on_rttm(
aeron_send_channel_endpoint_t *endpoint, uint8_t *buffer, size_t length, struct sockaddr_storage *addr);
+void aeron_send_channel_endpoint_on_response_setup(
+ aeron_send_channel_endpoint_t *endpoint,
+ aeron_driver_conductor_proxy_t *conductor_proxy,
+ uint8_t *buffer,
+ size_t length,
+ struct sockaddr_storage *addr);
+
int aeron_send_channel_endpoint_check_for_re_resolution(
aeron_send_channel_endpoint_t *endpoint, int64_t now_ns, aeron_driver_conductor_proxy_t *conductor_proxy);
diff --git a/aeron-driver/src/main/c/media/aeron_udp_channel.c b/aeron-driver/src/main/c/media/aeron_udp_channel.c
index e8fdfebfcd..0151f632c2 100644
--- a/aeron-driver/src/main/c/media/aeron_udp_channel.c
+++ b/aeron-driver/src/main/c/media/aeron_udp_channel.c
@@ -256,6 +256,10 @@ int aeron_udp_channel_parse(
{
_channel->control_mode = AERON_UDP_CHANNEL_CONTROL_MODE_DYNAMIC;
}
+ else if (strcmp(_channel->uri.params.udp.control_mode, AERON_UDP_CHANNEL_CONTROL_MODE_RESPONSE_VALUE) == 0)
+ {
+ _channel->control_mode = AERON_UDP_CHANNEL_CONTROL_MODE_RESPONSE;
+ }
}
if (AERON_UDP_CHANNEL_CONTROL_MODE_DYNAMIC == _channel->control_mode && NULL == _channel->uri.params.udp.control)
@@ -269,12 +273,13 @@ int aeron_udp_channel_parse(
NULL == _channel->uri.params.udp.control &&
NULL == _channel->uri.params.udp.channel_tag;
- if (has_no_distinguishing_characteristic && AERON_UDP_CHANNEL_CONTROL_MODE_MANUAL != _channel->control_mode)
+ if (has_no_distinguishing_characteristic && AERON_UDP_CHANNEL_CONTROL_MODE_MANUAL != _channel->control_mode &&
+ AERON_UDP_CHANNEL_CONTROL_MODE_RESPONSE != _channel->control_mode)
{
AERON_SET_ERR(
-AERON_ERROR_CODE_INVALID_CHANNEL,
"%s",
- "URIs for UDP must specify endpoint, control, tags, or control-mode=manual");
+ "URIs for UDP must specify endpoint, control, tags, or control-mode=manual/response");
goto error_cleanup;
}
diff --git a/aeron-driver/src/main/c/media/aeron_udp_channel.h b/aeron-driver/src/main/c/media/aeron_udp_channel.h
index bb67de74e9..06cf838a67 100644
--- a/aeron-driver/src/main/c/media/aeron_udp_channel.h
+++ b/aeron-driver/src/main/c/media/aeron_udp_channel.h
@@ -29,7 +29,8 @@ enum aeron_udp_channel_control_mode_en
{
AERON_UDP_CHANNEL_CONTROL_MODE_NONE,
AERON_UDP_CHANNEL_CONTROL_MODE_DYNAMIC,
- AERON_UDP_CHANNEL_CONTROL_MODE_MANUAL
+ AERON_UDP_CHANNEL_CONTROL_MODE_MANUAL,
+ AERON_UDP_CHANNEL_CONTROL_MODE_RESPONSE
};
typedef enum aeron_udp_channel_control_mode_en aeron_udp_channel_control_mode;
diff --git a/aeron-driver/src/main/c/uri/aeron_driver_uri.c b/aeron-driver/src/main/c/uri/aeron_driver_uri.c
index 95845d7ffe..be67eae047 100644
--- a/aeron-driver/src/main/c/uri/aeron_driver_uri.c
+++ b/aeron-driver/src/main/c/uri/aeron_driver_uri.c
@@ -174,6 +174,7 @@ int aeron_diver_uri_publication_params(
params->has_session_id = false;
params->session_id = 0;
params->entity_tag = AERON_URI_INVALID_TAG;
+ params->response_correlation_id = AERON_NULL_VALUE;
aeron_uri_params_t *uri_params = AERON_URI_IPC == uri->type ?
&uri->params.ipc.additional_params : &uri->params.udp.additional_params;
@@ -320,6 +321,12 @@ int aeron_diver_uri_publication_params(
return -1;
}
+ if (aeron_uri_get_int64(
+ uri_params, AERON_URI_RESPONSE_CORRELATION_ID_KEY, AERON_NULL_VALUE, ¶ms->response_correlation_id) < 0)
+ {
+ return -1;
+ }
+
return 0;
}
diff --git a/aeron-driver/src/main/c/uri/aeron_driver_uri.h b/aeron-driver/src/main/c/uri/aeron_driver_uri.h
index 6885e94007..1984dcbe21 100644
--- a/aeron-driver/src/main/c/uri/aeron_driver_uri.h
+++ b/aeron-driver/src/main/c/uri/aeron_driver_uri.h
@@ -38,6 +38,7 @@ typedef struct aeron_driver_uri_publication_params_stct
bool has_session_id;
int32_t session_id;
int64_t entity_tag;
+ int64_t response_correlation_id;
}
aeron_driver_uri_publication_params_t;
diff --git a/aeron-driver/src/main/java/io/aeron/driver/DataPacketDispatcher.java b/aeron-driver/src/main/java/io/aeron/driver/DataPacketDispatcher.java
index 4baa01e3dc..b007a5f008 100644
--- a/aeron-driver/src/main/java/io/aeron/driver/DataPacketDispatcher.java
+++ b/aeron-driver/src/main/java/io/aeron/driver/DataPacketDispatcher.java
@@ -388,7 +388,8 @@ public void onSetupMessage(
msg.termOffset(),
msg.termLength(),
msg.mtuLength(),
- msg.ttl());
+ msg.ttl(),
+ msg.flags());
}
else if (null != sessionInterest.image)
{
@@ -409,7 +410,8 @@ else if (streamInterest.isAllSessions || streamInterest.subscribedSessionIds.con
msg.termOffset(),
msg.termLength(),
msg.mtuLength(),
- msg.ttl());
+ msg.ttl(),
+ msg.flags());
}
else
{
@@ -493,7 +495,8 @@ private void createPublicationImage(
final int termOffset,
final int termLength,
final int mtuLength,
- final int setupTtl)
+ final int setupTtl,
+ final short flags)
{
final InetSocketAddress controlAddress = channelEndpoint.isMulticast(transportIndex) ?
channelEndpoint.udpChannel(transportIndex).remoteControl() : srcAddress;
@@ -512,6 +515,7 @@ private void createPublicationImage(
termLength,
mtuLength,
transportIndex,
+ flags,
controlAddress,
srcAddress,
channelEndpoint);
diff --git a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java
index bcdf367cc5..f3504f3a71 100644
--- a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java
+++ b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java
@@ -69,8 +69,7 @@
public final class DriverConductor implements Agent
{
private static final long CLOCK_UPDATE_INTERNAL_NS = TimeUnit.MILLISECONDS.toNanos(1);
- private static final String[] INVALID_DESTINATION_KEYS =
- {
+ private static final String[] INVALID_DESTINATION_KEYS = {
MTU_LENGTH_PARAM_NAME,
RECEIVER_WINDOW_LENGTH_PARAM_NAME,
SOCKET_RCVBUF_PARAM_NAME,
@@ -240,6 +239,7 @@ void onCreatePublicationImage(
final int termBufferLength,
final int senderMtuLength,
final int transportIndex,
+ final short flags,
final InetSocketAddress controlAddress,
final InetSocketAddress sourceAddress,
final ReceiveChannelEndpoint channelEndpoint)
@@ -307,6 +307,7 @@ void onCreatePublicationImage(
initialTermId,
activeTermId,
initialTermOffset,
+ flags,
rawLog,
treatAsMulticast ? ctx.multicastFeedbackDelayGenerator() : ctx.unicastFeedbackDelayGenerator(),
subscriberPositions,
@@ -468,15 +469,18 @@ void onAddNetworkPublication(
validateEndpointForPublication(udpChannel);
validateControlForPublication(udpChannel);
validateMtuForMaxMessage(params, channel);
+ validateResponseSubscription(params);
final SendChannelEndpoint channelEndpoint = getOrCreateSendChannelEndpoint(params, udpChannel, correlationId);
NetworkPublication publication = null;
if (!isExclusive)
{
- publication = findPublication(networkPublications, streamId, channelEndpoint);
+ publication = findPublication(networkPublications, streamId, channelEndpoint, params.responseCorrelationId);
}
+ final PublicationImage responsePublicationImage = findResponsePublicationImage(params);
+
boolean isNewPublication = false;
if (null == publication)
{
@@ -522,6 +526,110 @@ void onAddNetworkPublication(
{
linkSpies(subscriptionLinks, publication);
}
+
+ if (null != responsePublicationImage)
+ {
+ responsePublicationImage.responseSessionId(publication.sessionId());
+ }
+ }
+
+ private PublicationImage findResponsePublicationImage(final PublicationParams params)
+ {
+ if (!params.isResponse)
+ {
+ return null;
+ }
+
+ if (Aeron.NULL_VALUE == params.responseCorrelationId)
+ {
+ throw new IllegalArgumentException(
+ "control-mode=response was specified, but no response-correlation-id set");
+ }
+
+ for (final PublicationImage publicationImage : publicationImages)
+ {
+ if (publicationImage.correlationId() == params.responseCorrelationId)
+ {
+ if (publicationImage.hasSendResponseSetup())
+ {
+ return publicationImage;
+ }
+ else
+ {
+ throw new IllegalArgumentException(
+ "image.correlationId=" + params.responseCorrelationId + " did not request a response channel");
+ }
+ }
+ }
+
+ throw new IllegalArgumentException("image.correlationId=" + params.responseCorrelationId + " not found");
+ }
+
+ void responseSetup(final long responseCorrelationId, final int responseSessionId)
+ {
+ for (int i = 0, subscriptionLinksSize = subscriptionLinks.size(); i < subscriptionLinksSize; i++)
+ {
+ final SubscriptionLink subscriptionLink = subscriptionLinks.get(i);
+ if (subscriptionLink.registrationId() == responseCorrelationId &&
+ subscriptionLink instanceof NetworkSubscriptionLink &&
+ !subscriptionLink.hasSessionId())
+ {
+ final NetworkSubscriptionLink link = (NetworkSubscriptionLink)subscriptionLink;
+ final SubscriptionParams params = new SubscriptionParams();
+ params.hasSessionId = true;
+ params.sessionId = responseSessionId;
+ params.isSparse = link.isSparse();
+ params.isTether = link.isTether();
+ params.group = link.group();
+ params.isReliable = link.isReliable();
+ params.isRejoin = link.isRejoin();
+
+ final NetworkSubscriptionLink newSubscriptionLink = new NetworkSubscriptionLink(
+ subscriptionLink.registrationId(),
+ subscriptionLink.channelEndpoint(),
+ subscriptionLink.streamId(),
+ subscriptionLink.channel(),
+ subscriptionLink.aeronClient(),
+ params);
+
+ subscriptionLinks.set(i, newSubscriptionLink);
+ addNetworkSubscriptionToReceiver(newSubscriptionLink);
+ newSubscriptionLink.channelEndpoint().decResponseRefToStream(newSubscriptionLink.streamId);
+
+ break;
+ }
+ }
+ }
+
+ void responseConnected(final long responseCorrelationId)
+ {
+ for (final PublicationImage publicationImage : publicationImages)
+ {
+ if (publicationImage.correlationId() == responseCorrelationId)
+ {
+ if (publicationImage.hasSendResponseSetup())
+ {
+ publicationImage.responseSessionId(null);
+ }
+ }
+ }
+ }
+
+ private void validateResponseSubscription(final PublicationParams params)
+ {
+ if (!params.isResponse && Aeron.NULL_VALUE != params.responseCorrelationId)
+ {
+ for (final SubscriptionLink subscriptionLink : subscriptionLinks)
+ {
+ if (params.responseCorrelationId == subscriptionLink.registrationId())
+ {
+ return;
+ }
+ }
+
+ throw new IllegalArgumentException(
+ "unable to find response subscription for response-correlation-id=" + params.responseCorrelationId);
+ }
}
void cleanupSpies(final NetworkPublication publication)
@@ -587,6 +695,10 @@ void cleanupSubscriptionLink(final SubscriptionLink subscription)
channelEndpoint, subscription.streamId(), subscription.sessionId());
}
}
+ else if (subscription.isResponse())
+ {
+ channelEndpoint.decResponseRefToStream(subscription.streamId());
+ }
else
{
if (0 == channelEndpoint.decRefToStream(subscription.streamId()))
@@ -815,6 +927,7 @@ void onAddNetworkSubscription(
final String channel, final int streamId, final long registrationId, final long clientId)
{
final UdpChannel udpChannel = UdpChannel.parse(channel, nameResolver);
+ final ControlMode controlMode = udpChannel.controlMode();
validateControlForSubscription(udpChannel);
validateTimestampConfiguration(udpChannel);
@@ -830,23 +943,37 @@ void onAddNetworkSubscription(
subscriptionLinks.add(subscription);
- if (params.hasSessionId)
+ if (ControlMode.RESPONSE == controlMode)
{
- if (1 == channelEndpoint.incRefToStreamAndSession(streamId, params.sessionId))
+ channelEndpoint.incResponseRefToStream(subscription.streamId);
+ }
+ else
+ {
+ addNetworkSubscriptionToReceiver(subscription);
+ }
+
+ clientProxy.onSubscriptionReady(registrationId, channelEndpoint.statusIndicatorCounter().id());
+ linkMatchingImages(subscription);
+ }
+
+ private void addNetworkSubscriptionToReceiver(final NetworkSubscriptionLink subscription)
+ {
+ final ReceiveChannelEndpoint channelEndpoint = subscription.channelEndpoint();
+
+ if (subscription.hasSessionId())
+ {
+ if (1 == channelEndpoint.incRefToStreamAndSession(subscription.streamId(), subscription.sessionId()))
{
- receiverProxy.addSubscription(channelEndpoint, streamId, params.sessionId);
+ receiverProxy.addSubscription(channelEndpoint, subscription.streamId(), subscription.sessionId());
}
}
else
{
- if (1 == channelEndpoint.incRefToStream(streamId))
+ if (1 == channelEndpoint.incRefToStream(subscription.streamId()))
{
- receiverProxy.addSubscription(channelEndpoint, streamId);
+ receiverProxy.addSubscription(channelEndpoint, subscription.streamId());
}
}
-
- clientProxy.onSubscriptionReady(registrationId, channelEndpoint.statusIndicatorCounter().id());
- linkMatchingImages(subscription);
}
void onAddIpcSubscription(final String channel, final int streamId, final long registrationId, final long clientId)
@@ -1237,7 +1364,8 @@ private ArrayList
diff --git a/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java b/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java
index ed96a5bbbc..04afa41361 100644
--- a/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java
+++ b/aeron-driver/src/main/java/io/aeron/driver/media/UdpChannel.java
@@ -17,7 +17,6 @@
import io.aeron.Aeron;
import io.aeron.ChannelUri;
-import io.aeron.CommonContext;
import io.aeron.driver.DefaultNameResolver;
import io.aeron.driver.NameResolver;
import io.aeron.driver.exceptions.InvalidChannelException;
@@ -150,10 +149,10 @@ public static UdpChannel parse(
final String tagIdStr = channelUri.channelTag();
final ControlMode controlMode = parseControlMode(channelUri);
- final int socketRcvbufLength = parseBufferLength(channelUri, CommonContext.SOCKET_RCVBUF_PARAM_NAME);
- final int socketSndbufLength = parseBufferLength(channelUri, CommonContext.SOCKET_SNDBUF_PARAM_NAME);
+ final int socketRcvbufLength = parseBufferLength(channelUri, SOCKET_RCVBUF_PARAM_NAME);
+ final int socketSndbufLength = parseBufferLength(channelUri, SOCKET_SNDBUF_PARAM_NAME);
final int receiverWindowLength = parseBufferLength(
- channelUri, CommonContext.RECEIVER_WINDOW_LENGTH_PARAM_NAME);
+ channelUri, RECEIVER_WINDOW_LENGTH_PARAM_NAME);
final boolean requiresAdditionalSuffix = !isDestination &&
(null == endpointAddress && null == controlAddress ||
@@ -169,10 +168,11 @@ public static UdpChannel parse(
"explicit control expected with dynamic control mode: " + channelUriString);
}
- if (hasNoDistinguishingCharacteristic && ControlMode.MANUAL != controlMode)
+ if (hasNoDistinguishingCharacteristic && ControlMode.MANUAL != controlMode &&
+ ControlMode.RESPONSE != controlMode)
{
throw new IllegalArgumentException(
- "URIs for UDP must specify an endpoint, control, tags, or control-mode=manual: " +
+ "URIs for UDP must specify an endpoint, control, tags, or control-mode=manual/response: " +
channelUriString);
}
@@ -229,7 +229,7 @@ public static UdpChannel parse(
.protocolFamily(getProtocolFamily(endpointAddress.getAddress()))
.canonicalForm(canonicalise(null, resolvedAddress, null, endpointAddress));
- final String ttlValue = channelUri.get(CommonContext.TTL_PARAM_NAME);
+ final String ttlValue = channelUri.get(TTL_PARAM_NAME);
if (null != ttlValue)
{
context.hasMulticastTtl(true).multicastTtl(Integer.parseInt(ttlValue));
@@ -237,8 +237,8 @@ public static UdpChannel parse(
}
else if (null != controlAddress)
{
- final String controlVal = channelUri.get(CommonContext.MDC_CONTROL_PARAM_NAME);
- final String endpointVal = channelUri.get(CommonContext.ENDPOINT_PARAM_NAME);
+ final String controlVal = channelUri.get(MDC_CONTROL_PARAM_NAME);
+ final String endpointVal = channelUri.get(ENDPOINT_PARAM_NAME);
String suffix = "";
if (requiresAdditionalSuffix)
@@ -265,7 +265,7 @@ else if (null != controlAddress)
searchAddress.getAddress() :
resolveToAddressOfInterface(findInterface(searchAddress), searchAddress);
- final String endpointVal = channelUri.get(CommonContext.ENDPOINT_PARAM_NAME);
+ final String endpointVal = channelUri.get(ENDPOINT_PARAM_NAME);
String suffix = "";
if (requiresAdditionalSuffix)
{
@@ -388,6 +388,8 @@ public static ControlMode parseControlMode(final ChannelUri channelUri)
return ControlMode.DYNAMIC;
case MDC_CONTROL_MODE_MANUAL:
return ControlMode.MANUAL;
+ case CONTROL_MODE_RESPONSE:
+ return ControlMode.RESPONSE;
default:
return ControlMode.NONE;
}
@@ -615,6 +617,16 @@ public boolean isDynamicControlMode()
return ControlMode.DYNAMIC == controlMode;
}
+ /**
+ * Does the channel have response control mode specified.
+ *
+ * @return does the channel have response control mode specified.
+ */
+ public boolean isResponseControlMode()
+ {
+ return ControlMode.RESPONSE == controlMode;
+ }
+
/**
* Does the channel have an explicit endpoint address?
*
@@ -824,9 +836,9 @@ public static InetSocketAddress destinationAddress(final ChannelUri uri, final N
{
validateConfiguration(uri);
- final String endpointValue = uri.get(CommonContext.ENDPOINT_PARAM_NAME);
+ final String endpointValue = uri.get(ENDPOINT_PARAM_NAME);
return SocketAddressParser.parse(
- endpointValue, CommonContext.ENDPOINT_PARAM_NAME, false, nameResolver);
+ endpointValue, ENDPOINT_PARAM_NAME, false, nameResolver);
}
catch (final Exception ex)
{
@@ -912,7 +924,7 @@ private static InetSocketAddress getMulticastControlAddress(final InetSocketAddr
private static InterfaceSearchAddress getInterfaceSearchAddress(final ChannelUri uri) throws UnknownHostException
{
- final String interfaceValue = uri.get(CommonContext.INTERFACE_PARAM_NAME);
+ final String interfaceValue = uri.get(INTERFACE_PARAM_NAME);
if (null != interfaceValue)
{
return InterfaceSearchAddress.parse(interfaceValue);
@@ -925,16 +937,16 @@ private static InetSocketAddress getEndpointAddress(final ChannelUri uri, final
throws UnknownHostException
{
InetSocketAddress address = null;
- final String endpointValue = uri.get(CommonContext.ENDPOINT_PARAM_NAME);
+ final String endpointValue = uri.get(ENDPOINT_PARAM_NAME);
if (null != endpointValue)
{
address = SocketAddressParser.parse(
- endpointValue, CommonContext.ENDPOINT_PARAM_NAME, false, nameResolver);
+ endpointValue, ENDPOINT_PARAM_NAME, false, nameResolver);
if (address.isUnresolved())
{
throw new UnknownHostException(
- "unresolved - " + CommonContext.ENDPOINT_PARAM_NAME + "=" + endpointValue +
+ "unresolved - " + ENDPOINT_PARAM_NAME + "=" + endpointValue +
", name-resolver=" + nameResolver.getClass().getName());
}
}
@@ -946,16 +958,16 @@ private static InetSocketAddress getExplicitControlAddress(final ChannelUri uri,
throws UnknownHostException
{
InetSocketAddress address = null;
- final String controlValue = uri.get(CommonContext.MDC_CONTROL_PARAM_NAME);
+ final String controlValue = uri.get(MDC_CONTROL_PARAM_NAME);
if (null != controlValue)
{
address = SocketAddressParser.parse(
- controlValue, CommonContext.MDC_CONTROL_PARAM_NAME, false, nameResolver);
+ controlValue, MDC_CONTROL_PARAM_NAME, false, nameResolver);
if (address.isUnresolved())
{
throw new UnknownHostException(
- "unresolved - " + CommonContext.MDC_CONTROL_PARAM_NAME + "=" + controlValue +
+ "unresolved - " + MDC_CONTROL_PARAM_NAME + "=" + controlValue +
", name-resolver=" + nameResolver.getClass().getName());
}
}
diff --git a/aeron-driver/src/test/c/aeron_driver_conductor_ipc_test.cpp b/aeron-driver/src/test/c/aeron_driver_conductor_ipc_test.cpp
index cc2428d96b..38f73a2a35 100644
--- a/aeron-driver/src/test/c/aeron_driver_conductor_ipc_test.cpp
+++ b/aeron-driver/src/test/c/aeron_driver_conductor_ipc_test.cpp
@@ -60,11 +60,6 @@ TEST_F(DriverConductorIpcTest, shouldBeAbleToAddSingleIpcSubscriptionThenAddSing
readAllBroadcastsFromConductor(mock_broadcast_handler);
}
-TEST_F(DriverConductorIpcTest, foo)
-{
- std::cout << sizeof(aeron_tetherable_position_t) << std::endl;
-}
-
// TODO: Parameterise
TEST_F(DriverConductorIpcTest, shouldBeAbleToAddSingleIpcPublicationThenAddSingleIpcSubscription)
{
diff --git a/aeron-driver/src/test/c/aeron_driver_uri_test.cpp b/aeron-driver/src/test/c/aeron_driver_uri_test.cpp
index b88a31bfef..c5f2a3ca6c 100644
--- a/aeron-driver/src/test/c/aeron_driver_uri_test.cpp
+++ b/aeron-driver/src/test/c/aeron_driver_uri_test.cpp
@@ -319,6 +319,15 @@ TEST_F(DriverUriTest, shouldDefaultMediaReceiveTimestampOffsetToAeronNullValue)
EXPECT_EQ(AERON_NULL_VALUE, offset);
}
+TEST_F(DriverUriTest, shouldParseAndDefaultResponseCorrelationId)
+{
+ aeron_driver_uri_publication_params_t params;
+
+ EXPECT_EQ(AERON_URI_PARSE("aeron:udp?endpoint=224.10.9.8", &m_uri), 0);
+ EXPECT_EQ(aeron_diver_uri_publication_params(&m_uri, ¶ms, &m_conductor, false), 0);
+ EXPECT_EQ(INT64_C(-1), params.response_correlation_id);
+}
+
class UriResolverTest : public testing::Test
{
public:
diff --git a/aeron-driver/src/test/c/aeron_network_publication_test.cpp b/aeron-driver/src/test/c/aeron_network_publication_test.cpp
index d1c395dc1b..1117343414 100644
--- a/aeron-driver/src/test/c/aeron_network_publication_test.cpp
+++ b/aeron-driver/src/test/c/aeron_network_publication_test.cpp
@@ -228,6 +228,11 @@ TEST_F(NetworkPublicationTest, shouldSendHeartbeatWhileSendingPeriodicSetups)
{
int64_t time_ns = 0;
+ aeron_driver_conductor_t conductor = {};
+ aeron_driver_conductor_proxy_t proxy = {};
+ proxy.conductor = &conductor;
+ proxy.threading_mode = AERON_THREADING_MODE_INVOKER;
+
aeron_network_publication_t *publication = createPublication("aeron:udp?endpoint=localhost:23245");
ASSERT_NE(nullptr, publication) << aeron_errmsg();
@@ -238,7 +243,7 @@ TEST_F(NetworkPublicationTest, shouldSendHeartbeatWhileSendingPeriodicSetups)
sockaddr_storage sockaddr = {};
aeron_network_publication_on_status_message(
- publication, data_buffer.data(), sizeof(aeron_status_message_header_t), &sockaddr);
+ publication, &proxy, data_buffer.data(), sizeof(aeron_status_message_header_t), &sockaddr);
aeron_network_publication_send(publication, time_ns);
ASSERT_TRUE(publication->has_receivers);
diff --git a/aeron-driver/src/test/c/aeron_receiver_test.h b/aeron-driver/src/test/c/aeron_receiver_test.h
index 6da4275e41..82da5a8d07 100644
--- a/aeron-driver/src/test/c/aeron_receiver_test.h
+++ b/aeron-driver/src/test/c/aeron_receiver_test.h
@@ -226,7 +226,7 @@ class ReceiverTestBase : public testing::Test
&image, endpoint, destination, m_context, correlation_id, session_id, stream_id, 0, 0, 0,
&hwm_position, &pos_position, congestion_control_strategy,
&channel->remote_control, &channel->local_data,
- TERM_BUFFER_SIZE, MTU, nullptr, true, true, false, &m_system_counters) < 0)
+ TERM_BUFFER_SIZE, MTU, UINT8_C(0), nullptr, true, true, false, &m_system_counters) < 0)
{
congestion_control_strategy->fini(congestion_control_strategy);
return nullptr;
diff --git a/aeron-driver/src/test/java/io/aeron/driver/DataPacketDispatcherTest.java b/aeron-driver/src/test/java/io/aeron/driver/DataPacketDispatcherTest.java
index cc821937b9..a496c59b0d 100644
--- a/aeron-driver/src/test/java/io/aeron/driver/DataPacketDispatcherTest.java
+++ b/aeron-driver/src/test/java/io/aeron/driver/DataPacketDispatcherTest.java
@@ -119,7 +119,7 @@ void shouldRequestCreateImageUponReceivingSetup()
verify(mockConductorProxy).createPublicationImage(
SESSION_ID, STREAM_ID, INITIAL_TERM_ID, ACTIVE_TERM_ID, TERM_OFFSET, TERM_LENGTH,
- MTU_LENGTH, 0, SRC_ADDRESS, SRC_ADDRESS, mockChannelEndpoint);
+ MTU_LENGTH, 0, (short)0, SRC_ADDRESS, SRC_ADDRESS, mockChannelEndpoint);
}
@Test
@@ -132,7 +132,7 @@ void shouldOnlyRequestCreateImageOnceUponReceivingSetup()
verify(mockConductorProxy).createPublicationImage(
SESSION_ID, STREAM_ID, INITIAL_TERM_ID, ACTIVE_TERM_ID, TERM_OFFSET, TERM_LENGTH,
- MTU_LENGTH, 0, SRC_ADDRESS, SRC_ADDRESS, mockChannelEndpoint);
+ MTU_LENGTH, 0, (short)0, SRC_ADDRESS, SRC_ADDRESS, mockChannelEndpoint);
}
@Test
@@ -196,7 +196,7 @@ void shouldNotIgnoreDataAndSetupAfterImageRemovedAndCoolDownRemoved()
.addPendingSetupMessage(SESSION_ID, STREAM_ID, 0, mockChannelEndpoint, false, SRC_ADDRESS);
inOrder.verify(mockConductorProxy).createPublicationImage(
SESSION_ID, STREAM_ID, INITIAL_TERM_ID, ACTIVE_TERM_ID, TERM_OFFSET, TERM_LENGTH,
- MTU_LENGTH, 0, SRC_ADDRESS, SRC_ADDRESS, mockChannelEndpoint);
+ MTU_LENGTH, 0, (short)0, SRC_ADDRESS, SRC_ADDRESS, mockChannelEndpoint);
}
@Test
diff --git a/aeron-driver/src/test/java/io/aeron/driver/DriverConductorTest.java b/aeron-driver/src/test/java/io/aeron/driver/DriverConductorTest.java
index 90e129e28c..ae70cc7093 100644
--- a/aeron-driver/src/test/java/io/aeron/driver/DriverConductorTest.java
+++ b/aeron-driver/src/test/java/io/aeron/driver/DriverConductorTest.java
@@ -116,7 +116,7 @@ class DriverConductorTest
private final SenderProxy senderProxy = mock(SenderProxy.class);
private final ReceiverProxy receiverProxy = mock(ReceiverProxy.class);
- private final DriverConductorProxy driverConductorProxy = mock(DriverConductorProxy.class);
+ private final DriverConductorProxy mockDriverConductorProxy = mock(DriverConductorProxy.class);
private ReceiveChannelEndpoint receiveChannelEndpoint = null;
private final CachedNanoClock nanoClock = new CachedNanoClock();
@@ -185,7 +185,7 @@ void before()
.systemCounters(spySystemCounters)
.receiverProxy(receiverProxy)
.senderProxy(senderProxy)
- .driverConductorProxy(driverConductorProxy)
+ .driverConductorProxy(mockDriverConductorProxy)
.receiveChannelEndpointThreadLocals(new ReceiveChannelEndpointThreadLocals())
.conductorCycleThresholdNs(600_000_000)
.nameResolver(DefaultNameResolver.INSTANCE)
@@ -594,7 +594,7 @@ void shouldTimeoutPublicationWithNoKeepaliveButNotDrained()
when(msg.consumptionTermOffset()).thenReturn(0);
when(msg.receiverWindowLength()).thenReturn(10);
- publication.onStatusMessage(msg, new InetSocketAddress("localhost", 4059));
+ publication.onStatusMessage(msg, new InetSocketAddress("localhost", 4059), mockDriverConductorProxy);
appendUnfragmentedMessage(
rawLog, index, 0, termId, headerWriter, srcBuffer, 0, 256);
@@ -604,7 +604,7 @@ void shouldTimeoutPublicationWithNoKeepaliveButNotDrained()
() -> nanoClock.nanoTime() >= CLIENT_LIVENESS_TIMEOUT_NS * 1.25,
(timeNs) ->
{
- publication.onStatusMessage(msg, new InetSocketAddress("localhost", 4059));
+ publication.onStatusMessage(msg, new InetSocketAddress("localhost", 4059), mockDriverConductorProxy);
publication.updateHasReceivers(timeNs);
});
@@ -689,11 +689,11 @@ void shouldCreateImageOnSubscription()
verify(receiverProxy).registerReceiveChannelEndpoint(captor1.capture());
receiveChannelEndpoint = captor1.getValue();
- receiveChannelEndpoint.openChannel(driverConductorProxy);
+ receiveChannelEndpoint.openChannel(mockDriverConductorProxy);
driverConductor.onCreatePublicationImage(
SESSION_ID, STREAM_ID_1, initialTermId, activeTermId, termOffset, TERM_BUFFER_LENGTH, MTU_LENGTH, 0,
- mock(InetSocketAddress.class), sourceAddress, receiveChannelEndpoint);
+ (short)0, mock(InetSocketAddress.class), sourceAddress, receiveChannelEndpoint);
final ArgumentCaptorrequestEndpoint
if specified.
+ * @param responseChannel channel fragment to allow for configuration parameters on the response subscription.
+ * May be null. The 'control' parameter is not required and will be removed if specified.
+ */
+ public ResponseClient(
+ final Aeron aeron,
+ final FragmentHandler handler,
+ final String requestEndpoint,
+ final int requestStreamId,
+ final String responseControl,
+ final int responseStreamId,
+ final String requestChannel,
+ final String responseChannel)
+ {
+ this.aeron = aeron;
+ this.handler = handler;
+ this.requestEndpoint = requestEndpoint;
+ this.requestStreamId = requestStreamId;
+ this.responseControl = responseControl;
+ this.responseStreamId = responseStreamId;
+
+ requestUriBuilder = null != requestChannel ?
+ new ChannelUriStringBuilder(requestChannel) : new ChannelUriStringBuilder();
+ requestUriBuilder
+ .media("udp")
+ .endpoint(requestEndpoint);
+ responseUriBuilder = null != responseChannel ?
+ new ChannelUriStringBuilder(responseChannel) : new ChannelUriStringBuilder();
+ responseUriBuilder
+ .media("udp")
+ .controlMode("response")
+ .controlEndpoint(responseControl);
+ }
+
+ /**
+ * Overload for {@link ResponseServer#ResponseServer(Aeron, Function, String, int, String, int, String, String)}
+ * that defaults the channels to null.
+ *
+ * @param aeron client to use to connect to the server.
+ * @param handler callback to handle response messages.
+ * @param requestEndpoint request publication's endpoint.
+ * @param requestStreamId request publication streamId
+ * @param responseControl control address for the response subscription.
+ * @param responseStreamId response response streamId
+ */
+ public ResponseClient(
+ final Aeron aeron,
+ final FragmentHandler handler,
+ final String requestEndpoint,
+ final int requestStreamId,
+ final String responseControl,
+ final int responseStreamId)
+ {
+ this(aeron, handler, requestEndpoint, requestStreamId, responseControl, responseStreamId, null, null);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public int doWork()
+ {
+ int workCount = 0;
+
+ if (null == subscription)
+ {
+ subscription = aeron.addSubscription(responseUriBuilder.build(), responseStreamId);
+ }
+
+ if (null == publication && null != subscription)
+ {
+ publication = aeron.addPublication(
+ requestUriBuilder.responseCorrelationId(subscription.registrationId()).build(),
+ requestStreamId);
+
+ workCount++;
+ }
+
+ if (null != subscription)
+ {
+ workCount += subscription.poll(handler, 10);
+ }
+
+ return workCount;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public String roleName()
+ {
+ return "ResponseClient";
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void close()
+ {
+
+ }
+
+ /**
+ * Check that the client is connected.
+ *
+ * @return true if the client is connected.
+ */
+ public boolean isConnected()
+ {
+ return null != subscription && subscription.isConnected() && null != publication && publication.isConnected();
+ }
+
+ /**
+ * Offer a message on the request channel.
+ *
+ * @param message to be sent
+ * @return result code from the publication.
+ */
+ public long offer(final DirectBuffer message)
+ {
+ return publication.offer(message);
+ }
+
+ /**
+ * Get the response control for the server.
+ *
+ * @return response control for the server.
+ */
+ public String responseControl()
+ {
+ return responseControl;
+ }
+
+ /**
+ * Get the request endpoint for the server.
+ *
+ * @return request endpoint for the server.
+ */
+ public String requestEndpoint()
+ {
+ return requestEndpoint;
+ }
+
+ /**
+ * Get the response subscription for the client.
+ *
+ * @return response subscription for the client.
+ */
+ public Subscription subscription()
+ {
+ return subscription;
+ }
+
+ /**
+ * Get the request publication for the client.
+ *
+ * @return request publication for the client.
+ */
+ public Publication publication()
+ {
+ return publication;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public String toString()
+ {
+ return "ResponseClient{" +
+ "publication.isConnected=" + publication.isConnected() +
+ ", subscription.isConnected=" + subscription.isConnected() +
+ '}';
+ }
+}
diff --git a/aeron-samples/src/main/java/io/aeron/response/ResponseServer.java b/aeron-samples/src/main/java/io/aeron/response/ResponseServer.java
new file mode 100644
index 0000000000..0549798929
--- /dev/null
+++ b/aeron-samples/src/main/java/io/aeron/response/ResponseServer.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2014-2023 Real Logic Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.aeron.response;
+
+import io.aeron.Aeron;
+import io.aeron.ChannelUriStringBuilder;
+import io.aeron.Image;
+import io.aeron.Publication;
+import io.aeron.Subscription;
+import io.aeron.logbuffer.Header;
+import org.agrona.CloseHelper;
+import org.agrona.DirectBuffer;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.agrona.concurrent.Agent;
+import org.agrona.concurrent.OneToOneConcurrentArrayQueue;
+
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * A basic sample response server that allows the users to specify a simple function to represent the handling of a
+ * request and then return a response. This approach will be effective when request processing is very short. For
+ * certain types of response servers, e.g. returning a large volume of data from a database, this pattern will be
+ * ineffective. For those types of use cases, something more complex, likely involving thread pooling would be required.
+ */
+public class ResponseServer implements AutoCloseable, Agent
+{
+ private final Aeron aeron;
+ private final Long2ObjectHashMap