From e5cd4e2a5bde30786777b008b70dba283660f04f Mon Sep 17 00:00:00 2001 From: "Dr. Christoph \"Schorsch\" Jung" Date: Thu, 12 Oct 2023 08:15:05 +0200 Subject: [PATCH] fix: deal with peculiarities of the EDC 0.5.1 control plane regarding to negotiation requests and transfer process state. --- agent-plane/agent-plane-protocol/README.md | 65 ++++++++++--------- .../resources/dataplane.properties | 1 + .../tractusx/agents/edc/AgentConfig.java | 15 +++++ .../agents/edc/AgreementController.java | 7 +- .../agents/edc/service/DataManagement.java | 38 +++++++++-- 5 files changed, 88 insertions(+), 38 deletions(-) diff --git a/agent-plane/agent-plane-protocol/README.md b/agent-plane/agent-plane-protocol/README.md index 1131b72..c551e2d 100644 --- a/agent-plane/agent-plane-protocol/README.md +++ b/agent-plane/agent-plane-protocol/README.md @@ -87,36 +87,37 @@ For a list of environment variables to configure the behaviour of the data plane See [this sample configuration file](resources/dataplane.properties) -| Property | Required | Default/Example | Description | List | -|-----------------------------------------------|----------|---------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------|------| -| cx.agent.asset.default | | urn:x-arq:DefaultGraph | IRI of the default graph (federated data catalogue) | | -| cx.agent.asset.file | | https://www.w3id.org/catenax/ontology,dataspace.ttl | Initial triples for the default graph (federated data catalogue) | L | -| cx.agent.accesspoint.name | | api | Matchmaking agent endpoint name (internal) | | -| cx.agent.controlplane.protocol | (X) | http://oem-control-plane:8182 | Protocol Endpoint of the providing control plane (needed if you want to access local graphs/skills without absolute address) | | -| cx.agent.controlplane.management | X | http://oem-control-plane2:8181/management/v2 | Data Management Endpoint of the consuming control plane | | -| cx.agent.controlplane.management.provider | (X) | http://oem-control-plane:8181/management/v2 | Data Management Endpoint of the providing control plane (only if different from the consuming control plane) | | -| edc.participant.id | X | BPNL00000DUMMY | business partner number under which the consuming control plane operates | | -| edc.api.auth.code | (X) | X-Api-Key | Authentication Header for consuming control plane (if any) | | -| edc.api.auth.key | (X) | **** | Authentication Secret for consuming control plane (if any) | | -| edc.dataplane.token.validation.endpoint | X | http://oem-control-plane2:9999/control/token / http://localhost:8082/api/validation/ | Token validation endpoint of single control plane or the address of the integrated switching validator in case of multiple control planes | | -| edc.dataplane.token.validation.endpoints. | (X) | http://oem-control-plane:9999/control/token | Additional token validation endpoints to switch between (if multiple control planes) | * | -| web.http.callback.port | X | 8187 | Callback endpoint port | | -| web.http.callback.path | X | /callback | Callback endpoint path prefix | | -| cx.agent.callback | X | http://oem-data-plane:8187/callback/endpoint-data-reference | Callback endpoint full address as seen from the consuming control plane | | -| cx.agent.skill.contract | | cx.agent.skill.contract.default=Contract?partner=Skill | Id/IRI of the default contract put in the cx-common:publishedUnderContract property for new skills | | -| cx.agent.dataspace.synchronization | | -1 / 60000 | If positive, number of seconds between each catalogue synchronization attempt | | -| cx.agent.service.allow | | (http|edc)s?://.* | Regular expression for determining which IRIs are allowed in SERVICE calls (on top level/federated data catalogue) | | -| cx.agent.service.deny | | ^$ | Regular expression for determining which IRIs are denied in SERVICE calls (on top level/federated data catalogue) | | | -| cx.agent.service.asset.allow | | (http|edc)s://.* | Regular expression for determining which IRIs are allowed in delegated SERVICE calls (if not overriden by the cx-common:allowServicePattern address property) | | -| cx.agent.service.asset.deny | | ^$ | Regular expression for determining which IRIs are denied in delegated SERVICE calls (it not overridden by the cx-common:denyServicePattern address property) | | | -| cx.agent.dataspace.remotes | | http://consumer-edc-control:8282,http://tiera-edc-control:8282 | business partner control plane protocol urls to synchronize with | L | -| cx.agent.sparql.verbose | | false | Controls the verbosity of the SparQL Engine | | -| cx.agent.threadpool.size | | 4 | Number of threads pooled for any concurrent batch calls and synchronisation actions | | -| cx.agent.federation.batch.max | | 9223372036854775807 / 8 | Maximal number of tuples to send in one query | | -| cx.agent.negotiation.poll | | 1000 | Number of milliseconds between negotiation status checks | | -| cx.agent.negotiation.timeout | | 30000 | Number of milliseconds after which a pending negotiation is regarded as stale | | -| cx.agent.connect.timeout | | | Number of milliseconds after which a connection attempt is regarded as stale | | -| cx.agent.read.timeout | | 1080000 | Number of milliseconds after which a reading attempt is regarded as stale | | -| cx.agent.call.timeout | | | Number of milliseconds after which a complete call is regarded as stale | | -| cx.agent.write.timeout | | | Number of milliseconds after which a write attempt is regarded as stale | | +| Property | Required | Default/Example | Description | List | +|-----------------------------------------------|----------|--------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------|------| +| cx.agent.asset.default | | urn:x-arq:DefaultGraph | IRI of the default graph (federated data catalogue) | | +| cx.agent.asset.file | | https://www.w3id.org/catenax/ontology,dataspace.ttl | Initial triples for the default graph (federated data catalogue) | L | +| cx.agent.accesspoint.name | | api | Matchmaking agent endpoint name (internal) | | +| cx.agent.controlplane.protocol | (X) | http://oem-control-plane:8182 | Protocol Endpoint of the providing control plane (needed if you want to access local graphs/skills without absolute address) | | +| cx.agent.controlplane.management | X | http://oem-control-plane2:8181/management/v2 | Data Management Endpoint of the consuming control plane | | +| cx.agent.controlplane.management.provider | (X) | http://oem-control-plane:8181/management/v2 | Data Management Endpoint of the providing control plane (only if different from the consuming control plane) | | +| edc.participant.id | X | BPNL00000DUMMY | business partner number under which the consuming control plane operates | | +| edc.api.auth.code | (X) | X-Api-Key | Authentication Header for consuming control plane (if any) | | +| edc.api.auth.key | (X) | **** | Authentication Secret for consuming control plane (if any) | | +| edc.dataplane.token.validation.endpoint | X | http://oem-control-plane2:9999/control/token / http://localhost:8082/api/validation/ | Token validation endpoint of single control plane or the address of the integrated switching validator in case of multiple control planes | | +| edc.dataplane.token.validation.endpoints. | (X) | http://oem-control-plane:9999/control/token | Additional token validation endpoints to switch between (if multiple control planes) | * | +| web.http.callback.port | X | 8187 | Callback endpoint port | | +| web.http.callback.path | X | /callback | Callback endpoint path prefix | | +| cx.agent.callback | X | http://oem-data-plane:8187/callback/endpoint-data-reference | Callback endpoint full address as seen from the consuming control plane | | +| cx.agent.skill.contract | | cx.agent.skill.contract.default=Contract?partner=Skill | Id/IRI of the default contract put in the cx-common:publishedUnderContract property for new skills | | +| cx.agent.dataspace.synchronization | | -1 / 60000 | If positive, number of seconds between each catalogue synchronization attempt | | +| cx.agent.service.allow | | (http|edc)s?://.* | Regular expression for determining which IRIs are allowed in SERVICE calls (on top level/federated data catalogue) | | +| cx.agent.service.deny | | ^$ | Regular expression for determining which IRIs are denied in SERVICE calls (on top level/federated data catalogue) | | | +| cx.agent.service.asset.allow | | (http|edc)s://.* | Regular expression for determining which IRIs are allowed in delegated SERVICE calls (if not overriden by the cx-common:allowServicePattern address property) | | +| cx.agent.service.asset.deny | | ^$ | Regular expression for determining which IRIs are denied in delegated SERVICE calls (it not overridden by the cx-common:denyServicePattern address property) | | | +| cx.agent.dataspace.remotes | | http://consumer-edc-control:8282,http://tiera-edc-control:8282 | business partner control plane protocol urls to synchronize with | L | +| cx.agent.sparql.verbose | | false | Controls the verbosity of the SparQL Engine | | +| cx.agent.threadpool.size | | 4 | Number of threads pooled for any concurrent batch calls and synchronisation actions | | +| cx.agent.federation.batch.max | | 9223372036854775807 / 8 | Maximal number of tuples to send in one query | | +| cx.agent.negotiation.poll | | 1000 | Number of milliseconds between negotiation status checks | | +| cx.agent.negotiation.timeout | | 30000 | Number of milliseconds after which a pending negotiation is regarded as stale | | +| cx.agent.connect.timeout | | | Number of milliseconds after which a connection attempt is regarded as stale | | +| cx.agent.read.timeout | | 1080000 | Number of milliseconds after which a reading attempt is regarded as stale | | +| cx.agent.call.timeout | | | Number of milliseconds after which a complete call is regarded as stale | | +| cx.agent.write.timeout | | | Number of milliseconds after which a write attempt is regarded as stale | | +| cx.agent.edc.version | | 0.5.1 | Version of the TX EDC that is used (in case that management/transfer API changes) | | diff --git a/agent-plane/agent-plane-protocol/resources/dataplane.properties b/agent-plane/agent-plane-protocol/resources/dataplane.properties index 0bf0ab7..80f6daa 100644 --- a/agent-plane/agent-plane-protocol/resources/dataplane.properties +++ b/agent-plane/agent-plane-protocol/resources/dataplane.properties @@ -65,6 +65,7 @@ cx.agent.controlplane.management=http://consuming-control-plane:8181/management/ cx.agent.controlplane.management.provider=http://providing-control-plane:8181/management/v2 cx.agent.callback=http://agent-plane:8187/callback/endpoint-data-reference cx.agent.skill.contract.default=Contract?partner=Skill +cx.agent.edc.version=0.5.1 cx.agent.service.allow=(edcs?://.*)|(https://query\\.wikidata\\.org/sparql)|(http://[^\\.]+:\\d+.*) cx.agent.service.asset.allow=(edcs?://.*)|(https://query\\.wikidata\\.org/sparql)|(http://[^\\.]+:\\d+.*) diff --git a/agent-plane/agent-plane-protocol/src/main/java/org/eclipse/tractusx/agents/edc/AgentConfig.java b/agent-plane/agent-plane-protocol/src/main/java/org/eclipse/tractusx/agents/edc/AgentConfig.java index 416e092..aa3eaf8 100644 --- a/agent-plane/agent-plane-protocol/src/main/java/org/eclipse/tractusx/agents/edc/AgentConfig.java +++ b/agent-plane/agent-plane-protocol/src/main/java/org/eclipse/tractusx/agents/edc/AgentConfig.java @@ -88,6 +88,8 @@ public class AgentConfig { public static String SERVICE_DENY_ASSET_PROPERTY = "cx.agent.service.asset.deny"; public static String DEFAULT_SERVICE_DENY_ASSET_PATTERN = "^$"; + public static final String TX_EDC_VERSION_PROPERTY = "cx.agent.edc.version"; + /** * precompiled stuff */ @@ -308,5 +310,18 @@ public Pattern getServiceAssetDenyPattern() { return serviceAssetDenyPattern; } + /** + * @return tx edc version as a string + */ + public String getEdcVersion() { + return config.getString(TX_EDC_VERSION_PROPERTY, "0.5.0"); + } + + /** + * @return whether the edc version is less than 23.09 + */ + public boolean isPrerelease() { + return getEdcVersion().compareTo("0.5.0") <= 0; + } } diff --git a/agent-plane/agent-plane-protocol/src/main/java/org/eclipse/tractusx/agents/edc/AgreementController.java b/agent-plane/agent-plane-protocol/src/main/java/org/eclipse/tractusx/agents/edc/AgreementController.java index 5bbc4ec..eb4915e 100644 --- a/agent-plane/agent-plane-protocol/src/main/java/org/eclipse/tractusx/agents/edc/AgreementController.java +++ b/agent-plane/agent-plane-protocol/src/main/java/org/eclipse/tractusx/agents/edc/AgreementController.java @@ -359,8 +359,11 @@ public EndpointDataReference createAgreement(String remoteUrl, String asset) thr startTime = System.currentTimeMillis(); + // EDC 0.5.1 has a problem with the checker configuration and wont process to COMPLETED + String expectedTransferState = config.isPrerelease() ? "COMPLETED" : "STARTED"; + try { - while ((System.currentTimeMillis() - startTime < config.getNegotiationTimeout()) && (process == null || !process.getState().equals("COMPLETED"))) { + while ((System.currentTimeMillis() - startTime < config.getNegotiationTimeout()) && (process == null || !process.getState().equals(expectedTransferState))) { Thread.sleep(config.getNegotiationPollInterval()); process = dataManagement.getTransfer( transferId @@ -373,7 +376,7 @@ public EndpointDataReference createAgreement(String remoteUrl, String asset) thr monitor.warning(String.format("Process thread for asset %s transfer %s run into problem. Giving up.", asset, transferId),e); } - if (process == null || !process.getState().equals("COMPLETED")) { + if (process == null || !process.getState().equals(expectedTransferState)) { deactivate(asset); throw new InternalServerErrorException(String.format("Transfer process %s for agreement %s and asset %s could not be provisioned.", transferId, agreement.getId(), asset)); } diff --git a/agent-plane/agent-plane-protocol/src/main/java/org/eclipse/tractusx/agents/edc/service/DataManagement.java b/agent-plane/agent-plane-protocol/src/main/java/org/eclipse/tractusx/agents/edc/service/DataManagement.java index 83c87ca..7114736 100644 --- a/agent-plane/agent-plane-protocol/src/main/java/org/eclipse/tractusx/agents/edc/service/DataManagement.java +++ b/agent-plane/agent-plane-protocol/src/main/java/org/eclipse/tractusx/agents/edc/service/DataManagement.java @@ -46,7 +46,14 @@ public class DataManagement { */ public static final String DSP_PATH="%s/api/v1/dsp"; public static final String CATALOG_CALL = "%s/catalog/request"; - public static final String CATALOG_REQUEST_BODY="{" + + // catalog request 0.5.>=1 + public static final String CATALOG_REQUEST_BODY = "{" + + "\"@context\": {}," + + "\"protocol\": \"dataspace-protocol-http\"," + + "\"counterPartyAddress\": \"%s\", " + + "\"querySpec\": %s }"; + // catalog request 0.5.0 + public static final String CATALOG_REQUEST_BODY_PRERELEASE = "{" + "\"@context\": {}," + "\"protocol\": \"dataspace-protocol-http\"," + "\"providerUrl\": \"%s\", " + @@ -92,7 +99,23 @@ public class DataManagement { "}\n"; public static final String ASSET_CALL = "%s/assets/request"; + // negotiation request 0.5.>=1 public static final String NEGOTIATION_REQUEST_BODY="{\n" + + "\"@context\": { \"odrl\": \"http://www.w3.org/ns/odrl/2/\"},\n" + + "\"@type\": \"NegotiationInitiateRequestDto\",\n" + + "\"connectorAddress\": \"%1$s\",\n" + + "\"protocol\": \"dataspace-protocol-http\",\n" + + "\"providerId\": \"%3$s\",\n" + + "\"connectorId\": \"%2$s\",\n" + + "\"offer\": {\n" + + " \"offerId\": \"%4$s\",\n" + + " \"assetId\": \"%5$s\",\n" + + " \"policy\": %6$s\n" + + "}\n" + + "}"; + + // negotiation request 0.5.0 - roles of provider and connector are wrong + public static final String NEGOTIATION_REQUEST_BODY_PRERELEASE="{\n" + "\"@context\": { \"odrl\": \"http://www.w3.org/ns/odrl/2/\"},\n" + "\"@type\": \"NegotiationInitiateRequestDto\",\n" + "\"connectorAddress\": \"%1$s\",\n" + @@ -180,9 +203,13 @@ public DcatCatalog findContractOffers(String remoteControlPlaneIdsUrl, String as * @throws IOException in case something went wrong */ public DcatCatalog getCatalog(String remoteControlPlaneIdsUrl, QuerySpec spec) throws IOException { - var url = String.format(CATALOG_CALL,config.getControlPlaneManagementUrl()); - var catalogSpec =String.format(CATALOG_REQUEST_BODY,String.format(DSP_PATH,remoteControlPlaneIdsUrl),objectMapper.writeValueAsString(spec)); + + // use a version specific call + String template = config.isPrerelease() ? CATALOG_REQUEST_BODY_PRERELEASE : CATALOG_REQUEST_BODY; + + var catalogSpec =String.format(template, + String.format(DSP_PATH, remoteControlPlaneIdsUrl), objectMapper.writeValueAsString(spec)); var request = new Request.Builder().url(url).post(RequestBody.create(catalogSpec,MediaType.parse("application/json"))); config.getControlPlaneManagementHeaders().forEach(request::addHeader); @@ -294,7 +321,10 @@ public IdResponse createOrUpdateSkill(String assetId, String name, String descri public String initiateNegotiation(ContractNegotiationRequest negotiationRequest) throws IOException { var url = String.format(NEGOTIATION_INITIATE_CALL,config.getControlPlaneManagementUrl()); - var negotiateSpec =String.format(NEGOTIATION_REQUEST_BODY, + // use a version specific call + String template = config.isPrerelease() ? NEGOTIATION_REQUEST_BODY_PRERELEASE : NEGOTIATION_REQUEST_BODY; + + var negotiateSpec =String.format(template, negotiationRequest.getConnectorAddress(), negotiationRequest.getLocalBusinessPartnerNumber(), negotiationRequest.getRemoteBusinessPartnerNumber(),