Skip to content

Commit

Permalink
Subscription status callback initial impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed May 4, 2024
1 parent 4c05fda commit 3d31cd4
Showing 1 changed file with 109 additions and 10 deletions.
119 changes: 109 additions & 10 deletions source/mqtt_request_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -1164,24 +1164,124 @@ static void s_aws_mqtt_request_response_streaming_operation_extern_finalize(
}
}

struct on_subscription_status_changed_user_data {
struct aws_allocator *allocator;

struct aws_request_response_streaming_operation_binding *binding_ref;

enum aws_rr_streaming_subscription_event_type status;
int error_code;
};

static void s_on_subscription_status_changed_user_data_destroy(
struct on_subscription_status_changed_user_data *user_data) {
if (user_data == NULL) {
return;
}

user_data->binding_ref = s_aws_request_response_streaming_operation_binding_release(user_data->binding_ref);

aws_mem_release(user_data->allocator, user_data);
}

static struct on_subscription_status_changed_user_data *s_on_subscription_status_changed_user_data_new(
struct aws_request_response_streaming_operation_binding *binding,
enum aws_rr_streaming_subscription_event_type status,
int error_code) {

struct on_subscription_status_changed_user_data *user_data =
aws_mem_calloc(binding->allocator, 1, sizeof(struct on_subscription_status_changed_user_data));
user_data->allocator = binding->allocator;
user_data->status = status;
user_data->error_code = error_code;

user_data->binding_ref = s_aws_request_response_streaming_operation_binding_acquire(binding);

return user_data;

error:

s_on_subscription_status_changed_user_data_destroy(user_data);

return NULL;
}

static void s_napi_mqtt_streaming_operation_on_subscription_status_changed(
napi_env env,
napi_value function,
void *context,
void *user_data) {
(void)env;
(void)function;

(void)context;
(void)user_data;

struct on_subscription_status - changed_user_data *status_event = user_data;
struct aws_request_response_streaming_operation_binding *binding = status_event->binding_ref;

if (env && !binding->is_closed) {
napi_value params[3];
const size_t num_params = AWS_ARRAY_SIZE(params);

params[0] = NULL;
if (napi_get_reference_value(env, binding->node_streaming_operation_ref, &params[0]) != napi_ok ||
params[0] == NULL) {
AWS_LOGF_INFO(
AWS_LS_NODEJS_CRT_GENERAL,
"s_napi_mqtt_streaming_operation_on_subscription_status_changed - streaming operation node wrapper no "
"longer resolvable");
goto done;
}

AWS_NAPI_CALL(env, napi_create_int32(env, (int)status_event->status, &params[1]), {
AWS_LOGF_ERROR(
AWS_LS_NODEJS_CRT_GENERAL,
"s_napi_mqtt_streaming_operation_on_subscription_status_changed - failed to create status value");
goto done;
});

if (error_code == AWS_ERROR_SUCCESS) {
AWS_NAPI_CALL(env, napi_get_null(env, &params[2]), {
AWS_LOGF_ERROR(
AWS_LS_NODEJS_CRT_GENERAL,
"s_napi_mqtt_streaming_operation_on_subscription_status_changed - failed to get global null");
goto done;
});
} else {
AWS_NAPI_CALL(env, napi_create_int32(env, (int)status_event->error_code, &params[2]), {
AWS_LOGF_ERROR(
AWS_LS_NODEJS_CRT_GENERAL,
"s_napi_mqtt_streaming_operation_on_subscription_status_changed - failed to create error code "
"value");
goto done;
});
}

AWS_NAPI_ENSURE(
env,
aws_napi_dispatch_threadsafe_function(
env, binding->on_subscription_status_changed, NULL, function, num_params, params));
}

done:

s_on_subscription_status_changed_user_data_destroy(status_event);
}

static void s_mqtt_streaming_operation_on_subscription_status_changed(
enum aws_rr_streaming_subscription_event_type status,
int error_code,
void *user_data) {
(void)status;
(void)error_code;
(void)user_data;

struct aws_request_response_streaming_operation_binding *binding = user_data;

struct on_subscription_status_changed_user_data *status_changed_ud =
s_on_subscription_status_changed_user_data_new(binding, status, error_code);
if (status_changed_ud == NULL) {
return;
}

/* queue a callback in node's libuv thread */
AWS_NAPI_ENSURE(
NULL, aws_napi_queue_threadsafe_function(binding->on_subscription_status_changed, status_changed_ud));
}

struct on_incoming_publish_user_data {
Expand Down Expand Up @@ -1214,7 +1314,6 @@ static struct on_incoming_publish_user_data *s_on_incoming_publish_user_data_new
aws_mem_calloc(binding->allocator, 1, sizeof(struct on_incoming_publish_user_data));
user_data->allocator = binding->allocator;


user_data->payload = aws_mem_calloc(binding->allocator, 1, sizeof(struct aws_byte_buf));
if (aws_byte_buf_init_copy_from_cursor(user_data->payload, binding->allocator, payload)) {
goto error;
Expand Down Expand Up @@ -1296,7 +1395,8 @@ static void s_napi_mqtt_streaming_operation_on_incoming_publish(

AWS_NAPI_ENSURE(
env,
aws_napi_dispatch_threadsafe_function(env, binding->on_incoming_publish, NULL, function, num_params, params));
aws_napi_dispatch_threadsafe_function(
env, binding->on_incoming_publish, NULL, function, num_params, params));
}

done:
Expand All @@ -1307,8 +1407,7 @@ static void s_napi_mqtt_streaming_operation_on_incoming_publish(
static void s_mqtt_streaming_operation_on_incoming_publish(struct aws_byte_cursor payload, void *user_data) {
struct aws_request_response_streaming_operation_binding *binding = user_data;

struct on_incoming_publish_user_data *incoming_publish_ud =
s_on_incoming_publish_user_data_new(binding, payload);
struct on_incoming_publish_user_data *incoming_publish_ud = s_on_incoming_publish_user_data_new(binding, payload);
if (incoming_publish_ud == NULL) {
return;
}
Expand Down

0 comments on commit 3d31cd4

Please sign in to comment.