Skip to content

Commit

Permalink
feat: upgrade to EDC 0.6 and port the service patterns from embedded …
Browse files Browse the repository at this point in the history
…matchmaking.
  • Loading branch information
drcgjung committed Apr 18, 2024
1 parent 6248f9c commit e4a7442
Show file tree
Hide file tree
Showing 11 changed files with 837 additions and 873 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ __pycache__
.classpath
.project
.settings/
*.bak
*cs-cleanup.xml
*cs-formatter.xml
.checkstyle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.eclipse.tractusx.agents;

import com.nimbusds.jose.JWSObject;
import jakarta.json.Json;
import jakarta.json.JsonValue;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.ClientErrorException;
Expand All @@ -38,7 +39,6 @@
import org.eclipse.tractusx.agents.model.TransferProcess;
import org.eclipse.tractusx.agents.model.TransferRequest;
import org.eclipse.tractusx.agents.service.DataManagement;
import org.eclipse.tractusx.agents.service.DataManagementImpl;
import org.eclipse.tractusx.agents.service.DataspaceSynchronizer;
import org.eclipse.tractusx.agents.utils.CallbackAddress;
import org.eclipse.tractusx.agents.utils.DataAddress;
Expand Down Expand Up @@ -120,7 +120,7 @@ public String toString() {
public void receiveEdcCallback(EndpointDataReference dataReference) {
var agreementId = dataReference.getId();
monitor.debug(String.format("An endpoint data reference for agreement %s has been posted.", agreementId));
synchronized (agreementStore) {
synchronized (processStore) {
for (Map.Entry<String, TransferProcess> process : processStore.entrySet()) {
if (process.getValue().getId().equals(agreementId)) {
synchronized (endpointStore) {
Expand Down Expand Up @@ -283,7 +283,7 @@ public EndpointDataReference createAgreement(String remoteUrl, String asset) thr
var contractNegotiationRequest = ContractNegotiationRequest.Builder.newInstance()
.offerId(contractOfferDescription)
.connectorId("provider")
.connectorAddress(String.format(DataManagementImpl.DSP_PATH, remoteUrl))
.connectorAddress(String.format(DataManagement.DSP_PATH, remoteUrl))
.protocol("dataspace-protocol-http")
.localBusinessPartnerNumber(config.getBusinessPartnerNumber())
.remoteBusinessPartnerNumber(contractOffers.getParticipantId())
Expand Down Expand Up @@ -359,7 +359,7 @@ public EndpointDataReference createAgreement(String remoteUrl, String asset) thr
.assetId(asset)
.contractId(agreement.getId())
.connectorId(config.getBusinessPartnerNumber())
.connectorAddress(String.format(DataManagementImpl.DSP_PATH, remoteUrl))
.connectorAddress(String.format(DataManagement.DSP_PATH, remoteUrl))
.protocol("dataspace-protocol-http")
.dataDestination(dataDestination)
.managedResources(false)
Expand All @@ -369,9 +369,14 @@ public EndpointDataReference createAgreement(String remoteUrl, String asset) thr
monitor.debug(String.format("About to initiate transfer for agreement %s (for asset %s at connector %s)", negotiation.getContractAgreementId(), asset, remoteUrl));

String transferId;
TransferProcess process;

try {
transferId = dataManagement.initiateHttpProxyTransferProcess(transferRequest);
synchronized (processStore) {
transferId = dataManagement.initiateHttpProxyTransferProcess(transferRequest);
process = new TransferProcess(Json.createObjectBuilder().add("@id", transferId).add("https://w3id.org/edc/v0.0.1/ns/state", "UNINITIALIZED").build());
registerProcess(asset, process);
}
} catch (IOException ioe) {
deactivate(asset);
throw new InternalServerErrorException(String.format("HttpProxy transfer for agreement %s could not be initiated.", agreement.getId()), ioe);
Expand All @@ -380,8 +385,6 @@ public EndpointDataReference createAgreement(String remoteUrl, String asset) thr
monitor.debug(String.format("About to check transfer %s (for asset %s at connector %s)", transferId, asset, remoteUrl));

// Check negotiation state
TransferProcess process = null;

startTime = System.currentTimeMillis();

// EDC 0.5.1 has a problem with the checker configuration and wont process to COMPLETED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,12 @@ static Matcher matchSkill(String key) {
* @param contract of skill optional
* @param dist of skill required
* @param isFederated whether skill maybe synchronized in catalogue
* @param allowServicePattern regex for service to call in skill
* @param denyServicePattern regex for services denied in skill
* @param ontologies a set of ontologies
* @return skill id
*/
String put(String key, String skill, String name, String description, String version, String contract, SkillDistribution dist, boolean isFederated, String... ontologies);
String put(String key, String skill, String name, String description, String version, String contract, SkillDistribution dist, boolean isFederated, String allowServicePattern, String denyServicePattern, String... ontologies);

/**
* return the skill distribution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,11 +454,13 @@ public Response postSkill(String query,
@QueryParam("contract") String contract,
@QueryParam("distributionMode") SkillDistribution mode,
@QueryParam("isFederated") boolean isFederated,
@QueryParam("allowServicesPattern") String allowServicePattern,
@QueryParam("denyServicesPattern") String denyServicePattern,
@QueryParam("ontology") String[] ontologies
) {
monitor.debug(String.format("Received a POST skill request %s %s %s %s %s %b %s ", asset, name, description, version, contract, mode.getMode(), isFederated, query));
monitor.debug(String.format("Received a POST skill request %s %s %s %s %s %b %s %s %s ", asset, name, description, version, contract, mode.getMode(), isFederated, allowServicePattern, denyServicePattern, query));
Response.ResponseBuilder rb;
if (skillStore.put(asset, query, name, description, version, contract, mode, isFederated, ontologies) != null) {
if (skillStore.put(asset, query, name, description, version, contract, mode, isFederated, allowServicePattern, denyServicePattern, ontologies) != null) {
rb = Response.ok();
} else {
rb = Response.status(HttpStatus.SC_CREATED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,12 @@ public Response postAsset(String content,
@QueryParam("shape") String shape,
@QueryParam("isFederated") boolean isFederated,
@QueryParam("ontology") String[] ontologies,
@QueryParam("allowServicesPattern") String allow,
@QueryParam("denyServicesPattern") String deny,
@Context HttpServletRequest request
) {
ExternalFormat format = ExternalFormat.valueOfFormat(request.getContentType());
monitor.debug(String.format("Received a POST asset request %s %s %s %s %s %b in format %s", asset, name, description, version, contract, shape, isFederated, format));
monitor.debug(String.format("Received a POST asset request %s %s %s %s %s %b %s %s in format %s", asset, name, description, version, contract, shape, isFederated, allow, deny, format));
if (format == null) {
return Response.status(Response.Status.BAD_REQUEST).build();
}
Expand All @@ -121,7 +123,7 @@ public Response postAsset(String content,
if (shape == null) {
shape = String.format("@prefix : <%s#> .\\n", asset);
}
management.createOrUpdateGraph(asset, name, description, version, contract, ontologiesString, shape, isFederated);
management.createOrUpdateGraph(asset, name, description, version, contract, ontologiesString, shape, isFederated, allow, deny);
long reg = store.registerAsset(asset, content, format);
MediaType med = MediaType.APPLICATION_JSON_TYPE;
ResponseBuilder resBuild = Response.ok(reg, med);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.eclipse.tractusx.agents.http.DelegationServiceImpl;
import org.eclipse.tractusx.agents.http.GraphController;
import org.eclipse.tractusx.agents.rdf.RdfStore;
import org.eclipse.tractusx.agents.service.DataManagementImpl;
import org.eclipse.tractusx.agents.service.DataManagement;
import org.eclipse.tractusx.agents.service.DataspaceSynchronizer;
import org.eclipse.tractusx.agents.service.EdcSkillStore;
import org.eclipse.tractusx.agents.sparql.DataspaceServiceExecutor;
Expand All @@ -50,20 +50,20 @@

public class SharedObjectManager {
private static final SharedObjectManager INSTANCE = new SharedObjectManager();
private Monitor monitor;
private TypeManager typeManager;
private AgentConfig agentConfig;
private RdfStore rdfStore;
private ServiceExecutorRegistry reg;
private SparqlQueryProcessor processor;
private DataManagementImpl catalogService;
private SkillStore skillStore;
private AgreementControllerImpl agreementController;
private AgentController agentController;
private GraphController graphController;
private DelegationServiceImpl delegationService;
private DataspaceSynchronizer synchronizer;
private OkHttpClient httpClient;
private final Monitor monitor;
private final TypeManager typeManager;
private final AgentConfig agentConfig;
private final RdfStore rdfStore;
private final ServiceExecutorRegistry reg;
private final SparqlQueryProcessor processor;
private final DataManagement catalogService;
private final SkillStore skillStore;
private final AgreementControllerImpl agreementController;
private final AgentController agentController;
private final GraphController graphController;
private final DelegationServiceImpl delegationService;
private final DataspaceSynchronizer synchronizer;
private final OkHttpClient httpClient;


private SharedObjectManager() {
Expand Down Expand Up @@ -93,7 +93,7 @@ private SharedObjectManager() {
Config emptyConfig = ConfigFactory.fromProperties(props);
this.agentConfig = new AgentConfig(monitor, emptyConfig);
this.httpClient = new OkHttpClient();
this.catalogService = new DataManagementImpl(monitor, typeManager, httpClient, agentConfig);
this.catalogService = new DataManagement(monitor, typeManager, httpClient, agentConfig);
agreementController = new AgreementControllerImpl(monitor, agentConfig, catalogService);
this.rdfStore = new RdfStore(agentConfig, monitor);
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(agentConfig.getThreadPoolSize());
Expand All @@ -119,9 +119,7 @@ public static String convertToCurl(Request request) {
curlCommand.append(" -X ").append(request.method());

// Add headers
request.headers().toMultimap().forEach((name, values) -> {
values.forEach(value -> curlCommand.append(" -H '").append(name).append(": ").append(value).append("'"));
});
request.headers().toMultimap().forEach((name, values) -> values.forEach(value -> curlCommand.append(" -H '").append(name).append(": ").append(value).append("'")));

// Add request body if present
if (request.body() != null) {
Expand Down Expand Up @@ -164,7 +162,7 @@ public SparqlQueryProcessor getProcessor() {
return processor;
}

public DataManagementImpl getCatalogService() {
public DataManagement getCatalogService() {
return catalogService;
}

Expand Down
Loading

0 comments on commit e4a7442

Please sign in to comment.