diff --git a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiCompositeConfig.java b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiCompositeConfig.java index ec6f12730d..cc549f8db7 100644 --- a/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiCompositeConfig.java +++ b/runtime/binding-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/asyncapi/internal/config/AsyncapiCompositeConfig.java @@ -18,7 +18,6 @@ import static java.util.stream.Collectors.toMap; import java.util.List; -import java.util.function.LongFunction; import java.util.function.ToLongFunction; import org.agrona.collections.Long2ObjectHashMap; @@ -35,7 +34,6 @@ public final class AsyncapiCompositeConfig public final List routes; public final List namespaces; - private final LongFunction resolveLabel; private final ToLongFunction resolveSchemaId; private Long2ObjectHashMap operationsById; @@ -56,10 +54,6 @@ public AsyncapiCompositeConfig( this.routes = routes; this.namespaces = namespaces; - final Long2ObjectHashMap labelsBySchemaId = new Long2ObjectHashMap<>(); - schemas.forEach(s -> labelsBySchemaId.put(s.schemaId, s.apiLabel)); - this.resolveLabel = labelsBySchemaId::get; - final Object2LongHashMap schemaIdsByLabel = new Object2LongHashMap<>(NO_SCHEMA_ID); schemas.forEach(s -> schemaIdsByLabel.put(s.apiLabel, s.schemaId)); this.resolveSchemaId = schemaIdsByLabel::get; @@ -93,12 +87,6 @@ public AsyncapiCompositeRouteConfig resolve( .orElse(null); } - public String resolveApiId( - long apiId) - { - return resolveLabel.apply(apiId); - } - public long resolveApiId( String apiId) { diff --git a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncNamespaceGenerator.java b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncNamespaceGenerator.java deleted file mode 100644 index 3cba779052..0000000000 --- a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncNamespaceGenerator.java +++ /dev/null @@ -1,479 +0,0 @@ -/* - * Copyright 2021-2024 Aklivity Inc - * - * Licensed under the Aklivity Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * https://www.aklivity.io/aklivity-community-license/ - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config; - -import static io.aklivity.zilla.runtime.engine.config.KindConfig.PROXY; -import static java.util.Collections.emptyList; -import static java.util.stream.Collectors.toList; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.ToLongFunction; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiOperationView; -import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiReplyView; -import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiView; -import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaConditionConfig; -import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithConfig; -import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfigBuilder; -import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchFilterConfig; -import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchMergeConfig; -import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceAsyncHeaderConfig; -import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceConfigBuilder; -import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiHeaderView; -import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiMediaTypeView; -import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiOperationView; -import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiPathView; -import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiResponseView; -import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiSchemaView; -import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiView; -import io.aklivity.zilla.runtime.engine.config.BindingConfig; -import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; -import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; -import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; -import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; -import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder; -import io.aklivity.zilla.runtime.engine.config.TelemetryRefConfigBuilder; - -public final class OpenapiAsyncNamespaceGenerator -{ - private static final String CORRELATION_ID = "\\{correlationId\\}"; - private static final String PARAMETERS = "\\{(?!correlationId)(\\w+)\\}"; - private static final Pattern JSON_CONTENT_TYPE = Pattern.compile("^application/(?:.+\\+)?json$"); - private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\{([^}]+)\\}"); - private static final Pattern CORRELATION_PATTERN = Pattern.compile(CORRELATION_ID); - - private final Matcher jsonContentType = JSON_CONTENT_TYPE.matcher(""); - private final Matcher parameters = PARAMETER_PATTERN.matcher(""); - private final Matcher correlation = CORRELATION_PATTERN.matcher(""); - - public NamespaceConfig generate( - BindingConfig binding, - Map openapis, - Map asyncapis, - ToLongFunction resolveApiId) - { - final List metricRefs = binding.telemetryRef != null ? - binding.telemetryRef.metricRefs : emptyList(); - - List routes = binding.routes.stream() - .map(r -> new OpenapiAsyncapiRouteConfig(r, resolveApiId)) - .collect(toList()); - - return NamespaceConfig.builder() - .name(String.format("%s/http_kafka", binding.qname)) - .inject(n -> this.injectNamespaceMetric(n, !metricRefs.isEmpty())) - .binding() - .name("http_kafka0") - .type("http-kafka") - .kind(PROXY) - .inject(b -> this.injectMetrics(b, metricRefs, "http-kafka")) - .inject(b -> this.injectHttpKafkaRoutes(b, binding.qname, openapis, asyncapis, routes)) - .build() - .build(); - } - - private BindingConfigBuilder injectHttpKafkaRoutes( - BindingConfigBuilder binding, - String qname, - Map openapis, - Map asyncapis, - List routes) - { - for (OpenapiAsyncapiRouteConfig route : routes) - { - for (OpenapiAsyncapiConditionConfig condition : route.when) - { - Optional openapiConfig = openapis.entrySet().stream() - .filter(e -> e.getKey().equals(condition.apiId)) - .map(Map.Entry::getValue) - .findFirst(); - Optional asyncapiConfig = asyncapis.entrySet().stream() - .filter(e -> e.getKey().equals(route.with.apiId)) - .map(Map.Entry::getValue) - .findFirst(); - - if (openapiConfig.isPresent() && asyncapiConfig.isPresent()) - { - final OpenapiView openapi = openapiConfig.get(); - final AsyncapiView asyncapi = asyncapiConfig.get(); - - computeRoutes(binding, qname, condition, openapi, asyncapi); - } - - } - } - - return binding; - } - - private void computeRoutes( - BindingConfigBuilder binding, - String qname, - OpenapiAsyncapiConditionConfig condition, - OpenapiView openapi, - AsyncapiView asyncapi) - { - for (String item : openapi.paths.keySet()) - { - OpenapiPathView path = openapi.paths.get(item); - for (String method : path.methods.keySet()) - { - final String operationId = condition.operationId != null - ? condition.operationId - : path.methods.get(method).id; - - final OpenapiOperationView openapiOperation = path.methods.get(method); - final Optional asyncapiOperation = findAsyncOperation( - item, openapi, asyncapi, openapiOperation, operationId); - - asyncapiOperation.ifPresent(operation -> - { - final List paramNames = findParams(item); - - binding - .route() - .exit(qname) - .when(HttpKafkaConditionConfig::builder) - .method(method) - .path(item) - .build() - .inject(r -> injectHttpKafkaRouteWith(r, openapi, openapiOperation, operation, paramNames)) - .build(); - }); - } - } - } - - private Optional findAsyncOperation( - String path, - OpenapiView openapi, - AsyncapiView asyncapi, - OpenapiOperationView openapiOperation, - String operationId) - { - Optional operation = findAsyncOperationByOperationId(asyncapi.operations, operationId); - - if (operation.isEmpty() && isOpenapiOperationAsync(openapiOperation)) - { - Optional correlatedOperationId = findOpenapiOperationIdByFormat(path, openapi); - if (correlatedOperationId.isPresent()) - { - operation = findAsyncOperationByOperationId(asyncapi.operations, correlatedOperationId.get()); - } - } - return operation; - } - - private Optional findAsyncOperationByOperationId( - Map operations, - String operationId) - { - return operations.entrySet().stream() - .filter(f -> f.getKey().equals(operationId)) - .map(Map.Entry::getValue) - .findFirst(); - } - - private Optional findOpenapiOperationIdByFormat( - String format, - OpenapiView openapi) - { - String operationId = null; - correlated: - for (String item : openapi.paths.keySet()) - { - if (!item.equals(format)) - { - OpenapiPathView path = openapi.paths.get(item); - for (String method : path.methods.keySet()) - { - final OpenapiOperationView openapiOperation = path.methods.get(method); - boolean formatMatched = openapiOperation.responses.entrySet().stream() - .anyMatch(o -> - { - OpenapiResponseView content = o.getValue(); - return "202".equals(o.getKey()) && content.headers.entrySet().stream() - .anyMatch(c -> matchFormat(format, c.getValue())); - }); - - if (formatMatched) - { - operationId = path.methods.get(method).id; - break correlated; - } - } - } - } - - return Optional.ofNullable(operationId); - } - - private boolean isOpenapiOperationAsync( - OpenapiOperationView openapiOperation) - { - return openapiOperation.responses.entrySet().stream() - .anyMatch(o -> - { - OpenapiResponseView content = o.getValue(); - return "202".equals(o.getKey()) && content.headers.entrySet().stream() - .anyMatch(c -> hasCorrelationId(c.getValue())); - }); - } - - private List findParams( - String item) - { - List paramNames = new ArrayList<>(); - Matcher matcher = parameters.reset(item); - while (matcher.find()) - { - paramNames.add(parameters.group(1)); - } - return paramNames; - } - - private RouteConfigBuilder injectHttpKafkaRouteWith( - RouteConfigBuilder route, - OpenapiView openapi, - OpenapiOperationView httpOperation, - AsyncapiOperationView kafkaOperation, - List paramNames) - { - switch (kafkaOperation.action) - { - case "receive": - route.with(HttpKafkaWithConfig::builder) - .fetch() - .topic(kafkaOperation.channel.address) - .inject(with -> injectHttpKafkaRouteFetchWith(with, openapi, httpOperation, paramNames)) - .build() - .build(); - break; - case "send": - route.with(HttpKafkaWithConfig::builder) - .produce() - .topic(kafkaOperation.channel.address) - .inject(w -> injectHttpKafkaRouteProduceWith(w, httpOperation, kafkaOperation, paramNames)) - .build() - .build(); - break; - } - - return route; - } - - private HttpKafkaWithFetchConfigBuilder injectHttpKafkaRouteFetchWith( - HttpKafkaWithFetchConfigBuilder fetch, - OpenapiView openapi, - OpenapiOperationView operation, - List paramNames) - { - merge: - for (Map.Entry response : operation.responses.entrySet()) - { - OpenapiSchemaView schema = resolveSchemaForJsonContentType(response.getValue().content, openapi); - - if (schema != null && "array".equals(schema.type)) - { - fetch.merged(HttpKafkaWithFetchMergeConfig.builder() - .contentType("application/json") - .initial("[]") - .path("/-") - .build()); - break merge; - } - } - - if (!paramNames.isEmpty()) - { - fetch.filters(List.of(HttpKafkaWithFetchFilterConfig.builder() - .key(String.format("${params.%s}", paramNames.get(paramNames.size() - 1))) - .build())); - } - - return fetch; - } - - private HttpKafkaWithProduceConfigBuilder injectHttpKafkaRouteProduceWith( - HttpKafkaWithProduceConfigBuilder produce, - OpenapiOperationView openapiOperation, - AsyncapiOperationView asyncapiOperation, - List paramNames) - { - final String key = !paramNames.isEmpty() ? String.format("${params.%s}", - paramNames.get(paramNames.size() - 1)) : "${idempotencyKey}"; - - produce.acks("in_sync_replicas").key(key); - - for (Map.Entry response : openapiOperation.responses.entrySet()) - { - if ("202".equals(response.getKey())) - { - OpenapiResponseView content = response.getValue(); - boolean async = content.headers.entrySet().stream() - .anyMatch(e -> hasCorrelationId(e.getValue())); - - if (async) - { - content.headers.forEach((k, v) -> - { - String location = v.schema.format; - location = location.replaceAll(CORRELATION_ID, "\\${correlationId}"); - location = location.replaceAll(PARAMETERS, "\\${params.$1}"); - produce.async(HttpKafkaWithProduceAsyncHeaderConfig.builder() - .name(k) - .value(location) - .build()); - }); - } - } - } - AsyncapiReplyView reply = asyncapiOperation.reply; - if (reply != null) - { - produce.replyTo(reply.channel.address); - } - - produce.build(); - - return produce; - } - - private boolean hasCorrelationId( - OpenapiHeaderView header) - { - boolean hasCorrelationId = false; - OpenapiSchemaView schema = header.schema; - if (schema != null && - schema.format != null) - { - hasCorrelationId = correlation.reset(schema.format).find(); - } - return hasCorrelationId; - } - - private boolean matchFormat( - String format, - OpenapiHeaderView header) - { - boolean matched = false; - OpenapiSchemaView schema = header.schema; - if (schema != null && - schema.format != null) - { - matched = schema.format.equals(format); - } - - return matched; - } - - private NamespaceConfigBuilder injectNamespaceMetric( - NamespaceConfigBuilder namespace, - boolean hasMetrics) - { - if (hasMetrics) - { - namespace - .telemetry() - .metric() - .group("stream") - .name("stream.active.received") - .build() - .metric() - .group("stream") - .name("stream.active.sent") - .build() - .metric() - .group("stream") - .name("stream.opens.received") - .build() - .metric() - .group("stream") - .name("stream.opens.sent") - .build() - .metric() - .group("stream") - .name("stream.data.received") - .build() - .metric() - .group("stream") - .name("stream.data.sent") - .build() - .metric() - .group("stream") - .name("stream.errors.received") - .build() - .metric() - .group("stream") - .name("stream.errors.sent") - .build() - .metric() - .group("stream") - .name("stream.closes.received") - .build() - .metric() - .group("stream") - .name("stream.closes.sent") - .build() - .build(); - } - - return namespace; - } - - protected BindingConfigBuilder injectMetrics( - BindingConfigBuilder binding, - List metricRefs, - String protocol) - { - List metrics = metricRefs.stream() - .filter(m -> m.name.startsWith("stream.")) - .collect(toList()); - - if (!metrics.isEmpty()) - { - final TelemetryRefConfigBuilder> telemetry = binding.telemetry(); - metrics.forEach(telemetry::metric); - telemetry.build(); - } - - return binding; - } - - private OpenapiSchemaView resolveSchemaForJsonContentType( - Map content, - OpenapiView openApi) - { - OpenapiMediaTypeView response = null; - if (content != null) - { - for (String contentType : content.keySet()) - { - if (jsonContentType.reset(contentType).matches()) - { - response = content.get(contentType); - break; - } - } - } - - return response == null ? null : response.schema; - } -} diff --git a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiBindingConfig.java b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiBindingConfig.java index 80181d22bc..6c6ad7099c 100644 --- a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiBindingConfig.java +++ b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiBindingConfig.java @@ -16,193 +16,67 @@ import static java.util.stream.Collectors.toList; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.function.Consumer; import java.util.function.LongFunction; +import java.util.function.ToIntFunction; +import java.util.function.ToLongBiFunction; import java.util.function.ToLongFunction; -import java.util.stream.Collectors; -import org.agrona.collections.Long2LongHashMap; -import org.agrona.collections.Object2LongHashMap; -import org.agrona.collections.Object2ObjectHashMap; - -import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiCatalogConfig; -import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiSchemaConfig; -import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiSpecificationConfig; -import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi; -import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.parser.AsyncapiParser; -import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiView; import io.aklivity.zilla.runtime.binding.openapi.asyncapi.config.OpenapiAsyncapiOptionsConfig; -import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiCatalogConfig; -import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiParser; -import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiSchemaConfig; -import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiSpecificationConfig; -import io.aklivity.zilla.runtime.binding.openapi.internal.model.Openapi; -import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiView; +import io.aklivity.zilla.runtime.engine.EngineContext; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.config.BindingConfig; import io.aklivity.zilla.runtime.engine.config.KindConfig; +import io.aklivity.zilla.runtime.engine.config.MetricRefConfig; import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; -import io.aklivity.zilla.runtime.engine.namespace.NamespacedId; public final class OpenapiAsyncapiBindingConfig { public final long id; - public final String name; + public final String namespace; + public final String qname; public final KindConfig kind; public final OpenapiAsyncapiOptionsConfig options; public final List routes; + public final List metricRefs; - private final OpenapiAsyncNamespaceGenerator namespaceGenerator; - private final LongFunction supplyCatalog; - private final ToLongFunction resolvedIds; - private final Long2LongHashMap compositeResolvedIds; - private final Object2LongHashMap openapiSchemaIdsByApiId; - private final Object2LongHashMap asyncapiSchemaIdsByApiId; - private final AsyncapiParser asyncapiParser; - private final OpenapiParser openapiParser; - private final Consumer attach; - private final Consumer detach; + public final ToLongFunction resolveId; + public final LongFunction supplyCatalog; + public final ToIntFunction supplyTypeId; + public final ToLongBiFunction supplyBindingId; - private NamespaceConfig composite; - private int httpKafkaOrigin; + public transient OpenapiAsyncapiCompositeConfig composite; public OpenapiAsyncapiBindingConfig( - BindingConfig binding, - OpenapiAsyncNamespaceGenerator namespaceGenerator, - LongFunction supplyCatalog, - Consumer attachComposite, - Consumer detachComposite) + EngineContext context, + BindingConfig binding) { this.id = binding.id; - this.name = binding.name; + this.namespace = binding.namespace; + this.qname = binding.qname; this.kind = binding.kind; this.options = (OpenapiAsyncapiOptionsConfig) binding.options; - this.resolvedIds = binding.resolveId; - this.attach = attachComposite; - this.detach = detachComposite; - this.openapiSchemaIdsByApiId = new Object2LongHashMap<>(-1); - this.asyncapiSchemaIdsByApiId = new Object2LongHashMap<>(-1); - this.compositeResolvedIds = new Long2LongHashMap(-1); - this.openapiParser = new OpenapiParser(); - this.asyncapiParser = new AsyncapiParser(); - this.routes = binding.routes.stream() - .map(r -> new OpenapiAsyncapiRouteConfig(r, openapiSchemaIdsByApiId::get)) + .map(OpenapiAsyncapiRouteConfig::new) .collect(toList()); - this.namespaceGenerator = namespaceGenerator; - this.supplyCatalog = supplyCatalog; - } - - public boolean isCompositeOriginId( - long originId) - { - return httpKafkaOrigin == NamespacedId.namespaceId(originId); - } - - public long resolveResolvedId( - long apiId) - { - return compositeResolvedIds.get(apiId); - } - - public long resolveAsyncapiApiId( - String apiId) - { - return asyncapiSchemaIdsByApiId.get(apiId); + this.metricRefs = + binding.telemetryRef != null && binding.telemetryRef.metricRefs != null + ? binding.telemetryRef.metricRefs + : List.of(); + this.resolveId = binding.resolveId; + this.supplyCatalog = context::supplyCatalog; + this.supplyTypeId = context::supplyTypeId; + this.supplyBindingId = context::supplyBindingId; } public OpenapiAsyncapiRouteConfig resolve( long authorization, - long apiId) + String apiId, + String operationId) { return routes.stream() - .filter(r -> r.authorized(authorization) && r.matches(apiId)) + .filter(r -> r.authorized(authorization) && r.matches(apiId, operationId)) .findFirst() .orElse(null); } - - public void attach( - BindingConfig binding) - { - final List openapiConfigs = convertToOpenapi(options.specs.openapi); - final List asyncapiConfigs = convertToAsyncapi(options.specs.asyncapi); - - final Map openapis = openapiConfigs.stream() - .collect(Collectors.toMap( - c -> c.apiLabel, - c -> c.openapi, - (e, n) -> e, - Object2ObjectHashMap::new)); - - final Map asyncapis = asyncapiConfigs.stream() - .collect(Collectors.toMap( - c -> c.apiLabel, - c -> c.asyncapi, - (e, n) -> e, - Object2ObjectHashMap::new)); - - this.composite = namespaceGenerator.generate(binding, openapis, asyncapis, openapiSchemaIdsByApiId::get); - attach.accept(this.composite); - - BindingConfig mappingBinding = composite.bindings.stream() - .filter(b -> b.type.equals("http-kafka")).findFirst().get(); - - httpKafkaOrigin = NamespacedId.namespaceId(mappingBinding.id); - - openapiConfigs.forEach(o -> - { - compositeResolvedIds.put(o.schemaId, mappingBinding.id); - openapiSchemaIdsByApiId.put(o.apiLabel, o.schemaId); - }); - asyncapiConfigs.forEach(a -> asyncapiSchemaIdsByApiId.put(a.apiLabel, a.schemaId)); - } - - public void detach() - { - detach.accept(composite); - } - - private List convertToAsyncapi( - Set configs) - { - final List asyncapiConfigs = new ArrayList<>(); - for (AsyncapiSpecificationConfig config : configs) - { - for (AsyncapiCatalogConfig catalog : config.catalogs) - { - final long catalogId = resolvedIds.applyAsLong(catalog.name); - final CatalogHandler handler = supplyCatalog.apply(catalogId); - final int schemaId = handler.resolve(catalog.subject, catalog.version); - final String payload = handler.resolve(schemaId); - final Asyncapi model = asyncapiParser.parse(payload); - final AsyncapiView asyncapi = AsyncapiView.of(model); - asyncapiConfigs.add(new AsyncapiSchemaConfig(config.label, schemaId, asyncapi)); - } - } - return asyncapiConfigs; - } - - private List convertToOpenapi( - Set configs) - { - final List openapiConfigs = new ArrayList<>(); - for (OpenapiSpecificationConfig config : configs) - { - for (OpenapiCatalogConfig catalog : config.catalogs) - { - final long catalogId = resolvedIds.applyAsLong(catalog.name); - final CatalogHandler handler = supplyCatalog.apply(catalogId); - final int schemaId = handler.resolve(catalog.subject, catalog.version); - final String payload = handler.resolve(schemaId); - final Openapi model = openapiParser.parse(payload); - final OpenapiView openapi = OpenapiView.of(model); - openapiConfigs.add(new OpenapiSchemaConfig(config.label, schemaId, openapi)); - } - } - return openapiConfigs; - } } diff --git a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiCompositeConditionConfig.java b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiCompositeConditionConfig.java new file mode 100644 index 0000000000..00a7b2cba2 --- /dev/null +++ b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiCompositeConditionConfig.java @@ -0,0 +1,51 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config; + +import io.aklivity.zilla.runtime.engine.config.ConditionConfig; + +public class OpenapiAsyncapiCompositeConditionConfig extends ConditionConfig +{ + public final long apiId; + public final int operationTypeId; + + public OpenapiAsyncapiCompositeConditionConfig( + long apiId, + int operationTypeId) + { + this.apiId = apiId; + this.operationTypeId = operationTypeId; + } + + public boolean matches( + long apiId, + int operationTypeId) + { + return matchesApiId(apiId) && + matchesOperationTypeId(operationTypeId); + } + + private boolean matchesApiId( + long apiId) + { + return this.apiId == apiId; + } + + private boolean matchesOperationTypeId( + int operationTypeId) + { + return this.operationTypeId == operationTypeId; + } +} diff --git a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiCompositeConfig.java b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiCompositeConfig.java new file mode 100644 index 0000000000..0b2f6cdd9f --- /dev/null +++ b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiCompositeConfig.java @@ -0,0 +1,90 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config; + +import static io.aklivity.zilla.runtime.engine.catalog.CatalogHandler.NO_SCHEMA_ID; +import static java.util.stream.Collectors.toMap; + +import java.util.List; +import java.util.function.ToLongFunction; + +import org.agrona.collections.Long2ObjectHashMap; +import org.agrona.collections.Object2LongHashMap; + +import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiSchemaConfig; +import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiSchemaConfig; +import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiOperationView; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; +import io.aklivity.zilla.runtime.engine.namespace.NamespacedId; + +public final class OpenapiAsyncapiCompositeConfig +{ + public final List routes; + public final List namespaces; + + private final ToLongFunction resolveSchemaId; + + private Long2ObjectHashMap operationsById; + + public OpenapiAsyncapiCompositeConfig( + List openapis, + List asyncapis, + List namespaces, + List routes) + { + this.routes = routes; + this.namespaces = namespaces; + + final Object2LongHashMap schemaIdsByLabel = new Object2LongHashMap<>(NO_SCHEMA_ID); + asyncapis.forEach(s -> schemaIdsByLabel.put(s.apiLabel, s.schemaId)); + this.resolveSchemaId = schemaIdsByLabel::get; + + this.operationsById = openapis.stream() + .map(s -> s.openapi) + .flatMap(v -> v.operations.values().stream()) + .collect(toMap(o -> o.compositeId, o -> o, (o1, o2) -> o1, Long2ObjectHashMap::new)); + } + + public boolean hasBindingId( + long bindingId) + { + return namespaces.stream() + .mapToInt(n -> n.id) + .anyMatch(id -> id == NamespacedId.namespaceId(bindingId)); + } + + public OpenapiAsyncapiCompositeRouteConfig resolve( + long authorization, + long apiId, + int operationTypeId) + { + return routes.stream() + .filter(r -> r.matches(apiId, operationTypeId)) + .findFirst() + .orElse(null); + } + + public long resolveApiId( + String apiId) + { + return resolveSchemaId.applyAsLong(apiId); + } + + public OpenapiOperationView resolveOperation( + long compositeId) + { + return operationsById.get(compositeId); + } +} diff --git a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiCompositeRouteConfig.java b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiCompositeRouteConfig.java new file mode 100644 index 0000000000..cbd719485f --- /dev/null +++ b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiCompositeRouteConfig.java @@ -0,0 +1,49 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config; + +import java.util.List; + +public final class OpenapiAsyncapiCompositeRouteConfig +{ + public final long id; + public final OpenapiAsyncapiCompositeWithConfig with; + public final List when; + + // TODO: builder instead of overloaded constructors + public OpenapiAsyncapiCompositeRouteConfig( + long id, + OpenapiAsyncapiCompositeConditionConfig when) + { + this(id, List.of(when), null); + } + + public OpenapiAsyncapiCompositeRouteConfig( + long id, + List when, + OpenapiAsyncapiCompositeWithConfig with) + { + this.id = id; + this.when = when; + this.with = with; + } + + boolean matches( + long apiId, + int operationTypeId) + { + return when.isEmpty() || when.stream().anyMatch(m -> m.matches(apiId, operationTypeId)); + } +} diff --git a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiCompositeWithConfig.java b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiCompositeWithConfig.java new file mode 100644 index 0000000000..510b85a396 --- /dev/null +++ b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiCompositeWithConfig.java @@ -0,0 +1,29 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config; + +public class OpenapiAsyncapiCompositeWithConfig +{ + public final long apiId; + public final String operationId; + + public OpenapiAsyncapiCompositeWithConfig( + long apiId, + String operationId) + { + this.apiId = apiId; + this.operationId = operationId; + } +} diff --git a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiConditionConfig.java b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiConditionConfig.java index 5e86b3837f..1bec1db25e 100644 --- a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiConditionConfig.java +++ b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiConditionConfig.java @@ -14,8 +14,6 @@ */ package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config; -import java.util.function.ToLongFunction; - import io.aklivity.zilla.runtime.engine.config.ConditionConfig; public class OpenapiAsyncapiConditionConfig extends ConditionConfig @@ -32,16 +30,22 @@ public OpenapiAsyncapiConditionConfig( } public boolean matches( - long apiId, - ToLongFunction supplyApiId) + String apiId, + String operationId) { - return matchesApiId(apiId, supplyApiId); + return matchesApiId(apiId) && + matchesOperationId(operationId); } private boolean matchesApiId( - long apiId, - ToLongFunction supplyApiId) + String apiId) + { + return this.apiId == null || this.apiId.equals(apiId); + } + + private boolean matchesOperationId( + String operationId) { - return supplyApiId.applyAsLong(this.apiId) == apiId; + return this.operationId == null || this.operationId.equals(operationId); } } diff --git a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiRouteConfig.java b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiRouteConfig.java index b55e5a7bd2..f6ce075a49 100644 --- a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiRouteConfig.java +++ b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/OpenapiAsyncapiRouteConfig.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.function.LongPredicate; -import java.util.function.ToLongFunction; import io.aklivity.zilla.runtime.engine.config.RouteConfig; @@ -28,12 +27,10 @@ public final class OpenapiAsyncapiRouteConfig public final OpenapiAsyncapiWithConfig with; public final List when; - private final ToLongFunction supplyApiId; private final LongPredicate authorized; public OpenapiAsyncapiRouteConfig( - RouteConfig route, - ToLongFunction supplyApiId) + RouteConfig route) { this.id = route.id; this.authorized = route.authorized; @@ -41,7 +38,6 @@ public OpenapiAsyncapiRouteConfig( .map(OpenapiAsyncapiConditionConfig.class::cast) .collect(toList()); this.with = (OpenapiAsyncapiWithConfig) route.with; - this.supplyApiId = supplyApiId; } boolean authorized( @@ -51,8 +47,9 @@ boolean authorized( } boolean matches( - long apiId) + String apiId, + String operationId) { - return when.isEmpty() || when.stream().anyMatch(m -> m.matches(apiId, supplyApiId)); + return when.isEmpty() || when.stream().anyMatch(m -> m.matches(apiId, operationId)); } } diff --git a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/composite/OpenapiAsyncapiCompositeGenerator.java b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/composite/OpenapiAsyncapiCompositeGenerator.java new file mode 100644 index 0000000000..e271d4b85c --- /dev/null +++ b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/composite/OpenapiAsyncapiCompositeGenerator.java @@ -0,0 +1,236 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.composite; + +import java.util.ArrayList; +import java.util.List; + +import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiCatalogConfig; +import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiSchemaConfig; +import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiServerConfig; +import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiSpecificationConfig; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.parser.AsyncapiParser; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiView; +import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.OpenapiAsyncapiBindingConfig; +import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.OpenapiAsyncapiCompositeConfig; +import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiCatalogConfig; +import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiParser; +import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiSchemaConfig; +import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiServerConfig; +import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiSpecificationConfig; +import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiView; +import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; +import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.GuardedConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; + +public abstract class OpenapiAsyncapiCompositeGenerator +{ + public final OpenapiAsyncapiCompositeConfig generate( + OpenapiAsyncapiBindingConfig binding) + { + int tagIndex = 1; + + final OpenapiParser openapiParser = new OpenapiParser(); + final List openapiSchemas = new ArrayList<>(); + for (OpenapiSpecificationConfig openapiSpec : binding.options.specs.openapi) + { + final String label = openapiSpec.label; + + for (OpenapiCatalogConfig catalog : openapiSpec.catalogs) + { + final long catalogId = binding.resolveId.applyAsLong(catalog.name); + final CatalogHandler handler = binding.supplyCatalog.apply(catalogId); + final int schemaId = handler.resolve(catalog.subject, catalog.version); + final String payload = handler.resolve(schemaId); + final List configs = + openapiSpec.servers == null || openapiSpec.servers.isEmpty() + ? List.of(OpenapiServerConfig.builder().build()) + : openapiSpec.servers; + final OpenapiView openapi = OpenapiView.of(tagIndex++, label, openapiParser.parse(payload), configs); + + openapiSchemas.add(new OpenapiSchemaConfig(label, schemaId, openapi)); + } + } + + final AsyncapiParser asyncapiParser = new AsyncapiParser(); + final List asyncapiSchemas = new ArrayList<>(); + for (AsyncapiSpecificationConfig asyncapiSpec : binding.options.specs.asyncapi) + { + final String label = asyncapiSpec.label; + + for (AsyncapiCatalogConfig catalog : asyncapiSpec.catalogs) + { + final long catalogId = binding.resolveId.applyAsLong(catalog.name); + final CatalogHandler handler = binding.supplyCatalog.apply(catalogId); + final int schemaId = handler.resolve(catalog.subject, catalog.version); + final String payload = handler.resolve(schemaId); + final List configs = + asyncapiSpec.servers == null || asyncapiSpec.servers.isEmpty() + ? List.of(AsyncapiServerConfig.builder().build()) + : asyncapiSpec.servers; + final AsyncapiView asyncapi = AsyncapiView.of(tagIndex++, label, asyncapiParser.parse(payload), configs); + + asyncapiSchemas.add(new AsyncapiSchemaConfig(label, schemaId, asyncapi)); + } + } + + return generate(binding, openapiSchemas, asyncapiSchemas); + } + + protected abstract OpenapiAsyncapiCompositeConfig generate( + OpenapiAsyncapiBindingConfig binding, + List openapis, + List asyncapis); + + @FunctionalInterface + public interface NamespaceInjector + { + NamespaceConfigBuilder inject( + NamespaceConfigBuilder builder); + } + + protected abstract class NamespaceHelper + { + protected final OpenapiAsyncapiBindingConfig config; + protected final String name; + + protected NamespaceHelper( + OpenapiAsyncapiBindingConfig config, + String name) + { + this.config = config; + this.name = name; + } + + public final NamespaceConfigBuilder injectAll( + NamespaceConfigBuilder namespace) + { + return namespace + .inject(this::injectName) + .inject(this::injectMetrics) + .inject(this::injectComponents); + } + + protected abstract NamespaceConfigBuilder injectComponents( + NamespaceConfigBuilder namespace); + + protected final String resolveIdentity( + String value) + { + if ("{identity}".equals(value)) + { + value = String.format("${guarded['%s:jwt0'].identity}", config.namespace); + } + + return value; + } + + private NamespaceConfigBuilder injectName( + NamespaceConfigBuilder namespace) + { + return namespace.name("%s/%s".formatted(config.qname, name)); + } + + private NamespaceConfigBuilder injectMetrics( + NamespaceConfigBuilder namespace) + { + if (!config.metricRefs.isEmpty()) + { + namespace + .telemetry() + .metric() + .group("stream") + .name("stream.active.received") + .build() + .metric() + .group("stream") + .name("stream.active.sent") + .build() + .metric() + .group("stream") + .name("stream.opens.received") + .build() + .metric() + .group("stream") + .name("stream.opens.sent") + .build() + .metric() + .group("stream") + .name("stream.data.received") + .build() + .metric() + .group("stream") + .name("stream.data.sent") + .build() + .metric() + .group("stream") + .name("stream.errors.received") + .build() + .metric() + .group("stream") + .name("stream.errors.sent") + .build() + .metric() + .group("stream") + .name("stream.closes.received") + .build() + .metric() + .group("stream") + .name("stream.closes.sent") + .build() + .build(); + } + + return namespace; + } + + protected abstract class BindingsHelper + { + protected static final String REGEX_ADDRESS_PARAMETER = "\\{[^}]+\\}"; + + protected abstract NamespaceConfigBuilder injectAll( + NamespaceConfigBuilder namespace); + + protected final BindingConfigBuilder injectMetrics( + BindingConfigBuilder binding) + { + if (config.metricRefs.stream() + .anyMatch(m -> m.name.startsWith("stream."))) + { + binding.telemetry() + .metric() + .name("stream.*") + .build() + .build(); + } + + return binding; + } + + protected final GuardedConfigBuilder injectGuardedRoles( + GuardedConfigBuilder guarded, + List roles) + { + for (String role : roles) + { + guarded.role(role); + } + + return guarded; + } + } + } +} diff --git a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/composite/OpenapiAsyncapiCompositeId.java b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/composite/OpenapiAsyncapiCompositeId.java new file mode 100644 index 0000000000..baa7391e23 --- /dev/null +++ b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/composite/OpenapiAsyncapiCompositeId.java @@ -0,0 +1,42 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.composite; + +public final class OpenapiAsyncapiCompositeId +{ + public static int apiId( + long compositeId) + { + return (int)(compositeId >> Integer.SIZE) & 0xffff_ffff; + } + + public static int operationId( + long compositeId) + { + return (int)(compositeId >> 0) & 0xffff_ffff; + } + + public static long compositeId( + final int apiId, + final int operationId) + { + return (long) apiId << Integer.SIZE | + (long) operationId << 0; + } + + private OpenapiAsyncapiCompositeId() + { + } +} diff --git a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/composite/OpenapiAsyncapiProxyGenerator.java b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/composite/OpenapiAsyncapiProxyGenerator.java new file mode 100644 index 0000000000..c1569993b3 --- /dev/null +++ b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/config/composite/OpenapiAsyncapiProxyGenerator.java @@ -0,0 +1,547 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.composite; + +import static io.aklivity.zilla.runtime.engine.config.KindConfig.PROXY; +import static java.util.function.Function.identity; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiSchemaConfig; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiOperationView; +import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiReplyView; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaConditionConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchConfigBuilder; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchFilterConfigBuilder; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithFetchMergeConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceAsyncHeaderConfig; +import io.aklivity.zilla.runtime.binding.http.kafka.config.HttpKafkaWithProduceConfigBuilder; +import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.OpenapiAsyncapiBindingConfig; +import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.OpenapiAsyncapiCompositeConditionConfig; +import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.OpenapiAsyncapiCompositeConfig; +import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.OpenapiAsyncapiCompositeRouteConfig; +import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.OpenapiAsyncapiRouteConfig; +import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiSchemaConfig; +import io.aklivity.zilla.runtime.binding.openapi.internal.model.extensions.http.kafka.OpenapiHttpKafkaFilter; +import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiHeaderView; +import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiOperationView; +import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiResponseView; +import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiSchemaView; +import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiSecurityRequirementView; +import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiSecuritySchemeView; +import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiServerView; +import io.aklivity.zilla.runtime.engine.config.BindingConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; +import io.aklivity.zilla.runtime.engine.config.NamespaceConfigBuilder; +import io.aklivity.zilla.runtime.engine.config.RouteConfigBuilder; + +public final class OpenapiAsyncapiProxyGenerator extends OpenapiAsyncapiCompositeGenerator +{ + @Override + protected OpenapiAsyncapiCompositeConfig generate( + OpenapiAsyncapiBindingConfig binding, + List openapis, + List asyncapis) + { + final Map openapisByApiId = openapis.stream() + .collect(Collectors.toMap(s -> s.apiLabel, identity())); + + final Map asyncapisByApiId = asyncapis.stream() + .collect(Collectors.toMap(s -> s.apiLabel, identity())); + + final List mappings = binding.routes.stream() + .flatMap(r -> r.when.stream() + .map(w -> + new ProxyMapping( + openapisByApiId.get(w.apiId), + asyncapisByApiId.get(r.with.apiId)))) + .distinct() + .toList(); + + List namespaces = new LinkedList<>(); + List routes = new LinkedList<>(); + Matcher routed = Pattern.compile("(http)_kafka_proxy0").matcher(""); + + for (ProxyMapping mapping : mappings) + { + NamespaceHelper helper = new ProxyNamespaceHelper(binding, mapping); + NamespaceConfig namespace = NamespaceConfig.builder() + .inject(helper::injectAll) + .build(); + namespaces.add(namespace); + + namespace.bindings.stream() + .filter(b -> routed.reset(b.name).matches()) + .forEach(b -> + { + final int operationTypeId = binding.supplyTypeId.applyAsInt(routed.group(1)); + final long routeId = binding.supplyBindingId.applyAsLong(namespace, b); + + final OpenapiAsyncapiCompositeConditionConfig when = new OpenapiAsyncapiCompositeConditionConfig( + mapping.when.schemaId, + operationTypeId); + + routes.add(new OpenapiAsyncapiCompositeRouteConfig(routeId, when)); + }); + } + + return new OpenapiAsyncapiCompositeConfig(openapis, asyncapis, namespaces, routes); + } + + private final class ProxyNamespaceHelper extends NamespaceHelper + { + private final BindingsHelper bindings; + + private ProxyNamespaceHelper( + OpenapiAsyncapiBindingConfig config, + ProxyMapping mapping) + { + super(config, "%s+%s".formatted(mapping.when.apiLabel, mapping.with.apiLabel)); + this.bindings = new ProxyBindingsHelper(mapping); + } + + @Override + protected NamespaceConfigBuilder injectComponents( + NamespaceConfigBuilder namespace) + { + return namespace + .inject(bindings::injectAll); + } + + private final class ProxyBindingsHelper extends BindingsHelper + { + private final ProxyMapping mapping; + private final BindingsHelper httpKafka; + + private ProxyBindingsHelper( + ProxyMapping mapping) + { + this.mapping = mapping; + this.httpKafka = new HttpKafkaBindingsHelper(); + } + + @Override + protected NamespaceConfigBuilder injectAll( + NamespaceConfigBuilder namespace) + { + return namespace + .inject(httpKafka::injectAll); + } + + private final class ProxyRouteHelper + { + private final List when; + private final ProxyWithHelper with; + + private ProxyRouteHelper( + OpenapiAsyncapiRouteConfig route) + { + this.when = route.when.stream() + .filter(c -> mapping.when.apiLabel.equals(c.apiId)) + .map(c -> new ProxyWhenHelper(mapping.when, c.operationId)) + .toList(); + this.with = new ProxyWithHelper(mapping.with, route.with.operationId); + } + + private boolean hasWithProtocol( + Predicate protocol) + { + return hasProtocol(with, protocol); + } + + private static boolean hasProtocol( + ProxyWithHelper operation, + Predicate protocol) + { + return operation.schema != null && operation.schema.asyncapi.hasProtocol(protocol); + } + } + + private final class ProxyWhenHelper + { + private final OpenapiSchemaConfig schema; + private final String operationId; + + private ProxyWhenHelper( + OpenapiSchemaConfig schema, + String operationId) + { + this.schema = schema; + this.operationId = operationId; + } + } + + private final class ProxyWithHelper + { + private final AsyncapiSchemaConfig schema; + private final String operationId; + + private ProxyWithHelper( + AsyncapiSchemaConfig schema, + String operationId) + { + this.schema = schema; + this.operationId = operationId; + } + } + + private final class HttpKafkaBindingsHelper extends BindingsHelper + { + private static final String CORRELATION_ID = "\\{correlationId\\}"; + private static final String PARAMETERS = "\\{(?!correlationId)(\\w+)\\}"; + + private static final Pattern JSON_CONTENT_TYPE_PATTERN = Pattern.compile("^application/(?:.+\\+)?json$"); + private static final Pattern PARAMETER_PATTERN = Pattern.compile("\\{([^}]+)\\}"); + private static final Pattern CORRELATION_PATTERN = Pattern.compile(CORRELATION_ID); + + private final Matcher parameters = PARAMETER_PATTERN.matcher(""); + private final Matcher correlation = CORRELATION_PATTERN.matcher(""); + private final Matcher jsonContentType = JSON_CONTENT_TYPE_PATTERN.matcher(""); + + + private final List httpKafkaRoutes; + + private HttpKafkaBindingsHelper() + { + this.httpKafkaRoutes = config.routes.stream() + .map(ProxyRouteHelper::new) + .filter(r -> r.hasWithProtocol(p -> p.startsWith("kafka"))) + .toList(); + } + + @Override + protected NamespaceConfigBuilder injectAll( + NamespaceConfigBuilder namespace) + { + if (!httpKafkaRoutes.isEmpty()) + { + namespace.inject(this::injectHttpKafka); + } + + return namespace; + } + + private NamespaceConfigBuilder injectHttpKafka( + NamespaceConfigBuilder namespace) + { + return namespace.binding() + .name("http_kafka_proxy0") + .type("http-kafka") + .kind(PROXY) + .inject(this::injectMetrics) + .inject(this::injectHttpKafkaRoutes) + .build(); + } + + private BindingConfigBuilder injectHttpKafkaRoutes( + BindingConfigBuilder binding) + { + for (ProxyRouteHelper route : httpKafkaRoutes) + { + Map kafkaOpsById = route.with.schema.asyncapi.operations; + + for (ProxyWhenHelper condition : route.when) + { + Map httpOpsById = condition.schema.openapi.operations; + + OpenapiOperationView httpOp = httpOpsById.get(condition.operationId); + if (httpOp == null) + { + for (OpenapiOperationView httpAnyOp : httpOpsById.values()) + { + String kafkaOpId = route.with.operationId != null + ? route.with.operationId + : httpAnyOp.id; + + AsyncapiOperationView kafkaOp = kafkaOpsById.get(kafkaOpId); + + if (kafkaOp == null) + { + OpenapiOperationView httpFormatOp = httpOpsById.values().stream() + .filter(o -> o != httpAnyOp) + .filter(o -> o.responses.values().stream() + .filter(r -> "202".equals(r.status)) + .anyMatch(r -> r.headers.values().stream() + .filter(OpenapiHeaderView::hasSchemaFormat) + .anyMatch( + h -> httpAnyOp.path.equals(h.schema.format)))) + .findFirst() + .orElse(null); + + if (httpFormatOp != null) + { + kafkaOp = kafkaOpsById.get(httpFormatOp.id); + } + } + + if (kafkaOp != null) + { + injectHttpKafkaRoute(binding, httpAnyOp, kafkaOp); + } + } + } + else + { + AsyncapiOperationView kafkaOp = kafkaOpsById.get(route.with.operationId); + binding.inject(b -> injectHttpKafkaRoute(b, httpOp, kafkaOp)); + } + } + } + + return binding; + } + + private BindingConfigBuilder injectHttpKafkaRoute( + BindingConfigBuilder binding, + OpenapiOperationView httpOp, + AsyncapiOperationView kafkaOp) + { + for (OpenapiServerView httpServer : httpOp.servers) + { + binding + .route() + .exit(config.qname) + .when(HttpKafkaConditionConfig::builder) + .method(httpOp.method) + .path(httpServer.requestPath(httpOp.path)) + .build() + .inject(r -> injectHttpKafkaRouteWith(r, httpServer, httpOp, kafkaOp)) + //.inject(r -> injectHttpServerRouteGuarded(r, httpOp)) + .build(); + } + + return binding; + } + + private RouteConfigBuilder injectHttpKafkaRouteWith( + RouteConfigBuilder route, + OpenapiServerView httpServer, + OpenapiOperationView httpOperation, + AsyncapiOperationView kafkaOperation) + { + switch (kafkaOperation.action) + { + case "receive": + route + .with(HttpKafkaWithConfig::builder) + .compositeId(httpOperation.compositeId) + .fetch() + .topic(kafkaOperation.channel.address) + .inject(w -> injectHttpKafkaRouteFetchWith(w, httpServer, httpOperation)) + .build() + .build(); + break; + case "send": + route + .with(HttpKafkaWithConfig::builder) + .compositeId(httpOperation.compositeId) + .produce() + .topic(kafkaOperation.channel.address) + .inject(w -> injectHttpKafkaRouteProduceWith(w, httpServer, httpOperation, kafkaOperation)) + .build() + .build(); + break; + } + + return route; + } + + private HttpKafkaWithFetchConfigBuilder injectHttpKafkaRouteFetchWith( + HttpKafkaWithFetchConfigBuilder fetch, + OpenapiServerView httpServer, + OpenapiOperationView httpOperation) + { + merge: + for (OpenapiResponseView response : httpOperation.responses.values()) + { + OpenapiSchemaView schema = response.content != null + ? response.content.values().stream() + .filter(r -> jsonContentType.reset(r.name).matches()) + .findFirst() + .map(r -> r.schema) + .orElse(null) + : null; + + if (schema != null && "array".equals(schema.type)) + { + fetch.merged(HttpKafkaWithFetchMergeConfig.builder() + .contentType("application/json") + .initial("[]") + .path("/-") + .build()); + break merge; + } + } + + // TODO: remove, driven by http kafka operation extension instead? + final List httpParamNames = findParams(httpOperation.path); + if (!httpParamNames.isEmpty()) + { + fetch.filter() + .key(String.format("${params.%s}", httpParamNames.get(httpParamNames.size() - 1))) + .build(); + } + + if (httpOperation.hasExtensionHttpKafka()) + { + List filters = httpOperation.httpKafka.filters; + if (filters != null) + { + for (OpenapiHttpKafkaFilter filter : filters) + { + HttpKafkaWithFetchFilterConfigBuilder withFilter = fetch.filter(); + + String key = filter.key; + if (key != null) + { + key = resolveIdentity(key); + + withFilter.key(key); + } + + Map headers = filter.headers; + if (headers != null) + { + for (Map.Entry header : headers.entrySet()) + { + String name = header.getKey(); + String value = header.getValue(); + + value = resolveIdentity(value); + + withFilter.header(name, value); + } + } + + withFilter.build(); + } + } + } + + return fetch; + } + + private HttpKafkaWithProduceConfigBuilder injectHttpKafkaRouteProduceWith( + HttpKafkaWithProduceConfigBuilder produce, + OpenapiServerView httpServer, + OpenapiOperationView httpOperation, + AsyncapiOperationView kafkaOperation) + { + final List httpParamNames = findParams(httpOperation.path); + final String key = !httpParamNames.isEmpty() + ? String.format("${params.%s}", httpParamNames.get(httpParamNames.size() - 1)) + : "${idempotencyKey}"; + + produce.acks("in_sync_replicas").key(key); + + for (Map.Entry response : httpOperation.responses.entrySet()) + { + if ("202".equals(response.getKey())) + { + OpenapiResponseView content = response.getValue(); + boolean async = content.headers.entrySet().stream() + .anyMatch(e -> hasCorrelationId(e.getValue())); + + if (async) + { + content.headers.forEach((k, v) -> + { + String location = httpServer.requestPath(v.schema.format); + location = location.replaceAll(CORRELATION_ID, "\\${correlationId}"); + location = location.replaceAll(PARAMETERS, "\\${params.$1}"); + produce.async(HttpKafkaWithProduceAsyncHeaderConfig.builder() + .name(k) + .value(location) + .build()); + }); + } + } + } + AsyncapiReplyView reply = kafkaOperation.reply; + if (reply != null) + { + produce.replyTo(reply.channel.address); + } + + return produce; + } + + + private RouteConfigBuilder injectHttpServerRouteGuarded( + RouteConfigBuilder route, + OpenapiOperationView httpOp) + { + Map securitySchemes = httpOp.specification.components.securitySchemes; + final List> security = httpOp.security; + + if (security != null) + { + security.stream() + .flatMap(s -> s.stream()) + .filter(r -> securitySchemes != null && securitySchemes.containsKey(r.name)) + .filter(r -> "jwt".equalsIgnoreCase(securitySchemes.get(r.name).bearerFormat)) + .forEach(r -> + route + .guarded() + .name(String.format("%s:jwt0", config.namespace)) + .inject(guarded -> injectGuardedRoles(guarded, r.scopes)) + .build()); + } + + return route; + } + + private List findParams( + String item) + { + List paramNames = new ArrayList<>(); + Matcher matcher = parameters.reset(item); + while (matcher.find()) + { + paramNames.add(parameters.group(1)); + } + return paramNames; + } + + private boolean hasCorrelationId( + OpenapiHeaderView header) + { + boolean hasCorrelationId = false; + OpenapiSchemaView schema = header.schema; + if (schema != null && + schema.format != null) + { + hasCorrelationId = correlation.reset(schema.format).find(); + } + return hasCorrelationId; + } + } + } + } + + private record ProxyMapping( + OpenapiSchemaConfig when, + AsyncapiSchemaConfig with) + { + } +} diff --git a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/streams/OpenapiAsyncapiProxyFactory.java b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/streams/OpenapiAsyncapiProxyFactory.java index fe171c3438..16b6a9d143 100644 --- a/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/streams/OpenapiAsyncapiProxyFactory.java +++ b/runtime/binding-openapi-asyncapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/streams/OpenapiAsyncapiProxyFactory.java @@ -14,22 +14,21 @@ */ package io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.streams; -import java.util.function.Consumer; -import java.util.function.LongFunction; -import java.util.function.LongSupplier; import java.util.function.LongUnaryOperator; import org.agrona.DirectBuffer; import org.agrona.MutableDirectBuffer; -import org.agrona.collections.Long2LongHashMap; import org.agrona.collections.Long2ObjectHashMap; import org.agrona.concurrent.UnsafeBuffer; import io.aklivity.zilla.runtime.binding.asyncapi.internal.AsyncapiBinding; import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.OpenapiAsyncapiConfiguration; -import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.OpenapiAsyncNamespaceGenerator; import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.OpenapiAsyncapiBindingConfig; +import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.OpenapiAsyncapiCompositeConfig; +import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.OpenapiAsyncapiCompositeRouteConfig; import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.OpenapiAsyncapiRouteConfig; +import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.composite.OpenapiAsyncapiCompositeGenerator; +import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.config.composite.OpenapiAsyncapiProxyGenerator; import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.types.Flyweight; import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.types.OctetsFW; import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.types.stream.AbortFW; @@ -37,22 +36,20 @@ import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.types.stream.BeginFW; import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.types.stream.DataFW; import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.types.stream.EndFW; +import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.types.stream.ExtensionFW; import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.types.stream.FlushFW; import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.types.stream.OpenapiBeginExFW; import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.types.stream.ResetFW; import io.aklivity.zilla.runtime.binding.openapi.asyncapi.internal.types.stream.WindowFW; import io.aklivity.zilla.runtime.binding.openapi.internal.OpenapiBinding; +import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiOperationView; import io.aklivity.zilla.runtime.engine.EngineContext; import io.aklivity.zilla.runtime.engine.binding.BindingHandler; import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer; -import io.aklivity.zilla.runtime.engine.buffer.BufferPool; -import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.config.BindingConfig; -import io.aklivity.zilla.runtime.engine.config.NamespaceConfig; public final class OpenapiAsyncapiProxyFactory implements OpenapiAsyncapiStreamFactory { - private static final int UNKNOWN_COMPOSITE_RESOLVED_ID = -1; private static final OctetsFW EMPTY_OCTETS = new OctetsFW().wrap(new UnsafeBuffer(), 0, 0); private final BeginFW beginRO = new BeginFW(); @@ -71,26 +68,24 @@ public final class OpenapiAsyncapiProxyFactory implements OpenapiAsyncapiStreamF private final ResetFW.Builder resetRW = new ResetFW.Builder(); private final FlushFW.Builder flushRW = new FlushFW.Builder(); + private final ExtensionFW extensionRO = new ExtensionFW(); + private final AsyncapiBeginExFW asyncapiBeginExRO = new AsyncapiBeginExFW(); private final OpenapiBeginExFW openapiBeginExRO = new OpenapiBeginExFW(); private final AsyncapiBeginExFW.Builder asyncapiBeginExRW = new AsyncapiBeginExFW.Builder(); private final OpenapiBeginExFW.Builder openapiBeginExRW = new OpenapiBeginExFW.Builder(); - private final MutableDirectBuffer writeBuffer; - private final MutableDirectBuffer extBuffer; - private final BufferPool bufferPool; + private final EngineContext context; + private final BindingHandler streamFactory; private final LongUnaryOperator supplyInitialId; private final LongUnaryOperator supplyReplyId; - private final LongSupplier supplyTraceId; - private final LongFunction supplyCatalog; - private final Consumer attachComposite; - private final Consumer detachComposite; + private final MutableDirectBuffer writeBuffer; + private final MutableDirectBuffer extBuffer; + private final Long2ObjectHashMap bindings; - private final Long2LongHashMap apiIds; - private final OpenapiAsyncapiConfiguration config; - private final OpenapiAsyncNamespaceGenerator namespaceGenerator; + private final OpenapiAsyncapiCompositeGenerator generator; private final int openapiTypeId; private final int asyncapiTypeId; @@ -98,20 +93,14 @@ public OpenapiAsyncapiProxyFactory( OpenapiAsyncapiConfiguration config, EngineContext context) { - this.config = config; - this.namespaceGenerator = new OpenapiAsyncNamespaceGenerator(); + this.generator = new OpenapiAsyncapiProxyGenerator(); this.writeBuffer = context.writeBuffer(); this.extBuffer = new UnsafeBuffer(new byte[writeBuffer.capacity()]); - this.bufferPool = context.bufferPool(); + this.context = context; this.streamFactory = context.streamFactory(); this.supplyInitialId = context::supplyInitialId; this.supplyReplyId = context::supplyReplyId; - this.supplyTraceId = context::supplyTraceId; - this.supplyCatalog = context::supplyCatalog; - this.attachComposite = context::attachComposite; - this.detachComposite = context::detachComposite; this.bindings = new Long2ObjectHashMap<>(); - this.apiIds = new Long2LongHashMap(-1); this.openapiTypeId = context.supplyTypeId(OpenapiBinding.NAME); this.asyncapiTypeId = context.supplyTypeId(AsyncapiBinding.NAME); } @@ -127,19 +116,30 @@ public int routedTypeId() public void attach( BindingConfig binding) { - OpenapiAsyncapiBindingConfig apiBinding = new OpenapiAsyncapiBindingConfig(binding, - namespaceGenerator, supplyCatalog, attachComposite, detachComposite); - bindings.put(binding.id, apiBinding); + OpenapiAsyncapiBindingConfig attached = new OpenapiAsyncapiBindingConfig(context, binding); + bindings.put(binding.id, attached); + + OpenapiAsyncapiCompositeConfig composite = generator.generate(attached); + assert composite != null; + // TODO: schedule generate retry if null - apiBinding.attach(binding); + composite.namespaces.forEach(context::attachComposite); + attached.composite = composite; } @Override public void detach( long bindingId) { - OpenapiAsyncapiBindingConfig apiBinding = bindings.remove(bindingId); - apiBinding.detach(); + OpenapiAsyncapiBindingConfig binding = bindings.remove(bindingId); + OpenapiAsyncapiCompositeConfig composite = binding.composite; + + if (composite != null) + { + composite.namespaces.forEach(context::detachComposite); + } + + // TODO: cancel generate retry if scheduled } @Override @@ -159,52 +159,69 @@ public MessageConsumer newStream( final OctetsFW extension = begin.extension(); final OpenapiAsyncapiBindingConfig binding = bindings.get(routedId); + final OpenapiAsyncapiCompositeConfig composite = binding != null ? binding.composite : null; MessageConsumer newStream = null; - if (binding != null) + if (binding != null && composite != null) { - if (!binding.isCompositeOriginId(originId)) + if (composite.hasBindingId(originId)) { - final OpenapiBeginExFW openapiBeginEx = extension.get(openapiBeginExRO::tryWrap); - final long apiId = openapiBeginEx.apiId(); - final String operationId = openapiBeginEx.operationId().asString(); - - final long compositeResolvedId = binding.resolveResolvedId(apiId); - apiIds.put(apiId, apiId); + final ExtensionFW extensionEx = extension.get(extensionRO::wrap); + final long compositeId = extensionEx.compositeId(); + final OpenapiOperationView operation = composite.resolveOperation(compositeId); - if (compositeResolvedId != UNKNOWN_COMPOSITE_RESOLVED_ID) + if (operation != null) { - newStream = new OpenapiStream( - receiver, - originId, - routedId, - initialId, - apiId, - authorization, - compositeResolvedId, - operationId)::onOpenapiMessage; + final String apiId = operation.specification.label; + final String operationId = operation.id; + final OpenapiAsyncapiRouteConfig route = binding.resolve(authorization, apiId, operationId); + + if (route != null) + { + final long resolvedId = route.id; + final long resolvedApiId = composite.resolveApiId(route.with.apiId); + final String resolvedOperationId = route.with.operationId != null + ? route.with.operationId + : operationId; + + newStream = new CompositeClientStream( + receiver, + originId, + routedId, + initialId, + authorization, + affinity, + resolvedId, + resolvedApiId, + resolvedOperationId)::onCompositeClientMessage; + } } } else { - final long apiId = apiIds.get(affinity); - final OpenapiAsyncapiRouteConfig route = binding.resolve(authorization, apiId); + final OpenapiBeginExFW beginEx = extension.get(openapiBeginExRO::tryWrap); + final long apiId = beginEx.apiId(); + final String operationId = beginEx.operationId().asString(); + final ExtensionFW extensionEx = beginEx.extension().get(extensionRO::tryWrap); + final int operationTypeId = extensionEx.typeId(); + + final OpenapiAsyncapiCompositeRouteConfig route = composite.resolve(authorization, apiId, operationTypeId); if (route != null) { - final long clientApiId = binding.resolveAsyncapiApiId(route.with.apiId); - final String operationId = route.with.operationId; - newStream = new CompositeClientStream( + final long resolvedId = route.id; + + newStream = new OpenapiStream( receiver, originId, routedId, initialId, authorization, - route.id, affinity, - clientApiId, - operationId)::onCompositeClientMessage; + resolvedId, + apiId, + operationId)::onOpenapiMessage; } } } @@ -241,13 +258,14 @@ private OpenapiStream( long originId, long routedId, long initialId, - long affinity, long authorization, - long compositeResolvedId, + long affinity, + long resolvedId, + long apiId, String operationId) { this.delegate = - new CompositeServerStream(this, compositeResolvedId, compositeResolvedId, authorization, operationId); + new CompositeServerStream(this, routedId, resolvedId, authorization, apiId, operationId); this.sender = sender; this.originId = originId; this.routedId = routedId; @@ -545,6 +563,7 @@ private void cleanup( final class CompositeServerStream { + private final long apiId; private final String operationId; private final long originId; private final long routedId; @@ -569,16 +588,18 @@ final class CompositeServerStream private CompositeServerStream( OpenapiStream delegate, + long originId, long routedId, - long compositeResolvedId, long authorization, + long apiId, String operationId) { this.delegate = delegate; - this.originId = routedId; - this.routedId = compositeResolvedId; + this.originId = originId; + this.routedId = routedId; this.receiver = MessageConsumer.NOOP; this.authorization = authorization; + this.apiId = apiId; this.operationId = operationId; } @@ -634,7 +655,7 @@ private void onCompositeBegin( final OpenapiBeginExFW openapiBeginEx = openapiBeginExRW .wrap(extBuffer, 0, extBuffer.capacity()) .typeId(openapiTypeId) - .apiId(delegate.affinity) + .apiId(apiId) .operationId(operationId) .extension(extension) .build(); @@ -900,8 +921,8 @@ private CompositeClientStream( long routedId, long initialId, long authorization, - long resolvedId, long affinity, + long resolvedId, long apiId, String operationId) { diff --git a/runtime/binding-openapi-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/streams/OpenapiAsyncapiIT.java b/runtime/binding-openapi-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/streams/OpenapiAsyncapiIT.java index 71ebe4f77d..994c36106b 100644 --- a/runtime/binding-openapi-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/streams/OpenapiAsyncapiIT.java +++ b/runtime/binding-openapi-asyncapi/src/test/java/io/aklivity/zilla/runtime/binding/openapi/asyncapi/internal/streams/OpenapiAsyncapiIT.java @@ -25,6 +25,7 @@ import io.aklivity.k3po.runtime.junit.annotation.Specification; import io.aklivity.k3po.runtime.junit.rules.K3poRule; +import io.aklivity.zilla.runtime.engine.EngineConfiguration; import io.aklivity.zilla.runtime.engine.test.EngineRule; import io.aklivity.zilla.runtime.engine.test.annotation.Configuration; @@ -41,6 +42,8 @@ public class OpenapiAsyncapiIT .countersBufferCapacity(8192) .configurationRoot("io/aklivity/zilla/specs/binding/openapi/asyncapi/config") .external("asyncapi_client0") + .configure(EngineConfiguration.ENGINE_VERBOSE, false) + .configure(EngineConfiguration.ENGINE_VERBOSE_COMPOSITES, false) .clean(); @Rule diff --git a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeConfig.java b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeConfig.java index 1eda128fce..4abce22f55 100644 --- a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeConfig.java +++ b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/OpenapiCompositeConfig.java @@ -66,8 +66,7 @@ public OpenapiCompositeConfig( this.operationsById = schemas.stream() .map(s -> s.openapi) - .flatMap(v -> v.paths.values().stream()) - .flatMap(v -> v.methods.values().stream()) + .flatMap(v -> v.operations.values().stream()) .collect(toMap(o -> o.compositeId, o -> o, (o1, o2) -> o1, Long2ObjectHashMap::new)); this.specificationsById = schemas.stream() diff --git a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/composite/OpenapiClientGenerator.java b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/composite/OpenapiClientGenerator.java index f29c71a6a3..c061406324 100644 --- a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/composite/OpenapiClientGenerator.java +++ b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/composite/OpenapiClientGenerator.java @@ -239,14 +239,13 @@ private HttpOptionsConfigBuilder injectHttpRequests( { Stream.of(schema) .map(s -> s.openapi) - .flatMap(v -> v.paths.values().stream()) - .flatMap(p -> p.methods.values().stream()) + .flatMap(v -> v.operations.values().stream()) .filter(OpenapiOperationView::hasResponses) .forEach(operation -> operation.servers.forEach(server -> options .request() - .path(operation.requestPath(server)) + .path(server.requestPath(operation.path)) .method(HttpRequestConfig.Method.valueOf(operation.method)) .inject(request -> injectHttpResponses(request, operation)) .build() diff --git a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/composite/OpenapiCompositeGenerator.java b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/composite/OpenapiCompositeGenerator.java index db81cc68ba..994e3e5362 100644 --- a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/composite/OpenapiCompositeGenerator.java +++ b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/composite/OpenapiCompositeGenerator.java @@ -248,16 +248,14 @@ private InlineOptionsConfigBuilder injectInlineRequests( { Stream.of(schema) .map(s -> s.openapi) - .flatMap(v -> v.paths.values().stream()) - .flatMap(p -> p.methods.values().stream()) + .flatMap(v -> v.operations.values().stream()) .map(o -> o.requestBody) .filter(Objects::nonNull) .forEach(m -> injectInlineRequest(jsonb, options, m)); Stream.of(schema) .map(s -> s.openapi) - .flatMap(v -> v.paths.values().stream()) - .flatMap(p -> p.methods.values().stream()) + .flatMap(v -> v.operations.values().stream()) .filter(o -> o.parameters != null) .flatMap(c -> c.parameters.stream()) .filter(p -> p.schema != null) // TODO: runtime expressions @@ -327,8 +325,7 @@ private InlineOptionsConfigBuilder injectInlineResponses( { Stream.of(schema) .map(s -> s.openapi) - .flatMap(v -> v.paths.values().stream()) - .flatMap(p -> p.methods.values().stream()) + .flatMap(v -> v.operations.values().stream()) .filter(OpenapiOperationView::hasResponses) .flatMap(o -> o.responses.values().stream()) .forEach(o -> injectInlineResponse(jsonb, options, o)); diff --git a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/composite/OpenapiServerGenerator.java b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/composite/OpenapiServerGenerator.java index af6e9f653a..cdac7322db 100644 --- a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/composite/OpenapiServerGenerator.java +++ b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/config/composite/OpenapiServerGenerator.java @@ -33,6 +33,7 @@ import io.aklivity.zilla.runtime.binding.openapi.internal.config.OpenapiCompositeConfig; import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiOperationView; import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiSchemaView; +import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiSecurityRequirementView; import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiSecuritySchemeView; import io.aklivity.zilla.runtime.binding.openapi.internal.view.OpenapiServerView; import io.aklivity.zilla.runtime.binding.tcp.config.TcpConditionConfig; @@ -261,8 +262,7 @@ private HttpOptionsConfigBuilder injectHttpRequests( { Stream.of(schema) .map(s -> s.openapi) - .flatMap(v -> v.paths.values().stream()) - .flatMap(v -> v.methods.values().stream()) + .flatMap(v -> v.operations.values().stream()) .filter(OpenapiOperationView::hasRequestBodyOrParameters) .forEach(operation -> { @@ -270,7 +270,7 @@ private HttpOptionsConfigBuilder injectHttpRequests( { options .request() - .path(operation.requestPath(server)) + .path(server.requestPath(operation.path)) .method(Method.valueOf(operation.method)) .inject(request -> injectHttpParams(request, operation)) .inject(request -> injectHttpContent(request, operation)) @@ -375,8 +375,7 @@ private BindingConfigBuilder injectHttpRoutes( { Stream.of(schema) .map(s -> s.openapi) - .flatMap(v -> v.paths.values().stream()) - .flatMap(p -> p.methods.values().stream()) + .flatMap(v -> v.operations.values().stream()) .filter(o -> o.servers != null) .forEach(operation -> operation.servers.forEach(server -> @@ -384,7 +383,7 @@ private BindingConfigBuilder injectHttpRoutes( .route() .exit(config.qname) .when(HttpConditionConfig::builder) - .header(":path", operation.requestPath(server).replaceAll(REGEX_ADDRESS_PARAMETER, "*")) + .header(":path", server.requestPath(operation.path).replaceAll(REGEX_ADDRESS_PARAMETER, "*")) .header(":method", operation.method) .build() .with(HttpWithConfig::builder) @@ -401,19 +400,19 @@ private RouteConfigBuilder injectHttpServerRouteGuarded( OpenapiOperationView operation) { Map securitySchemes = operation.specification.components.securitySchemes; - final List>> security = operation.security; + final List> security = operation.security; if (security != null) { security.stream() - .flatMap(s -> s.entrySet().stream()) - .filter(e -> securitySchemes.containsKey(e.getKey())) - .filter(e -> "jwt".equalsIgnoreCase(securitySchemes.get(e.getKey()).bearerFormat)) - .forEach(e -> + .flatMap(s -> s.stream()) + .filter(r -> securitySchemes.containsKey(r.name)) + .filter(r -> "jwt".equalsIgnoreCase(securitySchemes.get(r.name).bearerFormat)) + .forEach(r -> route .guarded() .name(config.options.http.authorization.qname) - .inject(guarded -> injectGuardedRoles(guarded, e.getValue())) + .inject(guarded -> injectGuardedRoles(guarded, r.scopes)) .build()); } diff --git a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/model/Openapi.java b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/model/Openapi.java index f2d4ab37e4..60c6be7e53 100644 --- a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/model/Openapi.java +++ b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/model/Openapi.java @@ -22,6 +22,6 @@ public class Openapi public String openapi; public List servers; public Map paths; - public Map> security; + public List>> security; public OpenapiComponents components; } diff --git a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/model/OpenapiOperation.java b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/model/OpenapiOperation.java index 6bdd0ba342..4c2c22dfc9 100644 --- a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/model/OpenapiOperation.java +++ b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/model/OpenapiOperation.java @@ -17,6 +17,10 @@ import java.util.List; import java.util.Map; +import jakarta.json.bind.annotation.JsonbProperty; + +import io.aklivity.zilla.runtime.binding.openapi.internal.model.extensions.http.kafka.OpenapiHttpKafkaOperationExtension; + public class OpenapiOperation { public String operationId; @@ -25,4 +29,7 @@ public class OpenapiOperation public Map responses; public List>> security; public List servers; + + @JsonbProperty("x-zilla-http-kafka") + public OpenapiHttpKafkaOperationExtension httpKafka; } diff --git a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/model/extensions/http/kafka/OpenapiHttpKafkaFilter.java b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/model/extensions/http/kafka/OpenapiHttpKafkaFilter.java new file mode 100644 index 0000000000..b3c4a09d82 --- /dev/null +++ b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/model/extensions/http/kafka/OpenapiHttpKafkaFilter.java @@ -0,0 +1,23 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.openapi.internal.model.extensions.http.kafka; + +import java.util.Map; + +public class OpenapiHttpKafkaFilter +{ + public String key; + public Map headers; +} diff --git a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/model/extensions/http/kafka/OpenapiHttpKafkaOperationExtension.java b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/model/extensions/http/kafka/OpenapiHttpKafkaOperationExtension.java new file mode 100644 index 0000000000..ca22d16f58 --- /dev/null +++ b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/model/extensions/http/kafka/OpenapiHttpKafkaOperationExtension.java @@ -0,0 +1,26 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.openapi.internal.model.extensions.http.kafka; + +import java.util.List; +import java.util.Map; + +public class OpenapiHttpKafkaOperationExtension +{ + public String key; + public Map overrides; + + public List filters; +} diff --git a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiHeaderView.java b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiHeaderView.java index cd92b21891..1168aa6aad 100644 --- a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiHeaderView.java +++ b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiHeaderView.java @@ -37,4 +37,9 @@ public final class OpenapiHeaderView this.allowEmptyValue = resolved.allowEmptyValue; this.schema = new OpenapiSchemaView(resolver, model.schema); } + + public boolean hasSchemaFormat() + { + return schema != null && schema.format != null; + } } diff --git a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiOperationView.java b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiOperationView.java index 3b2203e029..9961bef520 100644 --- a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiOperationView.java +++ b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiOperationView.java @@ -19,10 +19,10 @@ import java.util.List; import java.util.Map; -import java.util.Objects; import io.aklivity.zilla.runtime.binding.openapi.config.OpenapiServerConfig; import io.aklivity.zilla.runtime.binding.openapi.internal.model.OpenapiOperation; +import io.aklivity.zilla.runtime.binding.openapi.internal.model.extensions.http.kafka.OpenapiHttpKafkaOperationExtension; import io.aklivity.zilla.runtime.binding.openapi.internal.model.resolver.OpenapiResolver; public final class OpenapiOperationView @@ -38,8 +38,9 @@ public final class OpenapiOperationView public final List parameters; public final OpenapiRequestBodyView requestBody; public final Map responses; - public final List>> security; + public final List> security; public final List servers; + public final OpenapiHttpKafkaOperationExtension httpKafka; OpenapiOperationView( OpenapiView specification, @@ -55,7 +56,7 @@ public final class OpenapiOperationView this.method = method; this.path = path; - this.id = Objects.requireNonNull(model.operationId); + this.id = model.operationId; this.parameters = model.parameters != null ? model.parameters.stream() @@ -73,13 +74,21 @@ public final class OpenapiOperationView .collect(toMap(c -> c.status, identity())) : null; - this.security = model.security; + this.security = model.security != null + ? model.security.stream() + .map(s -> s.entrySet().stream() + .map(e -> new OpenapiSecurityRequirementView(e.getKey(), e.getValue())) + .toList()) + .toList() + : specification.security; this.servers = model.servers != null ? model.servers.stream() .flatMap(s -> configs.stream().map(c -> new OpenapiServerView(resolver, s, c))) .toList() : specification.servers; + + this.httpKafka = model.httpKafka; } public boolean hasRequestBodyOrParameters() @@ -102,13 +111,8 @@ public boolean hasResponses() return responses != null; } - public String requestPath( - OpenapiServerView server) + public boolean hasExtensionHttpKafka() { - String serverPath = server.url.getPath(); - - return serverPath != null - ? serverPath.endsWith("/") ? serverPath.concat(path.substring(1)) : serverPath.concat(path) - : path; + return httpKafka != null; } } diff --git a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiSecurityRequirementView.java b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiSecurityRequirementView.java new file mode 100644 index 0000000000..3228cc7ac2 --- /dev/null +++ b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiSecurityRequirementView.java @@ -0,0 +1,31 @@ +/* + * Copyright 2021-2024 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.openapi.internal.view; + +import java.util.List; + +public final class OpenapiSecurityRequirementView +{ + public final String name; + public final List scopes; + + OpenapiSecurityRequirementView( + String name, + List scopes) + { + this.name = name; + this.scopes = scopes; + } +} diff --git a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiServerView.java b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiServerView.java index d05b400a47..9a8555eb44 100644 --- a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiServerView.java +++ b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiServerView.java @@ -59,6 +59,17 @@ public final class OpenapiServerView : null; } + + public String requestPath( + String path) + { + String serverPath = url.getPath(); + + return serverPath != null && path != null + ? serverPath.endsWith("/") ? serverPath.concat(path.substring(1)) : serverPath.concat(path) + : path; + } + public static final class VariableMatcher { private static final Pattern VARIABLE = Pattern.compile("\\{([^}]*.?)\\}"); diff --git a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiView.java b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiView.java index d8b8a091e9..a4b0dbcb07 100644 --- a/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiView.java +++ b/runtime/binding-openapi/src/main/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiView.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.function.LongSupplier; +import java.util.stream.Stream; import org.agrona.collections.MutableInteger; @@ -36,6 +37,8 @@ public final class OpenapiView public final OpenapiComponentsView components; public final Map paths; public final List servers; + public final Map operations; + public final List> security; public static OpenapiView of( Openapi model) @@ -72,10 +75,24 @@ private OpenapiView( this.servers = model.servers != null ? model.servers.stream() - .flatMap(s -> configs.stream().map(c -> new OpenapiServerView(resolver, s, c))) + .flatMap(s -> configs.isEmpty() + ? Stream.of(new OpenapiServerView(resolver, s, null)) + : configs.stream().map(c -> new OpenapiServerView(resolver, s, c))) .toList() : null; + this.security = model.security != null + ? model.security.stream() + .map(s -> s.entrySet().stream() + .map(e -> new OpenapiSecurityRequirementView(e.getKey(), e.getValue())) + .toList()) + .toList() + : null; + + this.components = model.components != null + ? new OpenapiComponentsView(resolver, model.components) + : null; + MutableInteger opIndex = new MutableInteger(1); LongSupplier supplyCompositeId = () -> compositeId(id, opIndex.value++); this.paths = model.paths != null @@ -84,8 +101,11 @@ private OpenapiView( .collect(toMap(c -> c.path, identity())) : null; - this.components = model.components != null - ? new OpenapiComponentsView(resolver, model.components) + this.operations = this.paths != null + ? this.paths.values().stream() + .flatMap(p -> p.methods.values().stream()) + .filter(o -> o.id != null) + .collect(toMap(o -> o.id, identity())) : null; } } diff --git a/runtime/binding-openapi/src/test/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiViewTest.java b/runtime/binding-openapi/src/test/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiViewTest.java index d06ec8893c..dee7067dea 100644 --- a/runtime/binding-openapi/src/test/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiViewTest.java +++ b/runtime/binding-openapi/src/test/java/io/aklivity/zilla/runtime/binding/openapi/internal/view/OpenapiViewTest.java @@ -63,6 +63,7 @@ public void shouldAdaptSchemaExclusiveMinMax() throws Exception Openapi model = new Openapi(); model.servers = List.of(server); model.paths = Map.of("/", path); + model.security = List.of(Map.of("OAuth2", List.of("read", "write"))); OpenapiServerConfig config = OpenapiServerConfig.builder() .url("http://localhost/path") @@ -70,6 +71,7 @@ public void shouldAdaptSchemaExclusiveMinMax() throws Exception OpenapiView view = OpenapiView.of(model, List.of(config)); OpenapiServerView serverView = view.servers.get(0); + List securityView = view.security.get(0); OpenapiPathView pathView = view.paths.get("/"); OpenapiOperationView operationView = pathView.methods.get("GET"); OpenapiRequestBodyView requestBodyView = operationView.requestBody; @@ -77,6 +79,9 @@ public void shouldAdaptSchemaExclusiveMinMax() throws Exception OpenapiSchemaView schemaView = mediaTypeView.schema; assertEquals(URI.create("http://localhost:80/path"), serverView.url); + assertEquals(1, securityView.size()); + assertEquals("OAuth2", securityView.get(0).name); + assertEquals(List.of("read", "write"), securityView.get(0).scopes); assertNull(schemaView.model.exclusiveMinimum); assertEquals(0, schemaView.model.minimum.intValue()); assertEquals(100, schemaView.model.exclusiveMaximum.intValue()); diff --git a/specs/binding-openapi-asyncapi.spec/pom.xml b/specs/binding-openapi-asyncapi.spec/pom.xml index b1913e3065..a4db3137e7 100644 --- a/specs/binding-openapi-asyncapi.spec/pom.xml +++ b/specs/binding-openapi-asyncapi.spec/pom.xml @@ -97,7 +97,7 @@ ${project.version} core http openapi - io.aklivity.zilla.specs.binding.openapi.internal.types + io.aklivity.zilla.specs.binding.openapi.asyncapi.internal.types @@ -153,7 +153,7 @@ jacoco-maven-plugin - io/aklivity/zilla/specs/binding/openapi/internal/types/**/*.class + io/aklivity/zilla/specs/binding/openapi/asyncapi/internal/types/**/*.class diff --git a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/config/proxy-async.yaml b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/config/proxy-async.yaml index 6109552b05..1c87af90bd 100644 --- a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/config/proxy-async.yaml +++ b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/config/proxy-async.yaml @@ -19,6 +19,7 @@ catalogs: catalog0: type: test options: + id: 1 subject: petstore schema: | openapi: "3.0.0" @@ -28,7 +29,7 @@ catalogs: license: name: MIT servers: - - url: http://localhost:8080 + - url: http://localhost:8080/v1 paths: /customer: post: @@ -60,9 +61,6 @@ catalogs: type: string format: /customer;cid={correlationId} description: Customer verification result location URL - security: - - petstore_auth: - - write:all /customer;cid={correlationId}: get: tags: @@ -98,9 +96,6 @@ catalogs: type: string format: /customer;cid={correlationId} description: Customer verification result location URL - security: - - petstore_auth: - - read:all components: schemas: Customer: @@ -143,6 +138,7 @@ catalogs: catalog1: type: test options: + id: 2 subject: petstore schema: | asyncapi: 3.0.0 diff --git a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/config/proxy.yaml b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/config/proxy.yaml index a01792444d..d93cd963b4 100644 --- a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/config/proxy.yaml +++ b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/config/proxy.yaml @@ -19,6 +19,7 @@ catalogs: catalog0: type: test options: + id: 1 subject: petstore schema: | openapi: "3.0.0" @@ -28,7 +29,7 @@ catalogs: license: name: MIT servers: - - url: http://localhost:9090 + - url: http://localhost:9090/v1 paths: /pets: get: @@ -103,6 +104,7 @@ catalogs: catalog1: type: test options: + id: 2 subject: petstore schema: | asyncapi: 3.0.0 diff --git a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/async.verify.customer/client.rpt b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/async.verify.customer/client.rpt index 51ddaed510..a005160142 100644 Binary files a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/async.verify.customer/client.rpt and b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/async.verify.customer/client.rpt differ diff --git a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/async.verify.customer/server.rpt b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/async.verify.customer/server.rpt index c391159847..e2f9e5ea30 100644 Binary files a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/async.verify.customer/server.rpt and b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/async.verify.customer/server.rpt differ diff --git a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/create.pet/client.rpt b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/create.pet/client.rpt index a8e9ec432a..acdc946861 100644 --- a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/create.pet/client.rpt +++ b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/create.pet/client.rpt @@ -22,7 +22,7 @@ connect "zilla://streams/asyncapi_client0" write zilla:begin.ext ${asyncapi:beginEx() .typeId(zilla:id("asyncapi")) - .apiId(0) + .apiId(2) .extension(kafka:beginEx() .typeId(zilla:id("kafka")) .merged() diff --git a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/create.pet/server.rpt b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/create.pet/server.rpt index c3f6fd3489..7da12d167e 100644 --- a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/create.pet/server.rpt +++ b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/asyncapi/create.pet/server.rpt @@ -21,7 +21,7 @@ accepted read zilla:begin.ext ${asyncapi:matchBeginEx() .typeId(zilla:id("asyncapi")) - .apiId(0) + .apiId(2) .extension(kafka:matchBeginEx() .typeId(zilla:id("kafka")) .merged() diff --git a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/async.verify.customer/client.rpt b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/async.verify.customer/client.rpt index d4a3efab5d..2456e008fd 100644 --- a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/async.verify.customer/client.rpt +++ b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/async.verify.customer/client.rpt @@ -19,13 +19,13 @@ connect "zilla://streams/composite0" write zilla:begin.ext ${openapi:beginEx() .typeId(zilla:id("openapi")) - .apiId(0) + .apiId(1) .operationId("createCustomer") .extension(http:beginEx() .typeId(zilla:id("http")) .header(":method", "POST") .header(":scheme", "http") - .header(":path", "/customer") + .header(":path", "/v1/customer") .header(":authority", "localhost:8080") .header("content-type", "application/json") .header("content-length", "155") @@ -40,13 +40,13 @@ write close read zilla:begin.ext ${openapi:matchBeginEx() .typeId(zilla:id("openapi")) - .apiId(0) + .apiId(1) .operationId("createCustomer") .extension(http:matchBeginEx() .typeId(zilla:id("http")) .header(":status", "202") .header("content-length", "0") - .header("Location", "/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed") + .header("Location", "/v1/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-29854217b0ba4ea8370d777b2f0f187a") .build()) .build()} @@ -62,14 +62,14 @@ connect await RECEIVED_ASYNC_RESPONSE write zilla:begin.ext ${openapi:beginEx() .typeId(zilla:id("openapi")) - .apiId(0) + .apiId(1) .operationId("getVerifiedCustomer") .extension(http:beginEx() .typeId(zilla:id("http")) .header(":method", "GET") .header(":scheme", "http") .header(":authority", "localhost:8080") - .header(":path", "/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed") + .header(":path", "/v1/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-29854217b0ba4ea8370d777b2f0f187a") .header("prefer", "respond-async") .build()) .build()} @@ -79,7 +79,7 @@ write close read zilla:begin.ext ${openapi:matchBeginEx() .typeId(zilla:id("openapi")) - .apiId(0) + .apiId(1) .operationId("getVerifiedCustomer") .extension(http:matchBeginEx() .typeId(zilla:id("http")) diff --git a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/async.verify.customer/server.rpt b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/async.verify.customer/server.rpt index 1aef3f522f..46895f6356 100644 --- a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/async.verify.customer/server.rpt +++ b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/async.verify.customer/server.rpt @@ -20,13 +20,13 @@ accepted read zilla:begin.ext ${openapi:matchBeginEx() .typeId(zilla:id("openapi")) - .apiId(0) + .apiId(1) .operationId("createCustomer") .extension(http:matchBeginEx() .typeId(zilla:id("http")) .header(":method", "POST") .header(":scheme", "http") - .header(":path", "/customer") + .header(":path", "/v1/customer") .header(":authority", "localhost:8080") .header("content-type", "application/json") .header("content-length", "155") @@ -41,13 +41,13 @@ read closed write zilla:begin.ext ${openapi:beginEx() .typeId(zilla:id("openapi")) - .apiId(0) + .apiId(1) .operationId("createCustomer") .extension(http:beginEx() .typeId(zilla:id("http")) .header(":status", "202") .header("content-length", "0") - .header("Location", "/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed") + .header("Location", "/v1/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-29854217b0ba4ea8370d777b2f0f187a") .build()) .build()} write flush @@ -58,14 +58,14 @@ accepted read zilla:begin.ext ${openapi:matchBeginEx() .typeId(zilla:id("openapi")) - .apiId(0) + .apiId(1) .operationId("getVerifiedCustomer") .extension(http:matchBeginEx() .typeId(zilla:id("http")) .header(":method", "GET") .header(":scheme", "http") .header(":authority", "localhost:8080") - .header(":path", "/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-966ecfabf0fe9086ce63f615b87a6bed") + .header(":path", "/v1/customer;cid=3f96592e-c8f1-4167-8c46-85f2aabb70a5-29854217b0ba4ea8370d777b2f0f187a") .header("prefer", "respond-async") .build()) .build()} @@ -75,7 +75,7 @@ read closed write zilla:begin.ext ${openapi:beginEx() .typeId(zilla:id("openapi")) - .apiId(0) + .apiId(1) .operationId("getVerifiedCustomer") .extension(http:beginEx() .typeId(zilla:id("http")) diff --git a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/create.pet/client.rpt b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/create.pet/client.rpt index 4c469634da..34c43dea10 100644 --- a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/create.pet/client.rpt +++ b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/create.pet/client.rpt @@ -19,13 +19,13 @@ connect "zilla://streams/composite0" write zilla:begin.ext ${openapi:beginEx() .typeId(zilla:id("openapi")) - .apiId(0) + .apiId(1) .operationId("createPets") .extension(http:beginEx() .typeId(zilla:id("http")) .header(":method", "POST") .header(":scheme", "http") - .header(":path", "/pets") + .header(":path", "/v1/pets") .header(":authority", "localhost:8080") .header("content-type", "application/json") .header("content-length", "39") @@ -38,7 +38,7 @@ write close read zilla:begin.ext ${openapi:matchBeginEx() .typeId(zilla:id("openapi")) - .apiId(0) + .apiId(1) .operationId("createPets") .extension(http:matchBeginEx() .typeId(zilla:id("http")) diff --git a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/create.pet/server.rpt b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/create.pet/server.rpt index 16b37abee3..e0a079e789 100644 --- a/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/create.pet/server.rpt +++ b/specs/binding-openapi-asyncapi.spec/src/main/scripts/io/aklivity/zilla/specs/binding/openapi/asyncapi/streams/openapi/create.pet/server.rpt @@ -20,13 +20,13 @@ accepted read zilla:begin.ext ${openapi:matchBeginEx() .typeId(zilla:id("openapi")) - .apiId(0) + .apiId(1) .operationId("createPets") .extension(http:matchBeginEx() .typeId(zilla:id("http")) .header(":method", "POST") .header(":scheme", "http") - .header(":path", "/pets") + .header(":path", "/v1/pets") .header(":authority", "localhost:8080") .header("content-type", "application/json") .header("content-length", "39") @@ -40,7 +40,7 @@ read closed write zilla:begin.ext ${openapi:beginEx() .typeId(zilla:id("openapi")) - .apiId(0) + .apiId(1) .operationId("createPets") .extension(http:beginEx() .typeId(zilla:id("http"))