Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Response Channels #1534

Merged
merged 41 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
18067e3
[Java] Initial work on response channels.
mikeb01 Oct 24, 2023
bfc7bc5
[Java] Start adding more client based operations to the ClientUriStri…
mikeb01 Oct 25, 2023
d384e73
[Java] Partial implementation of handling multiple publications on th…
mikeb01 Oct 26, 2023
28520b9
[Java] Use new ControlMode.RESPONSE to indicate response channels.
mikeb01 Nov 6, 2023
ced2cfa
[Java] Add test to cover multiple terms with response channels.
mikeb01 Nov 6, 2023
3bbd82a
[Java] Small tidy-ups and TODO comment.
mikeb01 Nov 6, 2023
90a7807
[Java] First basic implementation of connecting response channels.
mikeb01 Nov 9, 2023
a96f873
[Java] Stop sending RSP_SETUP message once response channel it connec…
mikeb01 Nov 10, 2023
acb3de5
[Java] Ensure that new subscription is only added once.
mikeb01 Nov 14, 2023
9e37a82
[Java] Remove println
mikeb01 Nov 14, 2023
483d6b0
[Java] Updating ResponseClient/Server.
mikeb01 Nov 15, 2023
9300eaa
[Java] Test showing that messages will come back to individual sessions.
mikeb01 Nov 15, 2023
db737d4
[Java] Add additional validation for response channel settings and re…
mikeb01 Nov 16, 2023
cf00344
[Java] Use the response correlation id as part of the matching criter…
mikeb01 Nov 16, 2023
af4d1b1
[Java] Check that the subscription exists when setting the response-c…
mikeb01 Nov 16, 2023
eebb98e
[Java] Remove TODO.
mikeb01 Nov 16, 2023
060de88
[Java] Fix CodeQL issues.
mikeb01 Nov 17, 2023
4d09bd1
[Java] Disable ResponseChannelsTest for C/C++ build.
mikeb01 Nov 17, 2023
8b4bc4d
[Java] Test to show that multiple publications to the same response c…
mikeb01 Nov 21, 2023
e2bf277
[Java] Update error message.
mikeb01 Dec 6, 2023
941cc0f
[Java] Change RSP_SETUP message type to allow for ATS messages. Add A…
mikeb01 Dec 6, 2023
debf879
[C] Start adding C implementation for response channels. Add response…
mikeb01 Dec 6, 2023
7972647
[C] More progress toward response channels.
mikeb01 Dec 13, 2023
89df7f0
[C] Additional error reporting. Only set SEND_RESPONSE_FLAG if the pu…
mikeb01 Dec 14, 2023
0bc5b7e
[C] Allow defaulting when parsing 64 bit uri parameters. Add additio…
mikeb01 Dec 27, 2023
253ccf6
[Java] Fix checkstyle.
mikeb01 Dec 28, 2023
663f325
[Java] Add test to ensure that pending response subscription don't be…
mikeb01 Dec 29, 2023
6f56edb
[C] Make sure that response SubscriptionLinks that haven't been linke…
mikeb01 Dec 29, 2023
ec0b453
[Java] Start implementing dissector for RSP_SETUP frames.
mikeb01 Dec 29, 2023
47c1a8f
[Java] Add dissector for RSP_SETUP frame.
mikeb01 Dec 29, 2023
b1cc283
[C] Add dissector for RSP_SETUP frame.
mikeb01 Jan 3, 2024
ba5deb8
[Java] Fix documentation.
mikeb01 Jan 9, 2024
04ce545
[Java] Remove double read of volatile and reverse elements of condition.
mikeb01 Jan 9, 2024
202590a
[C] Remove debug logging.
mikeb01 Jan 9, 2024
af8e3d5
[C] Remove test.
mikeb01 Jan 10, 2024
91004ca
[Java] Add reference tracking on the ReceiveChannelEndpoint for uncon…
mikeb01 Jan 11, 2024
27e395d
[Java] Reverse order of adding connected response stream and decremen…
mikeb01 Jan 12, 2024
7621dcd
[C] Reverse order of adding connected response stream and decrementin…
mikeb01 Jan 12, 2024
8731c44
[C] Resolve publication image before getting the network publication …
mikeb01 Jan 15, 2024
fe88326
[C] Delete udp_channel on failure.
mikeb01 Jan 15, 2024
3db5e74
[Java] Close session when removing in ResponseServer example.
mikeb01 Jan 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions aeron-agent/src/main/java/io/aeron/agent/DriverEventDissector.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ final class DriverEventDissector
private static final RttMeasurementFlyweight RTT_MEASUREMENT = new RttMeasurementFlyweight();
private static final HeaderFlyweight HEADER = new HeaderFlyweight();
private static final ResolutionEntryFlyweight RESOLUTION = new ResolutionEntryFlyweight();
private static final ResponseSetupFlyweight RSP_SETUP = new ResponseSetupFlyweight();
private static final PublicationMessageFlyweight PUB_MSG = new PublicationMessageFlyweight();
private static final SubscriptionMessageFlyweight SUB_MSG = new SubscriptionMessageFlyweight();
private static final PublicationBuffersReadyFlyweight PUB_READY = new PublicationBuffersReadyFlyweight();
Expand Down Expand Up @@ -109,6 +110,11 @@ static void dissectFrame(
dissectResFrame(buffer, frameOffset, builder);
break;

case HeaderFlyweight.HDR_TYPE_RSP_SETUP:
RSP_SETUP.wrap(buffer, frameOffset, buffer.capacity() - frameOffset);
dissectRspSetupFrame(builder);
break;

default:
builder.append("FRAME_UNKNOWN: ").append(frameType);
break;
Expand Down Expand Up @@ -540,6 +546,22 @@ private static void dissectResFrame(
}
}

