Skip to content

Commit

Permalink
Response Channels (#1534)
Browse files Browse the repository at this point in the history
* [Java] Initial work on response channels.

* [Java] Start adding more client based operations to the ClientUriStringBuilder and add initial parts of a system test.

* [Java] Partial implementation of handling multiple publications on the same endpoint.

* [Java] Use new ControlMode.RESPONSE to indicate response channels.

* [Java] Add test to cover multiple terms with response channels.

* [Java] Small tidy-ups and TODO comment.

* [Java] First basic implementation of connecting response channels.

* [Java] Stop sending RSP_SETUP message once response channel it connected.

* [Java] Ensure that new subscription is only added once.

* [Java] Remove println

* [Java] Updating ResponseClient/Server.

* [Java] Test showing that messages will come back to individual sessions.

* [Java] Add additional validation for response channel settings and rewrite some tests.

* [Java] Use the response correlation id as part of the matching criteria when finding existing publications.

* [Java] Check that the subscription exists when setting the response-correlation-id on the request publication.

* [Java] Remove TODO.

* [Java] Fix CodeQL issues.

* [Java] Disable ResponseChannelsTest for C/C++ build.

* [Java] Test to show that multiple publications to the same response channel can be created.

* [Java] Update error message.

* [Java] Change RSP_SETUP message type to allow for ATS messages. Add ATS constants to Java definitions for completeness.

* [C] Start adding C implementation for response channels. Add response setup message and constants.

* [C] More progress toward response channels.

* [C] Additional error reporting. Only set SEND_RESPONSE_FLAG if the publication is not for a response channel.

* [C] Allow defaulting when parsing 64 bit uri parameters.  Add additional validation to verify response-correlation-id uri parameters.

* [Java] Fix checkstyle.

* [Java] Add test to ensure that pending response subscription don't become connected before they should. Make sure that response SubscriptionLinks that having been linked to a publication don't match.

* [C] Make sure that response SubscriptionLinks that haven't been linked to a publication don't match.

* [Java] Start implementing dissector for RSP_SETUP frames.

* [Java] Add dissector for RSP_SETUP frame.

* [C] Add dissector for RSP_SETUP frame.

* [Java] Fix documentation.

* [Java] Remove double read of volatile and reverse elements of condition.

* [C] Remove debug logging.

* [C] Remove test.

* [Java] Add reference tracking on the ReceiveChannelEndpoint for unconnected response channels to prevent endpoints being closed at the wrong time.

* [Java] Reverse order of adding connected response stream and decrementing placeholder for the response stream.

* [C] Reverse order of adding connected response stream and decrementing placeholder for the response stream.

* [C] Resolve publication image before getting the network publication so failure doesn't result in leaking resources.

* [C] Delete udp_channel on failure.

* [Java] Close session when removing in ResponseServer example.
  • Loading branch information
mikeb01 authored Jan 15, 2024
1 parent 341d57c commit faa2854
Show file tree
Hide file tree
Showing 62 changed files with 2,691 additions and 176 deletions.
22 changes: 22 additions & 0 deletions aeron-agent/src/main/java/io/aeron/agent/DriverEventDissector.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ final class DriverEventDissector
private static final RttMeasurementFlyweight RTT_MEASUREMENT = new RttMeasurementFlyweight();
private static final HeaderFlyweight HEADER = new HeaderFlyweight();
private static final ResolutionEntryFlyweight RESOLUTION = new ResolutionEntryFlyweight();
private static final ResponseSetupFlyweight RSP_SETUP = new ResponseSetupFlyweight();
private static final PublicationMessageFlyweight PUB_MSG = new PublicationMessageFlyweight();
private static final SubscriptionMessageFlyweight SUB_MSG = new SubscriptionMessageFlyweight();
private static final PublicationBuffersReadyFlyweight PUB_READY = new PublicationBuffersReadyFlyweight();
Expand Down Expand Up @@ -109,6 +110,11 @@ static void dissectFrame(
dissectResFrame(buffer, frameOffset, builder);
break;

case HeaderFlyweight.HDR_TYPE_RSP_SETUP:
RSP_SETUP.wrap(buffer, frameOffset, buffer.capacity() - frameOffset);
dissectRspSetupFrame(builder);
break;

default:
builder.append("FRAME_UNKNOWN: ").append(frameType);
break;
Expand Down Expand Up @@ -540,6 +546,22 @@ private static void dissectResFrame(
}
}

private static void dissectRspSetupFrame(final StringBuilder builder)
{
builder.append("RSP_SETUP ");
HeaderFlyweight.appendFlagsAsChars(RSP_SETUP.flags(), builder);

builder
.append(" len ")
.append(RSP_SETUP.frameLength())
.append(' ')
.append(RSP_SETUP.sessionId())
.append(':')
.append(RSP_SETUP.streamId())
.append(" RSP_SESSION_ID ")
.append(RSP_SETUP.responseSessionId());
}

private static void dissectResEntry(final StringBuilder builder)
{
builder
Expand Down
12 changes: 12 additions & 0 deletions aeron-client/src/main/c/protocol/aeron_udp_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ typedef struct aeron_option_header_stct
uint16_t type;
}
aeron_option_header_t;

typedef struct aeron_response_setup_header_stct
{
aeron_frame_header_t frame_header;
int32_t session_id;
int32_t stream_id;
int32_t response_session_id;
}
aeron_response_setup_header_t;
#pragma pack(pop)

int aeron_udp_protocol_group_tag(aeron_status_message_header_t *sm, int64_t *group_tag);
Expand All @@ -151,6 +160,7 @@ int aeron_udp_protocol_group_tag(aeron_status_message_header_t *sm, int64_t *gro
#define AERON_HDR_TYPE_ATS_DATA (0x08)
#define AERON_HDR_TYPE_ATS_SETUP (0x09)
#define AERON_HDR_TYPE_ATS_SM (0x0A)
#define AERON_HDR_TYPE_RSP_SETUP (0x0B)
#define AERON_HDR_TYPE_EXT (0xFFFF)

#define AERON_DATA_HEADER_LENGTH (sizeof(aeron_data_header_t))
Expand All @@ -166,6 +176,8 @@ int aeron_udp_protocol_group_tag(aeron_status_message_header_t *sm, int64_t *gro
#define AERON_STATUS_MESSAGE_HEADER_SEND_SETUP_FLAG (UINT8_C(0x80))
#define AERON_STATUS_MESSAGE_HEADER_EOS_FLAG (UINT8_C(0x40))

#define AERON_SETUP_HEADER_SEND_RESPONSE_FLAG (UINT8_C(0x80))

#define AERON_RTTM_HEADER_REPLY_FLAG (UINT8_C(0x80))

#define AERON_RES_HEADER_TYPE_NAME_TO_IP4_MD (0x01)
Expand Down
4 changes: 2 additions & 2 deletions aeron-client/src/main/c/uri/aeron_uri.c
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,12 @@ int aeron_uri_get_int32(aeron_uri_params_t *uri_params, const char *key, int32_t
return 1;
}

int aeron_uri_get_int64(aeron_uri_params_t *uri_params, const char *key, int64_t *retval)
int aeron_uri_get_int64(aeron_uri_params_t *uri_params, const char *key, int64_t default_val, int64_t *retval)
{
const char *value_str;
if ((value_str = aeron_uri_find_param_value(uri_params, key)) == NULL)
{
*retval = 0;
*retval = default_val;
return 0;
}

Expand Down
4 changes: 3 additions & 1 deletion aeron-client/src/main/c/uri/aeron_uri.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ aeron_uri_params_t;
#define AERON_UDP_CHANNEL_CONTROL_MODE_KEY "control-mode"
#define AERON_UDP_CHANNEL_CONTROL_MODE_MANUAL_VALUE "manual"
#define AERON_UDP_CHANNEL_CONTROL_MODE_DYNAMIC_VALUE "dynamic"
#define AERON_UDP_CHANNEL_CONTROL_MODE_RESPONSE_VALUE "response"

#define AERON_URI_INITIAL_TERM_ID_KEY "init-term-id"
#define AERON_URI_TERM_ID_KEY "term-id"
Expand Down Expand Up @@ -70,6 +71,7 @@ aeron_uri_params_t;
#define AERON_URI_CHANNEL_RCV_TIMESTAMP_OFFSET_KEY "channel-rcv-ts-offset"
#define AERON_URI_CHANNEL_SND_TIMESTAMP_OFFSET_KEY "channel-snd-ts-offset"
#define AERON_URI_TIMESTAMP_OFFSET_RESERVED "reserved"
#define AERON_URI_RESPONSE_CORRELATION_ID_KEY "response-correlation-id"
#define AERON_URI_INVALID_TAG (-1)

typedef struct aeron_udp_channel_params_stct
Expand Down Expand Up @@ -136,7 +138,7 @@ uint8_t aeron_uri_multicast_ttl(aeron_uri_t *uri);

const char *aeron_uri_find_param_value(const aeron_uri_params_t *uri_params, const char *key);
int aeron_uri_get_int32(aeron_uri_params_t *uri_params, const char *key, int32_t *retval);
int aeron_uri_get_int64(aeron_uri_params_t *uri_params, const char *key, int64_t *retval);
int aeron_uri_get_int64(aeron_uri_params_t *uri_params, const char *key, int64_t default_val, int64_t *retval);
int aeron_uri_get_bool(aeron_uri_params_t *uri_params, const char *key, bool *retval);
int aeron_uri_get_ats(aeron_uri_params_t *uri_params, aeron_uri_ats_status_t *uri_ats_status);
int aeron_uri_sprint(aeron_uri_t *uri, char *buffer, size_t buffer_len);
Expand Down
93 changes: 90 additions & 3 deletions aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public final class ChannelUriStringBuilder
private String mediaReceiveTimestampOffset;
private String channelReceiveTimestampOffset;
private String channelSendTimestampOffset;
private String responseEndpoint;
private Long responseCorrelationId;
private Boolean isResponseChannel;
private Long responseSubscriptionId;

/**
* Default constructor
Expand Down Expand Up @@ -134,6 +138,8 @@ public ChannelUriStringBuilder(final ChannelUri channelUri)
mediaReceiveTimestampOffset(channelUri);
channelReceiveTimestampOffset(channelUri);
channelSendTimestampOffset(channelUri);
responseEndpoint(channelUri);
responseCorrelationId(channelUri);
}

/**
Expand Down Expand Up @@ -175,6 +181,10 @@ public ChannelUriStringBuilder clear()
mediaReceiveTimestampOffset = null;
channelReceiveTimestampOffset = null;
channelSendTimestampOffset = null;
responseEndpoint = null;
responseCorrelationId = null;
isResponseChannel = null;
responseSubscriptionId = null;

return this;
}
Expand Down Expand Up @@ -428,8 +438,9 @@ public String controlEndpoint()
public ChannelUriStringBuilder controlMode(final String controlMode)
{
if (null != controlMode &&
!controlMode.equals(CommonContext.MDC_CONTROL_MODE_MANUAL) &&
!controlMode.equals(CommonContext.MDC_CONTROL_MODE_DYNAMIC))
!controlMode.equals(MDC_CONTROL_MODE_MANUAL) &&
!controlMode.equals(MDC_CONTROL_MODE_DYNAMIC) &&
!controlMode.equals(CONTROL_MODE_RESPONSE))
{
throw new IllegalArgumentException("invalid control mode: " + controlMode);
}
Expand Down Expand Up @@ -1891,6 +1902,80 @@ public String channelSendTimestampOffset()
return channelSendTimestampOffset;
}

/**
* Set the response endpoint to be used for a response channel subscription or publication.
*
* @param responseEndpoint response endpoint to be used in this channel URI.
* @return this for a fluent API.
* @see CommonContext#RESPONSE_ENDPOINT_PARAM_NAME
*/
public ChannelUriStringBuilder responseEndpoint(final String responseEndpoint)
{
this.responseEndpoint = responseEndpoint;
return this;
}

/**
* Set the response endpoint to be used for a response channel subscription or publication by extracting it from the
* ChannelUri.
*
* @param channelUri the existing URI to extract the responseEndpoint from.
* @return this for a fluent API.
*/
public ChannelUriStringBuilder responseEndpoint(final ChannelUri channelUri)
{
return responseEndpoint(channelUri.get(RESPONSE_ENDPOINT_PARAM_NAME));
}

/**
* The response endpoint to be used for a response channel subscription or publication.
*
* @return response endpoint.
*/
public String responseEndpoint()
{
return this.responseEndpoint;
}

/**
* Set the correlation id from the image received on the response "server's" subscription to be used by a response
* publication.
*
* @param responseCorrelationId correlation id of an image from the response "server's" subscription.
* @return this for a fluent API.
*/
public ChannelUriStringBuilder responseCorrelationId(final Long responseCorrelationId)
{
this.responseCorrelationId = responseCorrelationId;
return this;
}

/**
* Set the correlation id from the image received on the response "server's" subscription to be used by a response
* publication extracted from the channelUri.
*
* @param channelUri the existing URI to extract the responseCorrelationId from.
* @return this for a fluent API.
*/
public ChannelUriStringBuilder responseCorrelationId(final ChannelUri channelUri)
{
final String responseCorrelationIdString = channelUri.get(RESPONSE_CORRELATION_ID_PARAM_NAME);

if (null != responseCorrelationIdString)
{
try
{
responseCorrelationId(Long.valueOf(responseCorrelationIdString));
}
catch (final NumberFormatException ex)
{
throw new IllegalArgumentException("'response-correlation-id' must be a valid long value", ex);
}
}

return this;
}

/**
* Offset into a message to store the channel send timestamp. May also be the special value 'reserved' which means
* to store the timestamp in the reserved value field.
Expand Down Expand Up @@ -1936,7 +2021,7 @@ public ChannelUriStringBuilder channelSendTimestampOffset(final ChannelUri chann
*
* @return a channel URI String for the given parameters.
*/
@SuppressWarnings("MethodLength")
@SuppressWarnings({ "MethodLength", "DuplicatedCode" })
public String build()
{
sb.setLength(0);
Expand Down Expand Up @@ -1983,6 +2068,8 @@ public String build()
appendParameter(sb, MEDIA_RCV_TIMESTAMP_OFFSET_PARAM_NAME, mediaReceiveTimestampOffset);
appendParameter(sb, CHANNEL_RECEIVE_TIMESTAMP_OFFSET_PARAM_NAME, channelReceiveTimestampOffset);
appendParameter(sb, CHANNEL_SEND_TIMESTAMP_OFFSET_PARAM_NAME, channelSendTimestampOffset);
appendParameter(sb, RESPONSE_ENDPOINT_PARAM_NAME, responseEndpoint);
appendParameter(sb, RESPONSE_CORRELATION_ID_PARAM_NAME, responseCorrelationId);

final char lastChar = sb.charAt(sb.length() - 1);
if (lastChar == '|' || lastChar == '?')
Expand Down
17 changes: 17 additions & 0 deletions aeron-client/src/main/java/io/aeron/CommonContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ public static InferableBoolean parse(final String value)
*/
public static final String MDC_CONTROL_MODE_DYNAMIC = "dynamic";

/**
* Valid value for {@link #MDC_CONTROL_MODE_PARAM_NAME} when response control is desired.
*/
public static final String CONTROL_MODE_RESPONSE = "response";

/**
* Key for the session id for a publication or restricted subscription.
*/
Expand Down Expand Up @@ -352,6 +357,18 @@ public static InferableBoolean parse(final String value)
*/
public static final String RESERVED_OFFSET = "reserved";

/**
* Parameter name for the field that will be used to specify the response endpoint on a subscription and publication
* used in a response "server".
*/
public static final String RESPONSE_ENDPOINT_PARAM_NAME = "response-endpoint";

/**
* Parameter name for the field that will be used to specify the correlation id used on a publication to connect it
* to a subscription's image in order to set up a response stream.
*/
public static final String RESPONSE_CORRELATION_ID_PARAM_NAME = "response-correlation-id";

/**
* Property name for a fallback {@link PrintStream} based logger when it is not possible to use the error logging
* callback. Supported values are stdout, stderr, no_op (stderr is the default).
Expand Down
20 changes: 20 additions & 0 deletions aeron-client/src/main/java/io/aeron/protocol/HeaderFlyweight.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,26 @@ public class HeaderFlyweight extends UnsafeBuffer
*/
public static final int HDR_TYPE_RES = 0x07;

/**
* header type ATS Data
*/
public static final int HDR_TYPE_ATS_DATA = 0x08;

/**
* header type ATS Status Message
*/
public static final int HDR_TYPE_ATS_SM = 0x09;

/**
* header type ATS Setup
*/
public static final int HDR_TYPE_ATS_SETUP = 0x0A;

/**
* header type Response Setup
*/
public static final int HDR_TYPE_RSP_SETUP = 0x0B;

/**
* header type EXT
*/
Expand Down
Loading

0 comments on commit faa2854

Please sign in to comment.