From e4e62f49e05a9f26b5c86f1eb829f2180604bac1 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Wed, 22 Feb 2023 18:04:14 +0000 Subject: [PATCH] PROTON-2838: Remove pn_data_t operations from frame codec We only convert to pn_data_t if the application requests the data as a pn_data_t. - renamed message rest of message pn_data members deprecated - Added some more useful emmiter/consumer code - parse message directly to raw bytes - parse error conditions without pn_data_t - Avoid using pn_amqp_decode_*C when not really needed - Tidied up message codec --- c/src/core/consumers.h | 15 +- c/src/core/emitters.h | 128 ++++++++--- c/src/core/engine-internal.h | 23 +- c/src/core/engine.c | 160 +++++++++++--- c/src/core/message.c | 231 ++++++++++++-------- c/src/core/transport.c | 328 +++++++++++----------------- c/src/core/util.h | 73 ++++++- c/tools/codec-generator/generate.py | 48 ++-- c/tools/codec-generator/specs.json | 36 +-- cpp/src/message.cpp | 28 ++- 10 files changed, 655 insertions(+), 415 deletions(-) diff --git a/c/src/core/consumers.h b/c/src/core/consumers.h index e9d685de78..92bd0638ae 100644 --- a/c/src/core/consumers.h +++ b/c/src/core/consumers.h @@ -680,17 +680,6 @@ static inline bool consume_described_maybe_type_anything(pni_consumer_t* consume return *qtype; } -static inline bool consume_copy(pni_consumer_t *consumer, pn_data_t *data) { - size_t iposition = consumer->position; - uint8_t type; - bool tq = consume_single_value(consumer, &type); - if (!tq || type==PNE_NULL) return false; - - pn_bytes_t value = {.size = consumer->position-iposition, .start = (const char*)consumer->output_start+iposition}; - ssize_t err = pn_data_decode(data, value.start, value.size); - return err>=0 && err==(ssize_t)value.size; -} - static inline bool consume_described_maybe_type_raw(pni_consumer_t *consumer, bool *qtype, uint64_t *type, pn_bytes_t *raw) { pni_consumer_t subconsumer; *qtype = consume_described_ulong_descriptor(consumer, &subconsumer, type); @@ -704,9 +693,9 @@ static inline bool consume_described_maybe_type_maybe_anything(pni_consumer_t *c return *qtype && *qanything; } -static inline bool consume_described_copy(pni_consumer_t *consumer, pn_data_t *data) { +static inline bool consume_described_raw(pni_consumer_t *consumer, pn_bytes_t *raw) { pni_consumer_t subconsumer; - return consume_described(consumer, &subconsumer) && consume_copy(&subconsumer, data); + return consume_described(consumer, &subconsumer) && consume_raw(&subconsumer, raw); } static inline bool consume_string(pni_consumer_t *consumer, pn_bytes_t *string) { diff --git a/c/src/core/emitters.h b/c/src/core/emitters.h index 68ddeb9d08..13d8be598b 100644 --- a/c/src/core/emitters.h +++ b/c/src/core/emitters.h @@ -558,49 +558,105 @@ static inline void emit_raw(pni_emitter_t* emitter, pni_compound_context* compou compound->count++; } -static inline void emit_multiple(pni_emitter_t* emitter, pni_compound_context* compound, pn_data_t* data) { - if (!data || pn_data_size(data) == 0) { - emit_null(emitter, compound); +// Keep this here as a placeholder until we do something more intelligent +static inline void emit_multiple(pni_emitter_t* emitter, pni_compound_context* compound, const pn_bytes_t bytes) { + emit_raw(emitter, compound, bytes); +} + +static inline void emit_described_type_raw(pni_emitter_t* emitter, pni_compound_context* compound, uint64_t descriptor, const pn_bytes_t bytes) { + emit_descriptor(emitter, compound, descriptor); + pni_compound_context c = make_compound(); + emit_raw(emitter, &c, bytes); + // Can only be a single item (probably a list though) + compound->count++; +} + +static inline void emit_condition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_condition_t* condition) { + if (!condition || !condition->name || !pn_string_get(condition->name)) { + emit_null(emitter, compound0); return; } - pn_handle_t point = pn_data_point(data); - pn_data_rewind(data); - // Rewind and position to first node so data type is defined. - pn_data_next(data); - - if (pn_data_type(data) == PN_ARRAY) { - switch (pn_data_get_array(data)) { - case 0: - emit_null(emitter, compound); - pn_data_restore(data, point); - return; - case 1: - emit_accumulated_nulls(emitter, compound); - pn_data_enter(data); - pn_data_narrow(data); - pni_emitter_data(emitter, data); - pn_data_widen(data); - break; - default: - emit_accumulated_nulls(emitter, compound); - pni_emitter_data(emitter, data); + emit_descriptor(emitter, compound0, ERROR); + for (bool small_encoding = true; ; small_encoding = false) { + pni_compound_context c = emit_list(emitter, compound0, small_encoding, true); + pni_compound_context compound = c; + pn_bytes_t name_bytes = pn_string_bytes(condition->name); + if (name_bytes.size==0) { + emit_null(emitter, &compound); + } else { + emit_symbol_bytes(emitter, &compound, name_bytes); } - } else { - emit_accumulated_nulls(emitter, compound); - pni_emitter_data(emitter, data); + pn_bytes_t description_bytes = pn_string_bytes(condition->description); + if (description_bytes.size==0) { + emit_null(emitter, &compound); + } else { + emit_string_bytes(emitter, &compound, description_bytes); + } + if (condition->info) { + emit_copy(emitter, &compound, condition->info); + } else { + emit_raw(emitter, &compound, condition->info_raw); + } + emit_end_list(emitter, &compound, small_encoding); + if (encode_succeeded(emitter, &compound)) break; } - - compound->count++; - pn_data_restore(data, point); } -static inline void emit_described_type_copy(pni_emitter_t* emitter, pni_compound_context* compound, uint64_t descriptor, pn_data_t* data) { - emit_descriptor(emitter, compound, descriptor); - pni_compound_context c = make_compound(); - emit_copy(emitter, &c, data); - // Can only be a single item (probably a list though) - compound->count++; +static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_disposition_t *disposition) +{ + if (!disposition || !disposition->type) { + emit_null(emitter, compound0); + return; + } + + emit_descriptor(emitter, compound0, disposition->type); + switch (disposition->type) { + case PN_RECEIVED: + for (bool small_encoding = true; ; small_encoding = false) { + pni_compound_context c = emit_list(emitter, compound0, small_encoding, true); + pni_compound_context compound = c; + emit_uint(emitter, &compound, disposition->section_number); + emit_ulong(emitter, &compound, disposition->section_offset); + emit_end_list(emitter, &compound, small_encoding); + if (encode_succeeded(emitter, &compound)) break; + } + return; + case PN_ACCEPTED: + case PN_RELEASED: + return; + case PN_REJECTED: + for (bool small_encoding = true; ; small_encoding = false) { + pni_compound_context c = emit_list(emitter, compound0, small_encoding, true); + pni_compound_context compound = c; + emit_condition(emitter, &compound, &disposition->condition); + emit_end_list(emitter, &compound, small_encoding); + if (encode_succeeded(emitter, &compound)) break; + } + return; + case PN_MODIFIED: + for (bool small_encoding = true; ; small_encoding = false) { + pni_compound_context c = emit_list(emitter, compound0, small_encoding, true); + pni_compound_context compound = c; + emit_bool(emitter, &compound, disposition->failed); + emit_bool(emitter, &compound, disposition->undeliverable); + if (disposition->annotations) { + emit_copy(emitter, &compound, disposition->annotations); + } else { + emit_raw(emitter, &compound, disposition->annotations_raw); + } + emit_end_list(emitter, &compound, small_encoding); + if (encode_succeeded(emitter, &compound)) break; + } + return; + default: + if (disposition->data) { + emit_copy(emitter, compound0, disposition->data); + } else { + emit_raw(emitter, compound0, disposition->data_raw); + } + return; + } } #endif // PROTON_EMITTERS_H diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h index 9f43036b62..2e836b8cf9 100644 --- a/c/src/core/engine-internal.h +++ b/c/src/core/engine-internal.h @@ -40,6 +40,7 @@ typedef enum pn_endpoint_type_t {CONNECTION, SESSION, SENDER, RECEIVER} pn_endpo typedef struct pn_endpoint_t pn_endpoint_t; struct pn_condition_t { + pn_bytes_t info_raw; pn_string_t *name; pn_string_t *description; pn_data_t *info; @@ -136,10 +137,9 @@ struct pn_transport_t { pn_connection_t *connection; // reference counted char *remote_container; char *remote_hostname; - pn_data_t *remote_offered_capabilities; - pn_data_t *remote_desired_capabilities; - pn_data_t *remote_properties; - pn_data_t *disp_data; + pn_bytes_t remote_offered_capabilities_raw; + pn_bytes_t remote_desired_capabilities_raw; + pn_bytes_t remote_properties_raw; // DEFAULT_MAX_FRAME_SIZE see PROTON-2640 #define PN_DEFAULT_MAX_FRAME_SIZE (32*1024) uint32_t local_max_frame; @@ -243,9 +243,15 @@ struct pn_connection_t { pn_string_t *auth_user; pn_string_t *authzid; pn_string_t *auth_password; + pn_bytes_t offered_capabilities_raw; + pn_bytes_t desired_capabilities_raw; + pn_bytes_t properties_raw; pn_data_t *offered_capabilities; pn_data_t *desired_capabilities; pn_data_t *properties; + pn_data_t *remote_offered_capabilities; + pn_data_t *remote_desired_capabilities; + pn_data_t *remote_properties; pn_collector_t *collector; pn_record_t *context; pn_list_t *delivery_pool; @@ -270,6 +276,10 @@ struct pn_session_t { struct pn_terminus_t { pn_string_t *address; + pn_bytes_t properties_raw; + pn_bytes_t capabilities_raw; + pn_bytes_t outcomes_raw; + pn_bytes_t filter_raw; pn_data_t *properties; pn_data_t *capabilities; pn_data_t *outcomes; @@ -297,7 +307,9 @@ struct pn_link_t { pn_delivery_t *current; pn_record_t *context; pn_data_t *properties; + pn_bytes_t properties_raw; pn_data_t *remote_properties; + pn_bytes_t remote_properties_raw; size_t unsettled_count; uint64_t max_message_size; uint64_t remote_max_message_size; @@ -320,7 +332,9 @@ struct pn_disposition_t { pn_condition_t condition; uint64_t type; pn_data_t *data; + pn_bytes_t data_raw; pn_data_t *annotations; + pn_bytes_t annotations_raw; uint64_t section_offset; uint32_t section_number; bool failed; @@ -357,6 +371,7 @@ struct pn_delivery_t { #define PN_SET_REMOTE(OLD, NEW) \ (OLD) = ((OLD) & PN_LOCAL_MASK) | (NEW) +pn_link_t *pn_link_new(int type, pn_session_t *session, pn_string_t *name); void pn_link_dump(pn_link_t *link); void pn_dump(pn_connection_t *conn); diff --git a/c/src/core/engine.c b/c/src/core/engine.c index faf096812c..3d3de7ade3 100644 --- a/c/src/core/engine.c +++ b/c/src/core/engine.c @@ -24,6 +24,8 @@ #include "engine-internal.h" +#include "consumers.h" +#include "core/frame_consumers.h" #include "fixed_string.h" #include "framing.h" #include "memory.h" @@ -220,6 +222,7 @@ pn_transport_t *pn_connection_transport(pn_connection_t *connection) void pn_condition_init(pn_condition_t *condition) { + condition->info_raw = (pn_bytes_t){0, NULL}; condition->name = NULL; condition->description = NULL; condition->info = NULL; @@ -233,6 +236,7 @@ pn_condition_t *pn_condition(void) { void pn_condition_tini(pn_condition_t *condition) { + pn_bytes_free(condition->info_raw); pn_data_free(condition->info); pn_free(condition->description); pn_free(condition->name); @@ -357,6 +361,10 @@ void pn_link_detach(pn_link_t *link) static void pni_terminus_free(pn_terminus_t *terminus) { pn_free(terminus->address); + pn_bytes_free(terminus->properties_raw); + pn_bytes_free(terminus->capabilities_raw); + pn_bytes_free(terminus->outcomes_raw); + pn_bytes_free(terminus->filter_raw); pn_free(terminus->properties); pn_free(terminus->capabilities); pn_free(terminus->outcomes); @@ -513,9 +521,15 @@ static void pn_connection_finalize(void *object) pn_free(conn->auth_user); pn_free(conn->authzid); pn_free(conn->auth_password); + pn_bytes_free(conn->offered_capabilities_raw); + pn_bytes_free(conn->desired_capabilities_raw); + pn_bytes_free(conn->properties_raw); pn_free(conn->offered_capabilities); pn_free(conn->desired_capabilities); pn_free(conn->properties); + pn_free(conn->remote_offered_capabilities); + pn_free(conn->remote_desired_capabilities); + pn_free(conn->remote_properties); pni_endpoint_tini(endpoint); pn_free(conn->delivery_pool); } @@ -548,9 +562,15 @@ pn_connection_t *pn_connection(void) conn->auth_user = pn_string(NULL); conn->authzid = pn_string(NULL); conn->auth_password = pn_string(NULL); - conn->offered_capabilities = pn_data(0); - conn->desired_capabilities = pn_data(0); - conn->properties = pn_data(0); + conn->offered_capabilities_raw = (pn_bytes_t){0, NULL}; + conn->desired_capabilities_raw = (pn_bytes_t){0, NULL}; + conn->properties_raw = (pn_bytes_t){0, NULL}; + conn->offered_capabilities = NULL; + conn->desired_capabilities = NULL; + conn->properties = NULL; + conn->remote_offered_capabilities = NULL; + conn->remote_desired_capabilities = NULL; + conn->remote_properties = NULL; conn->collector = NULL; conn->context = pn_record(); conn->delivery_pool = pn_list(&PN_CLASSCLASS(pn_delivery), 0); @@ -647,37 +667,49 @@ void pn_connection_set_password(pn_connection_t *connection, const char *passwor pn_data_t *pn_connection_offered_capabilities(pn_connection_t *connection) { assert(connection); + pni_switch_to_data(&connection->offered_capabilities_raw, &connection->offered_capabilities); return connection->offered_capabilities; } pn_data_t *pn_connection_desired_capabilities(pn_connection_t *connection) { assert(connection); + pni_switch_to_data(&connection->desired_capabilities_raw, &connection->desired_capabilities); return connection->desired_capabilities; } pn_data_t *pn_connection_properties(pn_connection_t *connection) { assert(connection); + pni_switch_to_data(&connection->properties_raw, &connection->properties); return connection->properties; } pn_data_t *pn_connection_remote_offered_capabilities(pn_connection_t *connection) { assert(connection); - return connection->transport ? connection->transport->remote_offered_capabilities : NULL; + if (!connection->transport) + return NULL; + pni_switch_to_data(&connection->transport->remote_offered_capabilities_raw, &connection->remote_offered_capabilities); + return connection->remote_offered_capabilities; } pn_data_t *pn_connection_remote_desired_capabilities(pn_connection_t *connection) { assert(connection); - return connection->transport ? connection->transport->remote_desired_capabilities : NULL; + if (!connection->transport) + return NULL; + pni_switch_to_data(&connection->transport->remote_desired_capabilities_raw, &connection->remote_desired_capabilities); + return connection->remote_desired_capabilities; } pn_data_t *pn_connection_remote_properties(pn_connection_t *connection) { assert(connection); - return connection->transport ? connection->transport->remote_properties : NULL; + if (!connection->transport) + return NULL; + pni_switch_to_data(&connection->transport->remote_properties_raw, &connection->remote_properties); + return connection->remote_properties; } const char *pn_connection_remote_container(pn_connection_t *connection) @@ -1109,10 +1141,14 @@ static void pni_terminus_init(pn_terminus_t *terminus, pn_terminus_type_t type) terminus->timeout = 0; terminus->dynamic = false; terminus->distribution_mode = PN_DIST_MODE_UNSPECIFIED; - terminus->properties = pn_data(0); - terminus->capabilities = pn_data(0); - terminus->outcomes = pn_data(0); - terminus->filter = pn_data(0); + terminus->properties_raw = (pn_bytes_t){0, NULL}; + terminus->capabilities_raw = (pn_bytes_t){0, NULL}; + terminus->outcomes_raw = (pn_bytes_t){0, NULL}; + terminus->filter_raw = (pn_bytes_t){0, NULL}; + terminus->properties = NULL; + terminus->capabilities = NULL; + terminus->outcomes = NULL; + terminus->filter = NULL; } static void pn_link_incref(void *object) @@ -1155,7 +1191,9 @@ static void pn_link_finalize(void *object) pn_decref(link->session); } pn_free(link->properties); + pn_bytes_free(link->properties_raw); pn_free(link->remote_properties); + pn_bytes_free(link->remote_properties_raw); } #define pn_link_refcount NULL @@ -1165,7 +1203,7 @@ static void pn_link_finalize(void *object) #define pn_link_compare NULL #define pn_link_inspect NULL -pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name) +pn_link_t *pn_link_new(int type, pn_session_t *session, pn_string_t *name) { #define pn_link_new NULL #define pn_link_free NULL @@ -1177,7 +1215,7 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name) pn_endpoint_init(&link->endpoint, type, session->connection); pni_add_link(session, link); pn_incref(session); // keep session until link finalized - link->name = pn_string(name); + link->name = name; pni_terminus_init(&link->source, PN_SOURCE); pni_terminus_init(&link->target, PN_TARGET); pni_terminus_init(&link->remote_source, PN_UNSPECIFIED); @@ -1201,7 +1239,9 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name) link->detached = false; link->more_pending = false; link->properties = 0; + link->properties_raw = (pn_bytes_t){0, NULL}; link->remote_properties = 0; + link->remote_properties_raw = (pn_bytes_t){0, NULL}; // begin transport state link->state.local_handle = -1; @@ -1331,22 +1371,34 @@ int pn_terminus_set_dynamic(pn_terminus_t *terminus, bool dynamic) pn_data_t *pn_terminus_properties(pn_terminus_t *terminus) { - return terminus ? terminus->properties : NULL; + if (!terminus) + return NULL; + pni_switch_to_data(&terminus->properties_raw, &terminus->properties); + return terminus->properties; } pn_data_t *pn_terminus_capabilities(pn_terminus_t *terminus) { - return terminus ? terminus->capabilities : NULL; + if (!terminus) + return NULL; + pni_switch_to_data(&terminus->capabilities_raw, &terminus->capabilities); + return terminus->capabilities; } pn_data_t *pn_terminus_outcomes(pn_terminus_t *terminus) { - return terminus ? terminus->outcomes : NULL; + if (!terminus) + return NULL; + pni_switch_to_data(&terminus->outcomes_raw, &terminus->outcomes); + return terminus->outcomes; } pn_data_t *pn_terminus_filter(pn_terminus_t *terminus) { - return terminus ? terminus->filter : NULL; + if (!terminus) + return NULL; + pni_switch_to_data(&terminus->filter_raw, &terminus->filter); + return terminus->filter; } pn_distribution_mode_t pn_terminus_get_distribution_mode(const pn_terminus_t *terminus) @@ -1376,25 +1428,57 @@ int pn_terminus_copy(pn_terminus_t *terminus, pn_terminus_t *src) terminus->timeout = src->timeout; terminus->dynamic = src->dynamic; terminus->distribution_mode = src->distribution_mode; - err = pn_data_copy(terminus->properties, src->properties); - if (err) return err; - err = pn_data_copy(terminus->capabilities, src->capabilities); - if (err) return err; - err = pn_data_copy(terminus->outcomes, src->outcomes); - if (err) return err; - err = pn_data_copy(terminus->filter, src->filter); - if (err) return err; + pn_bytes_free(terminus->properties_raw); + terminus->properties_raw = pn_bytes_dup(src->properties_raw); + pn_bytes_free(terminus->capabilities_raw); + terminus->capabilities_raw = pn_bytes_dup(src->capabilities_raw); + pn_bytes_free(terminus->outcomes_raw); + terminus->outcomes_raw = pn_bytes_dup(src->outcomes_raw); + pn_bytes_free(terminus->filter_raw); + terminus->filter_raw = pn_bytes_dup(src->filter_raw); + if (!src->properties) { + pn_free(terminus->properties); + terminus->properties = NULL; + } else { + if (!terminus->properties) terminus->properties = pn_data(0); + err = pn_data_copy(terminus->properties, src->properties); + if (err) return err; + } + if (!src->capabilities) { + pn_free(terminus->capabilities); + terminus->capabilities = NULL; + } else { + if (!terminus->capabilities) terminus->capabilities = pn_data(0); + err = pn_data_copy(terminus->capabilities, src->capabilities); + if (err) return err; + } + if (!src->outcomes) { + pn_free(terminus->outcomes); + terminus->outcomes = NULL; + } else { + if (!terminus->outcomes) terminus->outcomes = pn_data(0); + err = pn_data_copy(terminus->outcomes, src->outcomes); + if (err) return err; + } + if (!src->filter) { + pn_free(terminus->filter); + terminus->filter = NULL; + } else { + if (!terminus->filter) terminus->filter = pn_data(0); + err = pn_data_copy(terminus->filter, src->filter); + if (err) return err; + } return 0; } pn_link_t *pn_sender(pn_session_t *session, const char *name) { - return pn_link_new(SENDER, session, name); + return pn_link_new(SENDER, session, pn_string(name)); } pn_link_t *pn_receiver(pn_session_t *session, const char *name) { - return pn_link_new(RECEIVER, session, name); + return pn_link_new(RECEIVER, session, pn_string(name)); } pn_state_t pn_link_state(pn_link_t *link) @@ -1427,7 +1511,9 @@ pn_session_t *pn_link_session(pn_link_t *link) static void pn_disposition_finalize(pn_disposition_t *ds) { pn_free(ds->data); + pn_bytes_free(ds->data_raw); pn_free(ds->annotations); + pn_bytes_free(ds->annotations_raw); pn_condition_tini(&ds->condition); } @@ -1503,8 +1589,10 @@ static void pn_delivery_finalize(void *object) static void pn_disposition_init(pn_disposition_t *ds) { - ds->data = pn_data(0); - ds->annotations = pn_data(0); + ds->data = NULL; + ds->data_raw = (pn_bytes_t){0, NULL}; + ds->annotations = NULL; + ds->annotations_raw = (pn_bytes_t){0, NULL}; pn_condition_init(&ds->condition); } @@ -1517,7 +1605,11 @@ static void pn_disposition_clear(pn_disposition_t *ds) ds->undeliverable = false; ds->settled = false; pn_data_clear(ds->data); + pn_bytes_free(ds->data_raw); + ds->data_raw = (pn_bytes_t){0, NULL}; pn_data_clear(ds->annotations); + pn_bytes_free(ds->annotations_raw); + ds->annotations_raw = (pn_bytes_t){0, NULL}; pn_condition_clear(&ds->condition); } @@ -1677,6 +1769,7 @@ uint64_t pn_disposition_type(pn_disposition_t *disposition) pn_data_t *pn_disposition_data(pn_disposition_t *disposition) { assert(disposition); + pni_switch_to_data(&disposition->data_raw, &disposition->data); return disposition->data; } @@ -1731,6 +1824,7 @@ void pn_disposition_set_undeliverable(pn_disposition_t *disposition, bool undeli pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition) { assert(disposition); + pni_switch_to_data(&disposition->annotations_raw, &disposition->annotations); return disposition->annotations; } @@ -2005,6 +2099,10 @@ pn_data_t *pn_link_properties(pn_link_t *link) pn_data_t *pn_link_remote_properties(pn_link_t *link) { assert(link); + // Annoying inconsistency: nearly everywhere else you *HAVE* to return an empty pn_data_t not NULL + if (link->remote_properties_raw.size) { + pni_switch_to_data(&link->remote_properties_raw, &link->remote_properties); + } return link->remote_properties; } @@ -2156,6 +2254,8 @@ void pn_condition_clear(pn_condition_t *condition) if (condition->name) pn_string_clear(condition->name); if (condition->description) pn_string_clear(condition->description); if (condition->info) pn_data_clear(condition->info); + pn_bytes_free (condition->info_raw); + condition->info_raw = (pn_bytes_t){0, NULL}; } const char *pn_condition_get_name(pn_condition_t *condition) @@ -2228,9 +2328,7 @@ int pn_condition_format(pn_condition_t *condition, const char *name, PN_PRINTF_F pn_data_t *pn_condition_info(pn_condition_t *condition) { assert(condition); - if (condition->info == NULL) { - condition->info = pn_data(0); - } + pni_switch_to_data(&condition->info_raw, &condition->info); return condition->info; } diff --git a/c/src/core/message.c b/c/src/core/message.c index 6c4975c000..4242087b3e 100644 --- a/c/src/core/message.c +++ b/c/src/core/message.c @@ -22,6 +22,7 @@ #include "platform/platform_fmt.h" #include "data.h" +#include "encodings.h" #include "max_align.h" #include "message-internal.h" #include "protocol.h" @@ -45,6 +46,10 @@ struct pn_message_t { pn_atom_t id; pn_atom_t correlation_id; + pn_bytes_t instructions_raw; + pn_bytes_t annotations_raw; + pn_bytes_t properties_raw; + pn_bytes_t body_raw; pn_timestamp_t expiry_time; pn_timestamp_t creation_time; pn_string_t *user_id; @@ -58,10 +63,10 @@ struct pn_message_t { pn_data_t *id_deprecated; pn_data_t *correlation_id_deprecated; - pn_data_t *instructions; - pn_data_t *annotations; - pn_data_t *properties; - pn_data_t *body; + pn_data_t *instructions_deprecated; + pn_data_t *annotations_deprecated; + pn_data_t *properties_deprecated; + pn_data_t *body_deprecated; pn_error_t *error; pn_sequence_t group_sequence; @@ -151,12 +156,16 @@ void pn_message_finalize(void *obj) pn_free(msg->reply_to_group_id); pni_msgid_clear(&msg->id); pni_msgid_clear(&msg->correlation_id); - if (msg->id_deprecated) pn_data_free(msg->id_deprecated); - if (msg->correlation_id_deprecated) pn_data_free(msg->correlation_id_deprecated); - pn_data_free(msg->instructions); - pn_data_free(msg->annotations); - pn_data_free(msg->properties); - pn_data_free(msg->body); + pn_bytes_free(msg->instructions_raw); + pn_bytes_free(msg->annotations_raw); + pn_bytes_free(msg->properties_raw); + pn_bytes_free(msg->body_raw); + pn_data_free(msg->id_deprecated); + pn_data_free(msg->correlation_id_deprecated); + pn_data_free(msg->instructions_deprecated); + pn_data_free(msg->annotations_deprecated); + pn_data_free(msg->properties_deprecated); + pn_data_free(msg->body_deprecated); pn_error_free(msg->error); } @@ -284,30 +293,30 @@ void pn_message_inspect(void *obj, pn_fixed_string_t *dst) comma = true; } - if (pn_data_size(msg->instructions)) { + if (pn_data_size(msg->instructions_deprecated)) { pn_fixed_string_addf(dst, "instructions="); - pn_finspect(msg->instructions, dst); + pn_finspect(msg->instructions_deprecated, dst); pn_fixed_string_addf(dst, ", "); comma = true; } - if (pn_data_size(msg->annotations)) { + if (pn_data_size(msg->annotations_deprecated)) { pn_fixed_string_addf(dst, "annotations="); - pn_finspect(msg->annotations, dst); + pn_finspect(msg->annotations_deprecated, dst); pn_fixed_string_addf(dst, ", "); comma = true; } - if (pn_data_size(msg->properties)) { + if (pn_data_size(msg->properties_deprecated)) { pn_fixed_string_addf(dst, "properties="); - pn_finspect(msg->properties, dst); + pn_finspect(msg->properties_deprecated, dst); pn_fixed_string_addf(dst, ", "); comma = true; } - if (pn_data_size(msg->body)) { + if (pn_data_size(msg->body_deprecated)) { pn_fixed_string_addf(dst, "body="); - pn_finspect(msg->body, dst); + pn_finspect(msg->body_deprecated, dst); pn_fixed_string_addf(dst, ", "); comma = true; } @@ -346,14 +355,18 @@ static pn_message_t *pni_message_new(size_t size) msg->group_id = pn_string(NULL); msg->group_sequence = 0; msg->reply_to_group_id = pn_string(NULL); + msg->instructions_raw = (pn_bytes_t){0, 0}; + msg->annotations_raw = (pn_bytes_t){0, 0}; + msg->properties_raw = (pn_bytes_t){0, 0}; + msg->body_raw = (pn_bytes_t){0, 0}; msg->inferred = false; msg->id_deprecated = NULL; msg->correlation_id_deprecated = NULL; - msg->instructions = pn_data(16); - msg->annotations = pn_data(16); - msg->properties = pn_data(16); - msg->body = pn_data(16); + msg->instructions_deprecated = NULL; + msg->annotations_deprecated = NULL; + msg->properties_deprecated = NULL; + msg->body_deprecated = NULL; msg->error = pn_error(); return msg; @@ -403,12 +416,20 @@ void pn_message_clear(pn_message_t *msg) msg->group_sequence = 0; pn_string_clear(msg->reply_to_group_id); msg->inferred = false; + pn_bytes_free(msg->annotations_raw); + pn_bytes_free(msg->instructions_raw); + pn_bytes_free(msg->properties_raw); + pn_bytes_free(msg->body_raw); + msg->instructions_raw = (pn_bytes_t){0, NULL}; + msg->annotations_raw = (pn_bytes_t){0, NULL}; + msg->properties_raw = (pn_bytes_t){0, NULL}; + msg->body_raw = (pn_bytes_t){0, NULL}; pn_data_clear(msg->id_deprecated); pn_data_clear(msg->correlation_id_deprecated); - pn_data_clear(msg->instructions); - pn_data_clear(msg->annotations); - pn_data_clear(msg->properties); - pn_data_clear(msg->body); + pn_data_clear(msg->instructions_deprecated); + pn_data_clear(msg->annotations_deprecated); + pn_data_clear(msg->properties_deprecated); + pn_data_clear(msg->body_deprecated); } int pn_message_errno(pn_message_t *msg) @@ -432,6 +453,12 @@ bool pn_message_is_inferred(pn_message_t *msg) int pn_message_set_inferred(pn_message_t *msg, bool inferred) { assert(msg); + // If the inferred value changed and we're only holding the raw bytes then we need to get the + // deprecated pn_data_t equivalent as the raw bytes must be reconstructed from the pn_data_t + // interpreted in light of the inferred flag. + if (msg->inferred!=inferred && msg->body_raw.size>0) { + (void) pn_message_body(msg); + } msg->inferred = inferred; return 0; } @@ -534,11 +561,6 @@ int pn_message_set_id(pn_message_t *msg, pn_msgid_t id) return 0; } -static pn_bytes_t pn_string_get_bytes(pn_string_t *string) -{ - return pn_bytes(pn_string_size(string), (char *) pn_string_get(string)); -} - static int pn_string_set_bytes(pn_string_t *string, pn_bytes_t bytes) { return pn_string_setn(string, bytes.start, bytes.size); @@ -547,7 +569,7 @@ static int pn_string_set_bytes(pn_string_t *string, pn_bytes_t bytes) pn_bytes_t pn_message_get_user_id(pn_message_t *msg) { assert(msg); - return pn_string_get_bytes(msg->user_id); + return pn_string_bytes(msg->user_id); } int pn_message_set_user_id(pn_message_t *msg, pn_bytes_t user_id) { @@ -711,6 +733,12 @@ int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size) assert(msg && bytes && size); pn_bytes_t msg_bytes = {.size=size, .start=bytes}; + pn_bytes_t instructions_bytes = {0, 0}; + pn_bytes_t annotations_bytes = {0, 0}; + pn_bytes_t properties_bytes = {0, 0}; + pn_bytes_t body_bytes = {0, 0}; + pn_bytes_t unknown_section_bytes = {0, 0}; + while (msg_bytes.size) { bool scanned; uint64_t desc; @@ -767,59 +795,68 @@ int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size) break; } case DELIVERY_ANNOTATIONS: { - pn_data_clear(msg->instructions); - pn_amqp_decode_DqC(msg_bytes, msg->instructions); - pn_data_rewind(msg->instructions); + pn_amqp_decode_DqR(msg_bytes, &instructions_bytes); break; } case MESSAGE_ANNOTATIONS: { - pn_data_clear(msg->annotations); - pn_amqp_decode_DqC(msg_bytes, msg->annotations); - pn_data_rewind(msg->annotations); + pn_amqp_decode_DqR(msg_bytes, &annotations_bytes); break; } case APPLICATION_PROPERTIES: { - pn_data_clear(msg->properties); - pn_amqp_decode_DqC(msg_bytes, msg->properties); - pn_data_rewind(msg->properties); + pn_amqp_decode_DqR(msg_bytes, &properties_bytes); break; } case DATA: case AMQP_SEQUENCE: { msg->inferred = true; - pn_data_clear(msg->body); - pn_amqp_decode_DqC(msg_bytes, msg->body); - pn_data_rewind(msg->body); + pn_amqp_decode_DqR(msg_bytes, &body_bytes); break; } case AMQP_VALUE: { msg->inferred = false; - pn_data_clear(msg->body); - pn_amqp_decode_DqC(msg_bytes, msg->body); - pn_data_rewind(msg->body); + pn_amqp_decode_DqR(msg_bytes, &body_bytes); break; } case FOOTER: break; default: { - pn_data_clear(msg->body); - pn_data_decode(msg->body, msg_bytes.start, msg_bytes.size); - pn_data_rewind(msg->body); + pn_amqp_decode_R(msg_bytes, &unknown_section_bytes); break; } } msg_bytes = (pn_bytes_t){.size=msg_bytes.size-section_size, .start=msg_bytes.start+section_size}; } + pn_bytes_free(msg->instructions_raw); + msg->instructions_raw = pn_bytes_dup(instructions_bytes); + pn_bytes_free(msg->annotations_raw); + msg->annotations_raw = pn_bytes_dup(annotations_bytes); + pn_bytes_free(msg->properties_raw); + msg->properties_raw = pn_bytes_dup(properties_bytes); + pn_bytes_free(msg->body_raw); + msg->body_raw = pn_bytes_dup(body_bytes); return 0; } + int pn_message_encode(pn_message_t *msg, char *bytes, size_t *isize) { + pn_rwbytes_t scratch = (pn_rwbytes_t){.size=*isize, .start=bytes}; + if (!pni_switch_to_raw_bytes(scratch, &msg->instructions_deprecated, &msg->instructions_raw)) { + return PN_OVERFLOW; + } + if (!pni_switch_to_raw_bytes(scratch, &msg->annotations_deprecated, &msg->annotations_raw)) { + return PN_OVERFLOW; + } + if (!pni_switch_to_raw_bytes(scratch, &msg->properties_deprecated, &msg->properties_raw)) { + return PN_OVERFLOW; + } + if (!pni_switch_to_raw_bytes(scratch, &msg->body_deprecated, &msg->body_raw)) { + return PN_OVERFLOW; + } size_t remaining = *isize; - size_t last_size = 0; size_t total = 0; /* "DL[?o?B?I?o?I]" */ - last_size = pn_amqp_encode_bytes_DLEQoQBQIQoQIe(bytes, remaining, HEADER, + size_t last_size = pn_amqp_encode_bytes_DLEQoQBQIQoQIe(bytes, remaining, HEADER, msg->durable, msg->durable, msg->priority!=HEADER_PRIORITY_DEFAULT, msg->priority, (bool)msg->ttl, msg->ttl, @@ -832,9 +869,8 @@ int pn_message_encode(pn_message_t *msg, char *bytes, size_t *isize) total += last_size; - if (pn_data_size(msg->instructions)) { - pn_data_rewind(msg->instructions); - last_size = pn_amqp_encode_bytes_DLC(bytes, remaining, DELIVERY_ANNOTATIONS, msg->instructions); + if (msg->instructions_raw.size>0) { + last_size = pn_amqp_encode_bytes_DLR(bytes, remaining, DELIVERY_ANNOTATIONS, msg->instructions_raw); if (last_size > remaining) return PN_OVERFLOW; remaining -= last_size; @@ -842,9 +878,8 @@ int pn_message_encode(pn_message_t *msg, char *bytes, size_t *isize) total += last_size; } - if (pn_data_size(msg->annotations)) { - pn_data_rewind(msg->annotations); - last_size = pn_amqp_encode_bytes_DLC(bytes, remaining, MESSAGE_ANNOTATIONS, msg->annotations); + if (msg->annotations_raw.size>0) { + last_size = pn_amqp_encode_bytes_DLR(bytes, remaining, MESSAGE_ANNOTATIONS, msg->annotations_raw); if (last_size > remaining) return PN_OVERFLOW; remaining -= last_size; @@ -880,9 +915,8 @@ int pn_message_encode(pn_message_t *msg, char *bytes, size_t *isize) bytes += last_size; total += last_size; - if (pn_data_size(msg->properties)) { - pn_data_rewind(msg->properties); - last_size = pn_amqp_encode_bytes_DLC(bytes, remaining, APPLICATION_PROPERTIES, msg->properties); + if (msg->properties_raw.size>0) { + last_size = pn_amqp_encode_bytes_DLR(bytes, remaining, APPLICATION_PROPERTIES, msg->properties_raw); if (last_size > remaining) return PN_OVERFLOW; remaining -= last_size; @@ -890,32 +924,29 @@ int pn_message_encode(pn_message_t *msg, char *bytes, size_t *isize) total += last_size; } - if (pn_data_size(msg->body)) { - pn_data_rewind(msg->body); - pn_data_next(msg->body); - pn_type_t body_type = pn_data_type(msg->body); - pn_data_rewind(msg->body); - + if (msg->body_raw.size>0) { uint64_t descriptor = AMQP_VALUE; if (msg->inferred) { - switch (body_type) { - case PN_BINARY: - descriptor = DATA; - break; - case PN_LIST: - descriptor = AMQP_SEQUENCE; - break; - default: - break; + switch ((uint8_t)msg->body_raw.start[0]) { + case PNE_VBIN8: + case PNE_VBIN32: + descriptor = DATA; + break; + case PNE_LIST0: + case PNE_LIST8: + case PNE_LIST32: + descriptor = AMQP_SEQUENCE; + break; } } - last_size = pn_amqp_encode_bytes_DLC(bytes, remaining, descriptor, msg->body); + last_size = pn_amqp_encode_bytes_DLR(bytes, remaining, descriptor, msg->body_raw); if (last_size > remaining) return PN_OVERFLOW; remaining -= last_size; bytes += last_size; total += last_size; } + *isize = total; return 0; } @@ -933,17 +964,17 @@ int pn_message_data(pn_message_t *msg, pn_data_t *data) return pn_error_format(msg->error, err, "data error: %s", pn_error_text(pn_data_error(data))); - if (pn_data_size(msg->instructions)) { - pn_data_rewind(msg->instructions); - err = pn_data_fill(data, "DLC", DELIVERY_ANNOTATIONS, msg->instructions); + if (pn_data_size(msg->instructions_deprecated)) { + pn_data_rewind(msg->instructions_deprecated); + err = pn_data_fill(data, "DLC", DELIVERY_ANNOTATIONS, msg->instructions_deprecated); if (err) return pn_error_format(msg->error, err, "data error: %s", pn_error_text(pn_data_error(data))); } - if (pn_data_size(msg->annotations)) { - pn_data_rewind(msg->annotations); - err = pn_data_fill(data, "DLC", MESSAGE_ANNOTATIONS, msg->annotations); + if (pn_data_size(msg->annotations_deprecated)) { + pn_data_rewind(msg->annotations_deprecated); + err = pn_data_fill(data, "DLC", MESSAGE_ANNOTATIONS, msg->annotations_deprecated); if (err) return pn_error_format(msg->error, err, "data error: %s", pn_error_text(pn_data_error(data))); @@ -974,18 +1005,18 @@ int pn_message_data(pn_message_t *msg, pn_data_t *data) return pn_error_format(msg->error, err, "data error: %s", pn_error_text(pn_data_error(data))); - if (pn_data_size(msg->properties)) { - pn_data_rewind(msg->properties); - err = pn_data_fill(data, "DLC", APPLICATION_PROPERTIES, msg->properties); + if (pn_data_size(msg->properties_deprecated)) { + pn_data_rewind(msg->properties_deprecated); + err = pn_data_fill(data, "DLC", APPLICATION_PROPERTIES, msg->properties_deprecated); if (err) return pn_error_format(msg->error, err, "data error: %s", pn_error_text(pn_data_error(data))); } - if (pn_data_size(msg->body)) { - pn_data_rewind(msg->body); - pn_data_next(msg->body); - pn_type_t body_type = pn_data_type(msg->body); + if (pn_data_size(msg->body_deprecated)) { + pn_data_rewind(msg->body_deprecated); + pn_data_next(msg->body_deprecated); + pn_type_t body_type = pn_data_type(msg->body_deprecated); uint64_t descriptor = AMQP_VALUE; if (msg->inferred) { @@ -1001,8 +1032,8 @@ int pn_message_data(pn_message_t *msg, pn_data_t *data) } } - pn_data_rewind(msg->body); - err = pn_data_fill(data, "DLC", descriptor, msg->body); + pn_data_rewind(msg->body_deprecated); + err = pn_data_fill(data, "DLC", descriptor, msg->body_deprecated); if (err) return pn_error_format(msg->error, err, "data error: %s", pn_error_text(pn_data_error(data))); @@ -1012,22 +1043,30 @@ int pn_message_data(pn_message_t *msg, pn_data_t *data) pn_data_t *pn_message_instructions(pn_message_t *msg) { - return msg ? msg->instructions : NULL; + if (!msg) return NULL; + pni_switch_to_data(&msg->instructions_raw, &msg->instructions_deprecated); + return msg->instructions_deprecated; } pn_data_t *pn_message_annotations(pn_message_t *msg) { - return msg ? msg->annotations : NULL; + if (!msg) return NULL; + pni_switch_to_data(&msg->annotations_raw, &msg->annotations_deprecated); + return msg->annotations_deprecated; } pn_data_t *pn_message_properties(pn_message_t *msg) { - return msg ? msg->properties : NULL; + if (!msg) return NULL; + pni_switch_to_data(&msg->properties_raw, &msg->properties_deprecated); + return msg->properties_deprecated; } pn_data_t *pn_message_body(pn_message_t *msg) { - return msg ? msg->body : NULL; + if (!msg) return NULL; + pni_switch_to_data(&msg->body_raw, &msg->body_deprecated); + return msg->body_deprecated; } ssize_t pn_message_encode2(pn_message_t *msg, pn_rwbytes_t *buffer) { diff --git a/c/src/core/transport.c b/c/src/core/transport.c index 10adaa3a69..04d05bc57f 100644 --- a/c/src/core/transport.c +++ b/c/src/core/transport.c @@ -461,10 +461,9 @@ static void pn_transport_initialize(void *object) transport->remote_idle_timeout = 0; transport->keepalive_deadline = 0; transport->last_bytes_output = 0; - transport->remote_offered_capabilities = pn_data(0); - transport->remote_desired_capabilities = pn_data(0); - transport->remote_properties = pn_data(0); - transport->disp_data = pn_data(0); + transport->remote_offered_capabilities_raw = (pn_bytes_t){0, NULL}; + transport->remote_desired_capabilities_raw = (pn_bytes_t){0, NULL}; + transport->remote_properties_raw = (pn_bytes_t){0, NULL}; pn_condition_init(&transport->remote_condition); pn_condition_init(&transport->condition); transport->error = pn_error(); @@ -652,10 +651,9 @@ static void pn_transport_finalize(void *object) pn_sasl_free(transport); pni_mem_deallocate(PN_CLASSCLASS(pn_strdup), transport->remote_container); pni_mem_deallocate(PN_CLASSCLASS(pn_strdup), transport->remote_hostname); - pn_free(transport->remote_offered_capabilities); - pn_free(transport->remote_desired_capabilities); - pn_free(transport->remote_properties); - pn_free(transport->disp_data); + pn_bytes_free(transport->remote_offered_capabilities_raw); + pn_bytes_free(transport->remote_desired_capabilities_raw); + pn_bytes_free(transport->remote_properties_raw); pn_condition_tini(&transport->remote_condition); pn_condition_tini(&transport->condition); pn_error_free(transport->error); @@ -851,35 +849,6 @@ bool pni_disposition_batchable(pn_disposition_t *disposition) } } -static int pni_disposition_encode(pn_disposition_t *disposition, pn_data_t *data) -{ - pn_condition_t *cond = &disposition->condition; - switch (disposition->type) { - case PN_RECEIVED: - PN_RETURN_IF_ERROR(pn_data_put_list(data)); - pn_data_enter(data); - PN_RETURN_IF_ERROR(pn_data_put_uint(data, disposition->section_number)); - PN_RETURN_IF_ERROR(pn_data_put_ulong(data, disposition->section_offset)); - pn_data_exit(data); - return 0; - case PN_ACCEPTED: - case PN_RELEASED: - return 0; - case PN_REJECTED: - return pn_data_fill(data, "[?DL[sSC]]", pn_condition_is_set(cond), ERROR, - pn_condition_get_name(cond), - pn_condition_get_description(cond), - pn_condition_info(cond)); - case PN_MODIFIED: - return pn_data_fill(data, "[ooC]", - disposition->failed, - disposition->undeliverable, - disposition->annotations); - default: - return pn_data_copy(data, disposition->data); - } -} - static int pni_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch, uint32_t handle, pn_sequence_t id, @@ -889,8 +858,7 @@ static int pni_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch, bool settled, bool more, pn_sequence_t frame_limit, - uint64_t code, - pn_data_t* state, + pn_disposition_t *disposition, bool resume, bool aborted, bool batchable) @@ -902,14 +870,14 @@ static int pni_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch, compute_performatives:; /* "DL[IIzI?o?on?DLC?o?o?o]" */ pn_bytes_t performative = - pn_amqp_encode_DLEIIzIQoQonQDLCQoQoQoe(&transport->scratch_space, TRANSFER, + pn_amqp_encode_DLEIIzIQoQondQoQoQoe(&transport->scratch_space, TRANSFER, handle, id, tag.size, tag.start, message_format, settled, settled, more_flag, more_flag, - (bool)code, code, state, + disposition, resume, resume, aborted, aborted, batchable, batchable); @@ -952,17 +920,7 @@ static int pni_post_close(pn_transport_t *transport, pn_condition_t *cond) if (!cond && transport->connection) { cond = pn_connection_condition(transport->connection); } - pn_string_t *condition = NULL; - pn_string_t *description = NULL; - pn_data_t *info = NULL; - if (pn_condition_is_set(cond)) { - condition = cond->name; - description = cond->description; - info = pn_condition_info(cond); - } - /* "DL[?DL[sSC]]" */ - pn_bytes_t buf = pn_amqp_encode_DLEQDLEsSCee(&transport->scratch_space, CLOSE, - (bool) condition, ERROR, pn_string_bytes(condition), pn_string_bytes(description), info); + pn_bytes_t buf = pn_amqp_encode_DLEce(&transport->scratch_space, CLOSE, cond); return pn_framing_send_amqp(transport, 0, buf); } @@ -1051,19 +1009,19 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, uint16_t remote_channel_max; uint32_t remote_max_frame; pn_bytes_t remote_container, remote_hostname; - pn_data_clear(transport->remote_offered_capabilities); - pn_data_clear(transport->remote_desired_capabilities); - pn_data_clear(transport->remote_properties); + pn_bytes_t remote_offered_capabilities; + pn_bytes_t remote_desired_capabilities; + pn_bytes_t remote_properties; - pn_amqp_decode_DqEQSQSQIQHIqqCCCe(payload, + pn_amqp_decode_DqEQSQSQIQHIqqRRRe(payload, &container_q, &remote_container, &hostname_q, &remote_hostname, &remote_max_frame_q, &remote_max_frame, &remote_channel_max_q, &remote_channel_max, &transport->remote_idle_timeout, - transport->remote_offered_capabilities, - transport->remote_desired_capabilities, - transport->remote_properties); + &remote_offered_capabilities, + &remote_desired_capabilities, + &remote_properties); /* * The default value is already stored in the variable. * But the scanner zeroes out values if it does not @@ -1086,6 +1044,13 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pni_mem_deallocate(PN_CLASSCLASS(pn_strdup), transport->remote_hostname); transport->remote_hostname = hostname_q ? pn_bytes_strdup(remote_hostname) : NULL; + pn_bytes_free(transport->remote_offered_capabilities_raw); + transport->remote_offered_capabilities_raw = pn_bytes_dup(remote_offered_capabilities); + pn_bytes_free(transport->remote_desired_capabilities_raw); + transport->remote_desired_capabilities_raw = pn_bytes_dup(remote_desired_capabilities); + pn_bytes_free(transport->remote_properties_raw); + transport->remote_properties_raw = pn_bytes_dup(remote_properties); + pn_connection_t *conn = transport->connection; if (conn) { PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE); @@ -1229,58 +1194,41 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel uint8_t snd_settle_mode, rcv_settle_mode; uint64_t max_msgsz; bool has_props; - pn_data_t *rem_props = pn_data(0); - pn_amqp_decode_DqESIoQBQBDqESIsIoqseDqESIsIoeqqILqqQCe(payload, + pn_bytes_t rem_props = (pn_bytes_t){0, NULL}; + pn_amqp_decode_DqESIoQBQBDqESIsIoqseDqESIsIoeqqILqqQRe(payload, &name, &handle, &is_sender, &snd_settle, &snd_settle_mode, &rcv_settle, &rcv_settle_mode, &source, &src_dr, &src_exp, &src_timeout, &src_dynamic, &dist_mode, &target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic, - &idc, &max_msgsz, &has_props, rem_props); - char strbuf[128]; // avoid malloc for most link names - char *strheap = (name.size >= sizeof(strbuf)) ? (char *) malloc(name.size + 1) : NULL; - char *strname = strheap ? strheap : strbuf; - if (name.size > 0) strncpy(strname, name.start, name.size); - strname[name.size] = '\0'; - + &idc, &max_msgsz, &has_props, &rem_props); pn_session_t *ssn = pni_channel_state(transport, channel); if (!ssn) { pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel); - if (strheap) free(strheap); - pn_free(rem_props); return PN_EOS; } if (handle > ssn->local_handle_max) { pn_do_error(transport, "amqp:connection:framing-error", "remote handle %u is above handle_max %u", handle, ssn->local_handle_max); - if (strheap) free(strheap); - pn_free(rem_props); return PN_EOS; } pn_link_t *link = pni_find_link(ssn, name, is_sender); if (link && (int32_t)link->state.remote_handle >= 0) { - pn_do_error(transport, "amqp:invalid-field", "link name already attached: %s", strname); - if (strheap) free(strheap); - pn_free(rem_props); + pn_do_error(transport, "amqp:invalid-field", "link name already attached: %.*s", (int)name.size, name.start); return PN_EOS; } if (!link) { /* Make a new link for the attach */ if (is_sender) { - link = (pn_link_t *) pn_sender(ssn, strname); + link = (pn_link_t *) pn_link_new(SENDER, ssn, pn_stringn(name.start, name.size)); } else { - link = (pn_link_t *) pn_receiver(ssn, strname); + link = (pn_link_t *) pn_link_new(RECEIVER, ssn, pn_stringn(name.start, name.size)); } } - if (strheap) { - free(strheap); - } - if (has_props) { - link->remote_properties = rem_props; - } else { - pn_free(rem_props); + pn_bytes_free(link->remote_properties_raw); + link->remote_properties_raw = pn_bytes_dup(rem_props); } pni_map_remote_handle(link, handle); @@ -1322,37 +1270,41 @@ int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel if (rcv_settle) link->remote_rcv_settle_mode = rcv_settle_mode; - pn_data_clear(link->remote_source.properties); - pn_data_clear(link->remote_source.filter); - pn_data_clear(link->remote_source.outcomes); - pn_data_clear(link->remote_source.capabilities); - - pn_amqp_decode_DqEqqqqqDqEqqqqqCqCqCCee(payload, - link->remote_source.properties, - link->remote_source.filter, - link->remote_source.outcomes, - link->remote_source.capabilities); - - pn_data_rewind(link->remote_source.properties); - pn_data_rewind(link->remote_source.filter); - pn_data_rewind(link->remote_source.outcomes); - pn_data_rewind(link->remote_source.capabilities); - - pn_data_clear(link->remote_target.properties); - pn_data_clear(link->remote_target.capabilities); - + pn_bytes_t rem_src_properties = (pn_bytes_t) {0, NULL}; + pn_bytes_t rem_src_filter = (pn_bytes_t) {0, NULL}; + pn_bytes_t rem_src_outcomes = (pn_bytes_t) {0, NULL}; + pn_bytes_t rem_src_capabilities = (pn_bytes_t) {0, NULL}; + pn_amqp_decode_DqEqqqqqDqEqqqqqRqRqRRee(payload, + &rem_src_properties, + &rem_src_filter, + &rem_src_outcomes, + &rem_src_capabilities); + + pn_bytes_free(rsrc->properties_raw); + rsrc->properties_raw = pn_bytes_dup(rem_src_properties); + pn_bytes_free(rsrc->filter_raw); + rsrc->filter_raw = pn_bytes_dup(rem_src_filter); + pn_bytes_free(rsrc->outcomes_raw); + rsrc->outcomes_raw = pn_bytes_dup(rem_src_outcomes); + pn_bytes_free(rsrc->capabilities_raw); + rsrc->capabilities_raw = pn_bytes_dup(rem_src_capabilities); + + pn_bytes_t rem_tgt_properties = (pn_bytes_t) {0, NULL}; + pn_bytes_t rem_tgt_capabilities = (pn_bytes_t) {0, NULL}; if (pn_terminus_get_type(&link->remote_target) == PN_COORDINATOR) { // coordinator target only has a capabilities field - pn_amqp_decode_DqEqqqqqDqqDqECeqqqe(payload, - link->remote_target.capabilities); + pn_amqp_decode_DqEqqqqqDqqDqEReqqqe(payload, + &rem_tgt_capabilities); } else { - pn_amqp_decode_DqEqqqqqDqqDqEqqqqqCCee(payload, - link->remote_target.properties, - link->remote_target.capabilities); + pn_amqp_decode_DqEqqqqqDqqDqEqqqqqRRee(payload, + &rem_tgt_properties, + &rem_tgt_capabilities); } - pn_data_rewind(link->remote_target.properties); - pn_data_rewind(link->remote_target.capabilities); + pn_bytes_free(rtgt->properties_raw); + rtgt->properties_raw = pn_bytes_dup(rem_tgt_properties); + pn_bytes_free(rtgt->capabilities_raw); + rtgt->capabilities_raw = pn_bytes_dup(rem_tgt_capabilities); if (!is_sender) { link->state.delivery_count = idc; @@ -1459,8 +1411,8 @@ int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t chann } if (has_type) { delivery->remote.type = type; - pn_data_clear(delivery->remote.data); - pn_data_decode(delivery->remote.data, disp_data.start, disp_data.size); + pn_bytes_free(delivery->remote.data_raw); + delivery->remote.data_raw = pn_bytes_dup(disp_data); } link->state.delivery_count++; link->state.link_credit--; @@ -1567,7 +1519,7 @@ int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, return 0; } -static void pn_condition_set(pn_condition_t *condition, pn_bytes_t cond, pn_bytes_t desc) +static void pn_condition_set(pn_condition_t *condition, pn_bytes_t cond, pn_bytes_t desc, pn_bytes_t info) { if (condition->name == NULL) { condition->name = pn_string(NULL); @@ -1577,6 +1529,9 @@ static void pn_condition_set(pn_condition_t *condition, pn_bytes_t cond, pn_byte condition->description = pn_string(NULL); } pn_string_setn(condition->description, desc.start, desc.size); + pn_data_clear(condition->info); + pn_bytes_free(condition->info_raw); + condition->info_raw = pn_bytes_dup(info); } static inline bool sequence_lte(pn_sequence_t a, pn_sequence_t b) { @@ -1611,11 +1566,9 @@ static int pni_do_delivery_disposition(pn_transport_t * transport, pn_delivery_t case PN_REJECTED: { pn_bytes_t cond; pn_bytes_t desc; - pn_data_t *info = pn_condition_info(&remote->condition); - pn_data_clear(info); - pn_amqp_decode_DqEDqEsSCee(disp_data, &cond, &desc, info); - pn_data_rewind(info); - pn_condition_set(&remote->condition, cond, desc); + pn_bytes_t info; + pn_amqp_decode_DqEDqEsSRee(disp_data, &cond, &desc, &info); + pn_condition_set(&remote->condition, cond, desc, info); break; } @@ -1627,9 +1580,10 @@ static int pni_do_delivery_disposition(pn_transport_t * transport, pn_delivery_t bool failed; bool qundeliverable; bool undeliverable; - pn_data_clear(remote->annotations); - pn_amqp_decode_DqEQoQoCe(disp_data, &qfailed, &failed, &qundeliverable, &undeliverable, remote->annotations); - pn_data_rewind(remote->annotations); + pn_bytes_t annotations_raw = (pn_bytes_t){0, NULL}; + pn_amqp_decode_DqEQoQoRe(disp_data, &qfailed, &failed, &qundeliverable, &undeliverable, &annotations_raw); + pn_bytes_free(remote->annotations_raw); + remote->annotations_raw = pn_bytes_dup(annotations_raw); if (qfailed) { remote->failed = failed; @@ -1639,11 +1593,13 @@ static int pni_do_delivery_disposition(pn_transport_t * transport, pn_delivery_t } break; } - default: - pn_data_clear(remote->data); - pn_amqp_decode_DqC(disp_data, remote->data); - pn_data_rewind(remote->data); + default: { + pn_bytes_t data_raw = (pn_bytes_t){0, NULL}; + pn_amqp_decode_DqR(disp_data, &data_raw); + pn_bytes_free(remote->data_raw); + remote->data_raw = pn_bytes_dup(data_raw); break; + } } } @@ -1716,16 +1672,17 @@ int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t ch int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload) { + pn_session_t *ssn = pni_channel_state(transport, channel); + if (!ssn) { + return pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel); + } + uint32_t handle; bool closed; pn_bytes_t error_condition; pn_amqp_decode_DqEIoRe(payload, &handle, &closed, &error_condition); - pn_session_t *ssn = pni_channel_state(transport, channel); - if (!ssn) { - return pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel); - } pn_link_t *link = pni_handle_state(ssn, handle); if (!link) { return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle); @@ -1733,11 +1690,10 @@ int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel pn_bytes_t cond; pn_bytes_t desc; + pn_bytes_t info; + pn_amqp_decode_DqEsSRe(error_condition, &cond, &desc, &info); pn_condition_t* condition = &link->endpoint.remote_condition; - pn_condition_clear(condition); - pn_amqp_decode_DqEsSCe(error_condition, &cond, &desc, pn_condition_info(condition)); - pn_condition_set(condition, cond, desc); - pn_data_rewind(pn_condition_info(condition)); + pn_condition_set(condition, cond, desc, info); if (closed) { @@ -1760,11 +1716,10 @@ int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, p pn_bytes_t cond; pn_bytes_t desc; + pn_bytes_t info; + pn_amqp_decode_DqEDqEsSRee(payload, &cond, &desc, &info); pn_condition_t* condition = &ssn->endpoint.remote_condition; - pn_condition_clear(condition); - pn_amqp_decode_DqEDqEsSCee(payload, &cond, &desc, pn_condition_info(condition)); - pn_condition_set(condition, cond, desc); - pn_data_rewind(pn_condition_info(condition)); + pn_condition_set(condition, cond, desc, info); PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED); pn_collector_put_object(transport->connection->collector, ssn, PN_SESSION_REMOTE_CLOSE); pni_unmap_remote_channel(ssn); @@ -1777,11 +1732,10 @@ int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t cond; pn_bytes_t desc; + pn_bytes_t info; + pn_amqp_decode_DqEDqEsSRee(payload, &cond, &desc, &info); pn_condition_t* condition = &transport->remote_condition; - pn_condition_clear(condition); - pn_amqp_decode_DqEDqEsSCee(payload, &cond, &desc, pn_condition_info(condition)); - pn_condition_set(condition, cond, desc); - pn_data_rewind(pn_condition_info(condition)); + pn_condition_set(condition, cond, desc, info); transport->close_rcvd = true; PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED); pn_collector_put_object(transport->connection->collector, conn, PN_CONNECTION_REMOTE_CLOSE); @@ -1872,8 +1826,10 @@ static int pni_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endp pn_bytes_t cid = pn_string_bytes(connection->container); if (cid.start==NULL) cid = (pn_bytes_t){.size=0, .start=""}; pni_calculate_channel_max(transport); - /* "DL[SS?I?H?InnMMC]" */ - pn_bytes_t buf = pn_amqp_encode_DLESSQIQHQInnMMCe(&transport->scratch_space, OPEN, + pni_switch_to_raw_multiple(&transport->scratch_space, &connection->offered_capabilities, &connection->offered_capabilities_raw); + pni_switch_to_raw_multiple(&transport->scratch_space, &connection->desired_capabilities, &connection->desired_capabilities_raw); + pni_switch_to_raw(&transport->scratch_space, &connection->properties, &connection->properties_raw); + pn_bytes_t buf = pn_amqp_encode_DLESSQIQHQInnMMRe(&transport->scratch_space, OPEN, cid, pn_string_bytes(connection->hostname), // TODO: This is messy, because we also have to allow local_max_frame_ to be 0 to mean unlimited @@ -1882,9 +1838,9 @@ static int pni_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endp transport->local_max_frame, transport->channel_max!=OPEN_CHANNEL_MAX_DEFAULT, transport->channel_max, (bool)idle_timeout, idle_timeout, - connection->offered_capabilities, - connection->desired_capabilities, - connection->properties); + connection->offered_capabilities_raw, + connection->desired_capabilities_raw, + connection->properties_raw); int err = pn_framing_send_amqp(transport, 0, buf); if (err) return err; transport->open_sent = true; @@ -2021,8 +1977,14 @@ static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endp return PN_ERR; } const pn_distribution_mode_t dist_mode = (pn_distribution_mode_t) link->source.distribution_mode; + // Until we encode directly from the raw + pni_switch_to_raw(&transport->scratch_space, &link->source.properties, &link->source.properties_raw); + pni_switch_to_raw(&transport->scratch_space, &link->source.filter, &link->source.filter_raw); + pni_switch_to_raw_multiple(&transport->scratch_space, &link->source.outcomes, &link->source.outcomes_raw); + pni_switch_to_raw_multiple(&transport->scratch_space, &link->source.capabilities, &link->source.capabilities_raw); + pni_switch_to_raw_multiple(&transport->scratch_space, &link->target.capabilities, &link->target.capabilities_raw); if (link->target.type == PN_COORDINATOR) { - pn_bytes_t buf = pn_amqp_encode_DLESIoBBQDLESIsIoCQsCnCCeDLECennIe(&transport->scratch_space, ATTACH, + pn_bytes_t buf = pn_amqp_encode_DLESIoBBQDLESIsIoRQsRnRReDLERennIe(&transport->scratch_space, ATTACH, pn_string_bytes(link->name), state->local_handle, endpoint->type == RECEIVER, @@ -2034,18 +1996,19 @@ static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endp expiry_symbol(&link->source), link->source.timeout, link->source.dynamic, - link->source.properties, + link->source.properties_raw, (dist_mode != PN_DIST_MODE_UNSPECIFIED), dist_mode2symbol(dist_mode), - link->source.filter, - link->source.outcomes, - link->source.capabilities, - COORDINATOR, link->target.capabilities, + link->source.filter_raw, + link->source.outcomes_raw, + link->source.capabilities_raw, + COORDINATOR, link->target.capabilities_raw, 0); int err = pn_framing_send_amqp(transport, ssn_state->local_channel, buf); if (err) return err; } else { - /* "DL[SIoBB?DL[SIsIoC?sCnMM]?DL[SIsIoCM]nnILnnC]" */ - pn_bytes_t buf = pn_amqp_encode_DLESIoBBQDLESIsIoCQsCnMMeQDLESIsIoCMennILnnCe(&transport->scratch_space, ATTACH, + pni_switch_to_raw(&transport->scratch_space, &link->properties, &link->properties_raw); + pni_switch_to_raw(&transport->scratch_space, &link->target.properties, &link->target.properties_raw); + pn_bytes_t buf = pn_amqp_encode_DLESIoBBQDLESIsIoRQsRnMMeQDLESIsIoRMennILnnRe(&transport->scratch_space, ATTACH, pn_string_bytes(link->name), state->local_handle, endpoint->type == RECEIVER, @@ -2058,11 +2021,11 @@ static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endp expiry_symbol(&link->source), link->source.timeout, link->source.dynamic, - link->source.properties, + link->source.properties_raw, (dist_mode != PN_DIST_MODE_UNSPECIFIED), dist_mode2symbol(dist_mode), - link->source.filter, - link->source.outcomes, - link->source.capabilities, + link->source.filter_raw, + link->source.outcomes_raw, + link->source.capabilities_raw, (bool) link->target.type, TARGET, pn_string_bytes(link->target.address), @@ -2070,12 +2033,12 @@ static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endp expiry_symbol(&link->target), link->target.timeout, link->target.dynamic, - link->target.properties, - link->target.capabilities, + link->target.properties_raw, + link->target.capabilities_raw, 0, link->max_message_size, - link->properties); + link->properties_raw); int err = pn_framing_send_amqp(transport, ssn_state->local_channel, buf); if (err) return err; } @@ -2162,13 +2125,10 @@ static int pni_post_disp(pn_transport_t *transport, pn_delivery_t *delivery) } if (!pni_disposition_batchable(&delivery->local)) { - pn_data_clear(transport->disp_data); - PN_RETURN_IF_ERROR(pni_disposition_encode(&delivery->local, transport->disp_data)); - /* "DL[oIn?o?DLC]" */ - pn_bytes_t buf = pn_amqp_encode_DLEoInQoQDLCe(&transport->scratch_space,DISPOSITION, + pn_bytes_t buf = pn_amqp_encode_DLEoInQode(&transport->scratch_space, DISPOSITION, role, state->id, delivery->local.settled, delivery->local.settled, - (bool)code, code, transport->disp_data); + &delivery->local); return pn_framing_send_amqp(transport, ssn->state.local_channel, buf); } @@ -2223,8 +2183,6 @@ static int pni_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *d pn_bytes_t bytes = pn_buffer_bytes(delivery->bytes); size_t full_size = bytes.size; - pn_data_clear(transport->disp_data); - PN_RETURN_IF_ERROR(pni_disposition_encode(&delivery->local, transport->disp_data)); int count = pni_post_amqp_transfer_frame(transport, ssn_state->local_channel, link_state->local_handle, @@ -2233,8 +2191,7 @@ static int pni_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *d delivery->local.settled, !delivery->done, ssn_state->remote_incoming_window, - delivery->local.type, - transport->disp_data, + &delivery->local, false, /* Resume */ delivery->aborted, false /* Batchable */ @@ -2392,20 +2349,10 @@ static int pni_process_link_teardown(pn_transport_t *transport, pn_endpoint_t *e (int16_t) ssn_state->remote_channel != -2 && !transport->close_rcvd) return 0; - pn_string_t *name = NULL; - pn_string_t *description = NULL; - pn_data_t *info = NULL; - - if (pn_condition_is_set(&endpoint->condition)) { - name = endpoint->condition.name; - description = endpoint->condition.description; - info = pn_condition_info(&endpoint->condition); - } - /* "DL[I?o?DL[sSC]]" */ - pn_bytes_t buf = pn_amqp_encode_DLEIQoQDLEsSCee(&transport->scratch_space, DETACH, - state->local_handle, - !link->detached, !link->detached, - (bool)name, ERROR, pn_string_bytes(name), pn_string_bytes(description), info); + pn_bytes_t buf = pn_amqp_encode_DLEIQoce(&transport->scratch_space, DETACH, + state->local_handle, + !link->detached, !link->detached, + &endpoint->condition); int err = pn_framing_send_amqp(transport, ssn_state->local_channel, buf); if (err) return err; pni_unmap_local_handle(link); @@ -2468,18 +2415,7 @@ static int pni_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *en return 0; } - pn_string_t *name = NULL; - pn_string_t *description = NULL; - pn_data_t *info = NULL; - - if (pn_condition_is_set(&endpoint->condition)) { - name = endpoint->condition.name; - description = endpoint->condition.description; - info = pn_condition_info(&endpoint->condition); - } - /* "DL[?DL[sSC]]" */ - pn_bytes_t buf = pn_amqp_encode_DLEQDLEsSCee(&transport->scratch_space, END, - (bool) name, ERROR, pn_string_bytes(name), pn_string_bytes(description), info); + pn_bytes_t buf = pn_amqp_encode_DLEce(&transport->scratch_space, END, &endpoint->condition); int err = pn_framing_send_amqp(transport, state->local_channel, buf); if (err) return err; pni_unmap_local_channel(session); diff --git a/c/src/core/util.h b/c/src/core/util.h index d55d7b8987..7d7ac55a62 100644 --- a/c/src/core/util.h +++ b/c/src/core/util.h @@ -31,6 +31,7 @@ #include #include #include +#include #include #include "object_private.h" @@ -65,13 +66,26 @@ static inline void pn_bytes_free(pn_bytes_t in) { free((void*)in.start); } +static inline void pni_switch_to_data(pn_bytes_t *bytes, pn_data_t **data) { + if (*data == NULL) { + *data = pn_data(0); + } + if (bytes->start) { + pn_data_clear(*data); + pn_data_decode(*data, bytes->start, bytes->size); + pn_data_rewind(*data); + + pn_bytes_free(*bytes); + *bytes = (pn_bytes_t){0, NULL}; + } +} + static inline pn_rwbytes_t pn_rwbytes_alloc(size_t size) { char* space = malloc(size); size_t s = space ? size : 0; return (pn_rwbytes_t){.size=s, .start=space}; } - static inline pn_rwbytes_t pn_rwbytes_realloc(pn_rwbytes_t *in, size_t size) { char* space = realloc(in->start, size); size_t s = space ? size : 0; @@ -83,6 +97,63 @@ static inline void pn_rwbytes_free(pn_rwbytes_t in) { free((void*)in.start); } +static inline bool pni_switch_to_raw_bytes(pn_rwbytes_t scratch, pn_data_t **data, pn_bytes_t *bytes) +{ + if (pn_data_size(*data)) { + pn_data_rewind(*data); + ssize_t size = pn_data_encode(*data, scratch.start, scratch.size); + if (size == PN_OVERFLOW) return false; + + pn_bytes_free(*bytes); + *bytes = pn_bytes_dup((pn_bytes_t){.size=size, .start=scratch.start}); + pn_data_clear(*data); + } + return true; +} + +static inline void pni_switch_to_raw(pn_rwbytes_t *scratch, pn_data_t **data, pn_bytes_t *bytes) { + if (*data == NULL || pn_data_size(*data)==0) { + return; + } + ssize_t data_size = 0; + if (PN_OVERFLOW == (data_size = pn_data_encode(*data, scratch->start, scratch->size))) { + pn_rwbytes_realloc(scratch, pn_data_encoded_size(*data)); + data_size = pn_data_encode(*data, scratch->start, scratch->size); + } + + pn_bytes_free(*bytes); + *bytes = pn_bytes_dup((pn_bytes_t){.size=data_size, .start=scratch->start}); + pn_data_clear(*data); +} + +static inline void pni_switch_to_raw_multiple(pn_rwbytes_t *scratch, pn_data_t **data, pn_bytes_t *bytes) { + if (!*data || pn_data_size(*data) == 0) { + return; + } + pn_data_rewind(*data); + // Rewind and position to first node so data type is defined. + pn_data_next(*data); + + if (pn_data_type(*data) == PN_ARRAY) { + switch (pn_data_get_array(*data)) { + case 0: + pn_bytes_free(*bytes); + *bytes = (pn_bytes_t){0, NULL}; + pn_data_clear(*data); + break; + case 1: + pn_data_enter(*data); + pn_data_narrow(*data); + pni_switch_to_raw(scratch, data, bytes); + break; + default: + pni_switch_to_raw(scratch, data, bytes); + } + } else { + pni_switch_to_raw(scratch, data, bytes); + } +} + static inline void pni_write16(char *bytes, uint16_t value) { bytes[0] = 0xFF & (value >> 8); diff --git a/c/tools/codec-generator/generate.py b/c/tools/codec-generator/generate.py index 260edf80f3..da573db132 100644 --- a/c/tools/codec-generator/generate.py +++ b/c/tools/codec-generator/generate.py @@ -114,7 +114,7 @@ def gen_consume_params(self, first_arg: int) -> List[Tuple[str, str]]: *self.gen_consume_params_list(first_arg + len(args)) ] - def gen_emit_code(self, prefix: List[str], first_arg: int, indent: int) -> List[str]: + def gen_emit_list_code(self, prefix: List[str], first_arg: int, indent: int) -> List[str]: lines = [] arg = first_arg for n in self.list: @@ -122,6 +122,18 @@ def gen_emit_code(self, prefix: List[str], first_arg: int, indent: int) -> List[ arg += n.count_args return [i for i in itertools.chain(*lines)] + def gen_emit_code(self, prefix: List[str], first_arg: int, indent: int) -> List[str]: + return [ + f'{self.mk_indent(indent)}for (bool small_encoding = true; ; small_encoding = false) {{', + f'{self.mk_indent(indent+1)}pni_compound_context c = ' + f'{self.mk_funcall("emit_list", prefix+["small_encoding", "true"])};', + f'{self.mk_indent(indent+1)}pni_compound_context compound = c;', + *(self.gen_emit_list_code(prefix, first_arg, indent + 1)), + f'{self.mk_indent(indent+1)}{self.mk_funcall("emit_end_list", prefix+["small_encoding"])};', + f'{self.mk_indent(indent+1)}if (encode_succeeded({", ".join(prefix)})) break;', + f'{self.mk_indent(indent)}}}', + ] + def gen_consume_code(self, prefix: List[str], first_arg: int, indent: int) -> List[str]: lines = [] arg = first_arg @@ -154,14 +166,7 @@ def gen_emit_code(self, prefix: List[str], first_arg: int, indent: int) -> List[ args = self.gen_args(first_arg) return [ f'{self.mk_indent(indent)}emit_descriptor({", ".join(prefix+args)});', - f'{self.mk_indent(indent)}for (bool small_encoding = true; ; small_encoding = false) {{', - f'{self.mk_indent(indent+1)}pni_compound_context c = ' - f'{self.mk_funcall("emit_list", prefix+["small_encoding", "true"])};', - f'{self.mk_indent(indent+1)}pni_compound_context compound = c;', - *super().gen_emit_code(prefix, first_arg+len(args), indent + 1), - f'{self.mk_indent(indent+1)}{self.mk_funcall("emit_end_list", prefix+["small_encoding"])};', - f'{self.mk_indent(indent+1)}if (encode_succeeded({", ".join(prefix)})) break;', - f'{self.mk_indent(indent)}}}', + *super().gen_emit_code(prefix, first_arg+len(args), indent), ] @@ -192,7 +197,7 @@ def gen_emit_code(self, prefix: List[str], first_arg: int, indent: int) -> List[ f'{self.mk_indent(indent+1)}pni_compound_context c = ' f'{self.mk_funcall("emit_array", prefix+["small_encoding"]+args)};', f'{self.mk_indent(indent+1)}pni_compound_context compound = c;', - *super().gen_emit_code(prefix, first_arg+len(args), indent + 1), + *super().gen_emit_list_code(prefix, first_arg+len(args), indent + 1), f'{self.mk_indent(indent+1)}{self.mk_funcall("emit_end_array", prefix+["small_encoding"])};', f'{self.mk_indent(indent+1)}if (encode_succeeded({", ".join(prefix)})) break;', f'{self.mk_indent(indent)}}}', @@ -276,12 +281,20 @@ def parse_item(format: str) -> Tuple[ASTNode, str]: if not b: raise ParseError(format) return DescListIgnoreTypeNode(l), rest + elif format.startswith('['): + l, rest = parse_list(format[1:]) + b, rest = expect_char(rest, ']') + if not b: + raise ParseError(format) + return ListNode('list', l, []), rest elif format.startswith('D?LR'): return ASTNode('described_maybe_type_raw', ['bool', 'uint64_t', 'pn_bytes_t'], consume_types=['bool*', 'uint64_t*', 'pn_bytes_t*']), format[4:] elif format.startswith('D?L?.'): return ASTNode('described_maybe_type_maybe_anything', ['bool', 'uint64_t', 'bool'], consume_types=['bool*', 'uint64_t*', 'bool*']), format[4:] elif format.startswith('DLC'): return ASTNode('described_type_copy', ['uint64_t', 'pn_data_t*'], consume_types=['uint64_t*', 'pn_data_t*']), format[3:] + elif format.startswith('DLR'): + return ASTNode('described_type_raw', ['uint64_t', 'pn_bytes_t'], consume_types=['uint64_t*', 'pn_bytes_t*']), format[3:] elif format.startswith('DL.'): return ASTNode('described_type_anything', ['uint64_t']), format[3:] elif format.startswith('D?L.'): @@ -290,6 +303,8 @@ def parse_item(format: str) -> Tuple[ASTNode, str]: return NullNode('described_anything'), format[3:] elif format.startswith('D.C'): return ASTNode('described_copy', ['pn_data_t*'], consume_types=['pn_data_t*']), format[3:] + elif format.startswith('D.R'): + return ASTNode('described_raw', ['pn_bytes_t'], consume_types=['pn_bytes_t*']), format[3:] elif format.startswith('@T['): l, rest = parse_list(format[3:]) b, rest = expect_char(rest, ']') @@ -307,8 +322,10 @@ def parse_item(format: str) -> Tuple[ASTNode, str]: return ASTNode('symbol', ['pn_bytes_t'], consume_types=['pn_bytes_t*']), format[1:] elif format.startswith('S'): return ASTNode('string', ['pn_bytes_t'], consume_types=['pn_bytes_t*']), format[1:] - elif format.startswith('C'): - return ASTNode('copy', ['pn_data_t*'], consume_types=['pn_data_t*']), format[1:] + elif format.startswith('c'): + return ASTNode('condition', ['pn_condition_t*'], consume_types=['pn_condition_t*']), format[1:] + elif format.startswith('d'): + return ASTNode('disposition', ['pn_disposition_t*'], consume_types=['pn_disposition_t*']), format[1:] elif format.startswith('I'): return ASTNode('uint', ['uint32_t']), format[1:] elif format.startswith('H'): @@ -320,7 +337,7 @@ def parse_item(format: str) -> Tuple[ASTNode, str]: elif format.startswith('a'): return ASTNode('atom', ['pn_atom_t*'], consume_types=['pn_atom_t*']), format[1:] elif format.startswith('M'): - return ASTNode('multiple', ['pn_data_t*']), format[1:] + return ASTNode('multiple', ['pn_bytes_t']), format[1:] elif format.startswith('o'): return ASTNode('bool', ['bool']), format[1:] elif format.startswith('B'): @@ -340,7 +357,7 @@ def parse_item(format: str) -> Tuple[ASTNode, str]: # Need to translate '@[]*?.' to legal identifier characters # These will be fairly arbitrary and just need to avoid already used codes def make_legal_identifier(s: str) -> str: - subs = {'@': 'A', '[': 'E', ']': 'e', '{': 'F', '}': 'f', '*': 'j', '.': 'q', '?': 'Q'} + subs = {'@': 'A', '[': 'E', ']': 'e', '*': 'j', '.': 'q', '?': 'Q'} r = '' for c in s: if c in subs: @@ -437,6 +454,8 @@ def emit_function(name_prefix: str, fill_spec: str, prefix_args: List[Tuple[str, prefix_emit_header = """ #include "proton/codec.h" +#include "proton/condition.h" +#include "proton/disposition.h" #include "buffer.h" #include @@ -447,6 +466,7 @@ def emit_function(name_prefix: str, fill_spec: str, prefix_args: List[Tuple[str, prefix_consume_header = """ #include "proton/codec.h" +#include "proton/condition.h" #include #include diff --git a/c/tools/codec-generator/specs.json b/c/tools/codec-generator/specs.json index 2775073578..dddf6d6f8a 100644 --- a/c/tools/codec-generator/specs.json +++ b/c/tools/codec-generator/specs.json @@ -1,42 +1,44 @@ { "fill_specs": [ - "DLC", - "DL[?DL[sSC]]", + "R", + "DLR", + "DL[c]", "DL[?HIIII]", "DL[?IIII?I?I?In?o]", "DL[?o?B?I?o?I]", "DL[@T[*s]]", "DL[Bz]", - "DL[I?o?DL[sSC]]", - "DL[IIzI?o?on?DLC?o?o?o]", - "DL[SIoBB?DL[SIsIoC?sCnCC]DL[C]nnI]", - "DL[SIoBB?DL[SIsIoC?sCnMM]?DL[SIsIoCM]nnILnnC]", - "DL[SS?I?H?InnMMC]", + "DL[I?oc]", + "DL[IIzI?o?ond?o?o?o]", + "DL[SIoBB?DL[SIsIoR?sRnRR]DL[R]nnI]", + "DL[SIoBB?DL[SIsIoR?sRnMM]?DL[SIsIoRM]nnILnnR]", + "DL[SS?I?H?InnMMR]", "DL[S]", "DL[Z]", "DL[azSSSass?t?tS?IS]", "DL[oI?I?o?DL[]]", - "DL[oIn?o?DLC]", + "DL[oIn?od]", "DL[szS]" ], "scan_specs": [ - "D.C", - "D.[.....D..D.[.....CC]]", - "D.[.....D..D.[C]...]", + "R", + "D.R", + "D.[.....D..D.[.....RR]]", + "D.[.....D..D.[R]...]", "D.[.....D..DL....]", - "D.[.....D.[.....C.C.CC]]", + "D.[.....D.[.....R.R.RR]]", "D.[?HIII?I]", "D.[?IIII?I?II.o]", - "D.[?S?S?I?HI..CCC]", + "D.[?S?S?I?HI..RRR]", "D.[I?Iz.?oo.D?LRooo]", "D.[IoR]", - "D.[sSC]", - "D.[D.[sSC]]", - "D.[SIo?B?BD.[SIsIo.s]D.[SIsIo]..IL..?C]", + "D.[sSR]", + "D.[D.[sSR]]", + "D.[SIo?B?BD.[SIsIo.s]D.[SIsIo]..IL..?R]", "D.[azSSSassttSIS]", "D.[o?BIoI]", "D.[oI?IoR]", - "D.[?o?oC]", + "D.[?o?oR]", "D.[?I?L]", "D.[sz]", "D.[s]", diff --git a/cpp/src/message.cpp b/cpp/src/message.cpp index 0a3f639f6d..7ad5155f47 100644 --- a/cpp/src/message.cpp +++ b/cpp/src/message.cpp @@ -48,10 +48,6 @@ struct message::impl { annotation_map instructions; impl(pn_message_t *msg) { - body.reset(pn_message_body(msg)); - properties.reset(pn_message_properties(msg)); - annotations.reset(pn_message_annotations(msg)); - instructions.reset(pn_message_instructions(msg)); } void clear() { @@ -237,32 +233,50 @@ bool message::inferred() const { return pn_message_is_inferred(pn_msg()); } void message::inferred(bool b) { pn_message_set_inferred(pn_msg(), b); } -void message::body(const value& x) { body() = x; } +void message::body(const value& x) { impl().body.reset(pn_message_body(pn_msg())); body() = x; } -const value& message::body() const { return impl().body; } -value& message::body() { return impl().body; } +const value& message::body() const { impl().body.reset(pn_message_body(pn_msg())); return impl().body; } +value& message::body() { impl().body.reset(pn_message_body(pn_msg())); return impl().body; } message::property_map& message::properties() { + if (impl().properties.empty()) { + impl().properties.reset(pn_message_properties(pn_msg())); + } return impl().properties; } const message::property_map& message::properties() const { + if (impl().properties.empty()) { + impl().properties.reset(pn_message_properties(pn_msg())); + } return impl().properties; } message::annotation_map& message::message_annotations() { + if (impl().annotations.empty()) { + impl().annotations.reset(pn_message_annotations(pn_msg())); + } return impl().annotations; } const message::annotation_map& message::message_annotations() const { + if (impl().annotations.empty()) { + impl().annotations.reset(pn_message_annotations(pn_msg())); + } return impl().annotations; } message::annotation_map& message::delivery_annotations() { + if (impl().instructions.empty()) { + impl().instructions.reset(pn_message_instructions(pn_msg())); + } return impl().instructions; } const message::annotation_map& message::delivery_annotations() const { + if (impl().instructions.empty()) { + impl().instructions.reset(pn_message_instructions(pn_msg())); + } return impl().instructions; }