private static void dissectRspSetupFrame(final StringBuilder builder)
{
builder.append("RSP_SETUP ");
HeaderFlyweight.appendFlagsAsChars(RSP_SETUP.flags(), builder);

builder
.append(" len ")
.append(RSP_SETUP.frameLength())
.append(' ')
.append(RSP_SETUP.sessionId())
.append(':')
.append(RSP_SETUP.streamId())
.append(" RSP_SESSION_ID ")
.append(RSP_SETUP.responseSessionId());
}

private static void dissectResEntry(final StringBuilder builder)
{
builder
Expand Down
12 changes: 12 additions & 0 deletions aeron-client/src/main/c/protocol/aeron_udp_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ typedef struct aeron_option_header_stct
uint16_t type;
}
aeron_option_header_t;

typedef struct aeron_response_setup_header_stct
{
aeron_frame_header_t frame_header;
int32_t session_id;
int32_t stream_id;
int32_t response_session_id;
}
aeron_response_setup_header_t;
#pragma pack(pop)

int aeron_udp_protocol_group_tag(aeron_status_message_header_t *sm, int64_t *group_tag);
Expand All @@ -151,6 +160,7 @@ int aeron_udp_protocol_group_tag(aeron_status_message_header_t *sm, int64_t *gro
#define AERON_HDR_TYPE_ATS_DATA (0x08)
#define AERON_HDR_TYPE_ATS_SETUP (0x09)
#define AERON_HDR_TYPE_ATS_SM (0x0A)
#define AERON_HDR_TYPE_RSP_SETUP (0x0B)
#define AERON_HDR_TYPE_EXT (0xFFFF)

#define AERON_DATA_HEADER_LENGTH (sizeof(aeron_data_header_t))
Expand All @@ -166,6 +176,8 @@ int aeron_udp_protocol_group_tag(aeron_status_message_header_t *sm, int64_t *gro
#define AERON_STATUS_MESSAGE_HEADER_SEND_SETUP_FLAG (UINT8_C(0x80))
#define AERON_STATUS_MESSAGE_HEADER_EOS_FLAG (UINT8_C(0x40))

#define AERON_SETUP_HEADER_SEND_RESPONSE_FLAG (UINT8_C(0x80))

#define AERON_RTTM_HEADER_REPLY_FLAG (UINT8_C(0x80))

#define AERON_RES_HEADER_TYPE_NAME_TO_IP4_MD (0x01)
Expand Down
4 changes: 2 additions & 2 deletions aeron-client/src/main/c/uri/aeron_uri.c
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,12 @@ int aeron_uri_get_int32(aeron_uri_params_t *uri_params, const char *key, int32_t
return 1;
}

int aeron_uri_get_int64(aeron_uri_params_t *uri_params, const char *key, int64_t *retval)
int aeron_uri_get_int64(aeron_uri_params_t *uri_params, const char *key, int64_t default_val, int64_t *retval)
{
const char *value_str;
if ((value_str = aeron_uri_find_param_value(uri_params, key)) == NULL)
{
*retval = 0;
*retval = default_val;
return 0;
}

Expand Down
4 changes: 3 additions & 1 deletion aeron-client/src/main/c/uri/aeron_uri.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ aeron_uri_params_t;
#define AERON_UDP_CHANNEL_CONTROL_MODE_KEY "control-mode"
#define AERON_UDP_CHANNEL_CONTROL_MODE_MANUAL_VALUE "manual"
#define AERON_UDP_CHANNEL_CONTROL_MODE_DYNAMIC_VALUE "dynamic"
#define AERON_UDP_CHANNEL_CONTROL_MODE_RESPONSE_VALUE "response"

#define AERON_URI_INITIAL_TERM_ID_KEY "init-term-id"
#define AERON_URI_TERM_ID_KEY "term-id"
Expand Down Expand Up @@ -70,6 +71,7 @@ aeron_uri_params_t;
#define AERON_URI_CHANNEL_RCV_TIMESTAMP_OFFSET_KEY "channel-rcv-ts-offset"
#define AERON_URI_CHANNEL_SND_TIMESTAMP_OFFSET_KEY "channel-snd-ts-offset"
#define AERON_URI_TIMESTAMP_OFFSET_RESERVED "reserved"
#define AERON_URI_RESPONSE_CORRELATION_ID_KEY "response-correlation-id"
#define AERON_URI_INVALID_TAG (-1)

typedef struct aeron_udp_channel_params_stct
Expand Down Expand Up @@ -136,7 +138,7 @@ uint8_t aeron_uri_multicast_ttl(aeron_uri_t *uri);

const char *aeron_uri_find_param_value(const aeron_uri_params_t *uri_params, const char *key);
int aeron_uri_get_int32(aeron_uri_params_t *uri_params, const char *key, int32_t *retval);
int aeron_uri_get_int64(aeron_uri_params_t *uri_params, const char *key, int64_t *retval);
int aeron_uri_get_int64(aeron_uri_params_t *uri_params, const char *key, int64_t default_val, int64_t *retval);
int aeron_uri_get_bool(aeron_uri_params_t *uri_params, const char *key, bool *retval);
int aeron_uri_get_ats(aeron_uri_params_t *uri_params, aeron_uri_ats_status_t *uri_ats_status);
int aeron_uri_sprint(aeron_uri_t *uri, char *buffer, size_t buffer_len);
Expand Down
93 changes: 90 additions & 3 deletions aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public final class ChannelUriStringBuilder
private String mediaReceiveTimestampOffset;
private String channelReceiveTimestampOffset;
private String channelSendTimestampOffset;
private String responseEndpoint;
private Long responseCorrelationId;
private Boolean isResponseChannel;
private Long responseSubscriptionId;

/**
* Default constructor
Expand Down Expand Up @@ -134,6 +138,8 @@ public ChannelUriStringBuilder(final ChannelUri channelUri)
mediaReceiveTimestampOffset(channelUri);
channelReceiveTimestampOffset(channelUri);
channelSendTimestampOffset(channelUri);
responseEndpoint(channelUri);
responseCorrelationId(channelUri);
}

/**
Expand Down Expand Up @@ -175,6 +181,10 @@ public ChannelUriStringBuilder clear()
mediaReceiveTimestampOffset = null;
channelReceiveTimestampOffset = null;
channelSendTimestampOffset = null;
responseEndpoint = null;
responseCorrelationId = null;
isResponseChannel = null;
responseSubscriptionId = null;

return this;
}
Expand Down Expand Up @@ -428,8 +438,9 @@ public String controlEndpoint()
public ChannelUriStringBuilder controlMode(final String controlMode)
{
if (null != controlMode &&
!controlMode.equals(CommonContext.MDC_CONTROL_MODE_MANUAL) &&
!controlMode.equals(CommonContext.MDC_CONTROL_MODE_DYNAMIC))
!controlMode.equals(MDC_CONTROL_MODE_MANUAL) &&
!controlMode.equals(MDC_CONTROL_MODE_DYNAMIC) &&
!controlMode.equals(CONTROL_MODE_RESPONSE))
{
throw new IllegalArgumentException("invalid control mode: " + controlMode);
}
Expand Down Expand Up @@ -1891,6 +1902,80 @@ public String channelSendTimestampOffset()
return channelSendTimestampOffset;
}

/**
* Set the response endpoint to be used for a response channel subscription or publication.
*
* @param responseEndpoint response endpoint to be used in this channel URI.
* @return this for a fluent API.
* @see CommonContext#RESPONSE_ENDPOINT_PARAM_NAME
*/
public ChannelUriStringBuilder responseEndpoint(final String responseEndpoint)
{
this.responseEndpoint = responseEndpoint;
return this;
}

/**
* Set the response endpoint to be used for a response channel subscription or publication by extracting it from the
* ChannelUri.
*
* @param channelUri the existing URI to extract the responseEndpoint from.
* @return this for a fluent API.
*/
public ChannelUriStringBuilder responseEndpoint(final ChannelUri channelUri)
{
return responseEndpoint(channelUri.get(RESPONSE_ENDPOINT_PARAM_NAME));
}

/**
* The response endpoint to be used for a response channel subscription or publication.
*
* @return response endpoint.
*/
public String responseEndpoint()
{
return this.responseEndpoint;
}

/**
* Set the correlation id from the image received on the response "server's" subscription to be used by a response
* publication.
*
* @param responseCorrelationId correlation id of an image from the response "server's" subscription.
* @return this for a fluent API.
*/
public ChannelUriStringBuilder responseCorrelationId(final Long responseCorrelationId)
{
this.responseCorrelationId = responseCorrelationId;
return this;
}

/**
* Set the correlation id from the image received on the response "server's" subscription to be used by a response
* publication extracted from the channelUri.
*
* @param channelUri the existing URI to extract the responseCorrelationId from.
* @return this for a fluent API.
*/
public ChannelUriStringBuilder responseCorrelationId(final ChannelUri channelUri)
{
final String responseCorrelationIdString = channelUri.get(RESPONSE_CORRELATION_ID_PARAM_NAME);

if (null != responseCorrelationIdString)
{
try
{
responseCorrelationId(Long.valueOf(responseCorrelationIdString));
}
catch (final NumberFormatException ex)
{
throw new IllegalArgumentException("'response-correlation-id' must be a valid long value", ex);
}
}

return this;
}

/**
* Offset into a message to store the channel send timestamp. May also be the special value 'reserved' which means
* to store the timestamp in the reserved value field.
Expand Down Expand Up @@ -1936,7 +2021,7 @@ public ChannelUriStringBuilder channelSendTimestampOffset(final ChannelUri chann
*
* @return a channel URI String for the given parameters.
*/
@SuppressWarnings("MethodLength")
@SuppressWarnings({ "MethodLength", "DuplicatedCode" })
public String build()
{
sb.setLength(0);
Expand Down Expand Up @@ -1983,6 +2068,8 @@ public String build()
appendParameter(sb, MEDIA_RCV_TIMESTAMP_OFFSET_PARAM_NAME, mediaReceiveTimestampOffset);
appendParameter(sb, CHANNEL_RECEIVE_TIMESTAMP_OFFSET_PARAM_NAME, channelReceiveTimestampOffset);
appendParameter(sb, CHANNEL_SEND_TIMESTAMP_OFFSET_PARAM_NAME, channelSendTimestampOffset);
appendParameter(sb, RESPONSE_ENDPOINT_PARAM_NAME, responseEndpoint);
appendParameter(sb, RESPONSE_CORRELATION_ID_PARAM_NAME, responseCorrelationId);

final char lastChar = sb.charAt(sb.length() - 1);
if (lastChar == '|' || lastChar == '?')
Expand Down
17 changes: 17 additions & 0 deletions aeron-client/src/main/java/io/aeron/CommonContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ public static InferableBoolean parse(final String value)
*/
public static final String MDC_CONTROL_MODE_DYNAMIC = "dynamic";

/**
* Valid value for {@link #MDC_CONTROL_MODE_PARAM_NAME} when response control is desired.
*/
public static final String CONTROL_MODE_RESPONSE = "response";

/**
* Key for the session id for a publication or restricted subscription.
*/
Expand Down Expand Up @@ -352,6 +357,18 @@ public static InferableBoolean parse(final String value)
*/
public static final String RESERVED_OFFSET = "reserved";

/**
* Parameter name for the field that will be used to specify the response endpoint on a subscription and publication
* used in a response "server".
*/
public static final String RESPONSE_ENDPOINT_PARAM_NAME = "response-endpoint";

/**
* Parameter name for the field that will be used to specify the correlation id used on a publication to connect it
* to a subscription's image in order to set up a response stream.
*/
public static final String RESPONSE_CORRELATION_ID_PARAM_NAME = "response-correlation-id";

/**
* Property name for a fallback {@link PrintStream} based logger when it is not possible to use the error logging
* callback. Supported values are stdout, stderr, no_op (stderr is the default).
Expand Down
20 changes: 20 additions & 0 deletions aeron-client/src/main/java/io/aeron/protocol/HeaderFlyweight.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,26 @@ public class HeaderFlyweight extends UnsafeBuffer
*/
public static final int HDR_TYPE_RES = 0x07;

/**
* header type ATS Data
*/
public static final int HDR_TYPE_ATS_DATA = 0x08;

/**
* header type ATS Status Message
*/
public static final int HDR_TYPE_ATS_SM = 0x09;

/**
* header type ATS Setup
*/
public static final int HDR_TYPE_ATS_SETUP = 0x0A;

/**
* header type Response Setup
*/
public static final int HDR_TYPE_RSP_SETUP = 0x0B;

/**
* header type EXT
*/
Expand Down
Loading