Skip to content

Commit

Permalink
[Feature] Prorietary API calls /import and /reset (#943)
Browse files Browse the repository at this point in the history
* updated request execution context
* Service(Context) now allows to execution of request with the context of the endpoint through which the request has been received
* introduced AbstractEndpoint class
* endpoints can new be configured to support only a given set of service profiles
* added Persistence.deleteAll()
* added FileStorage.deleteAll()
* AAS operation GetSelfDescription now returns only service profiles supported by endpoint through which it has been called
* added new requests ImportRequest and ResetRequest
* updated documentation
* fix ReferenceHelper.asString(Reference) to noew use proper String serialization of reference type

---------

Co-authored-by: Michael Jacoby <[email protected]>
  • Loading branch information
fvolz and mjacoby authored Nov 13, 2024
1 parent e24b838 commit ed5e896
Show file tree
Hide file tree
Showing 128 changed files with 2,670 additions and 880 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import de.fraunhofer.iosb.ilt.faaast.service.persistence.SubmodelSearchCriteria;
import de.fraunhofer.iosb.ilt.faaast.service.registry.RegistrySynchronization;
import de.fraunhofer.iosb.ilt.faaast.service.request.RequestHandlerManager;
import de.fraunhofer.iosb.ilt.faaast.service.request.handler.DynamicRequestExecutionContext;
import de.fraunhofer.iosb.ilt.faaast.service.request.handler.RequestExecutionContext;
import de.fraunhofer.iosb.ilt.faaast.service.typing.TypeExtractor;
import de.fraunhofer.iosb.ilt.faaast.service.typing.TypeInfo;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class Service implements ServiceContext {
private MessageBus messageBus;
private Persistence persistence;
private FileStorage fileStorage;
private RequestExecutionContext requestExecutionContext;

private RegistrySynchronization registrySynchronization;
private RequestHandlerManager requestHandler;
Expand Down Expand Up @@ -118,12 +120,8 @@ public Service(CoreConfig coreConfig,
this.fileStorage = fileStorage;
this.messageBus = messageBus;
this.assetConnectionManager = new AssetConnectionManager(config.getCore(), assetConnections, this);
this.requestHandler = new RequestHandlerManager(new RequestExecutionContext(
coreConfig,
persistence,
fileStorage,
messageBus,
assetConnectionManager));
this.requestHandler = new RequestHandlerManager(config.getCore());
this.requestExecutionContext = new DynamicRequestExecutionContext(this);
this.registrySynchronization = new RegistrySynchronization(config.getCore(), persistence, messageBus, endpoints);
}

Expand All @@ -146,9 +144,9 @@ public Service(ServiceConfig config)


@Override
public Response execute(Request request) {
public Response execute(Endpoint source, Request request) {
try {
return requestHandler.execute(request);
return requestHandler.execute(request, requestExecutionContext.withEndpoint(source));
}
catch (Exception e) {
LOGGER.trace("Error executing request", e);
Expand Down Expand Up @@ -224,7 +222,7 @@ public Environment getAASEnvironment() throws PersistenceException {
public void executeAsync(Request request, Consumer<Response> callback) {
Ensure.requireNonNull(request, "request must be non-null");
Ensure.requireNonNull(callback, "callback must be non-null");
this.requestHandler.executeAsync(request, callback);
this.requestHandler.executeAsync(request, callback, requestExecutionContext);
}


Expand All @@ -239,6 +237,21 @@ public AssetConnectionManager getAssetConnectionManager() {
}


public FileStorage getFileStorage() {
return fileStorage;
}


public ServiceConfig getConfig() {
return config;
}


public Persistence getPersistence() {
return persistence;
}


/**
* Starts the service.This includes starting the message bus and endpoints.
*
Expand Down Expand Up @@ -301,12 +314,8 @@ private void init() throws ConfigurationException {
endpoints.add(endpoint);
}
}
this.requestHandler = new RequestHandlerManager(new RequestExecutionContext(
this.config.getCore(),
this.persistence,
this.fileStorage,
this.messageBus,
this.assetConnectionManager));
this.requestHandler = new RequestHandlerManager(config.getCore());
this.requestExecutionContext = new DynamicRequestExecutionContext(this);
this.registrySynchronization = new RegistrySynchronization(config.getCore(), persistence, messageBus, endpoints);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package de.fraunhofer.iosb.ilt.faaast.service;

import de.fraunhofer.iosb.ilt.faaast.service.endpoint.Endpoint;
import de.fraunhofer.iosb.ilt.faaast.service.messagebus.MessageBus;
import de.fraunhofer.iosb.ilt.faaast.service.model.api.Request;
import de.fraunhofer.iosb.ilt.faaast.service.model.api.Response;
Expand Down Expand Up @@ -47,10 +48,23 @@ public interface ServiceContext {
* Executes a request.
*
* @param <T> type of expected response
* @param source the endpoint via which the request has been triggered
* @param request request to execute
* @return result of executing the request
*/
public <T extends Response> T execute(Request<T> request);
public <T extends Response> T execute(Endpoint source, Request<T> request);


/**
* Execute a request without context of an endpoint. This is typically used when executed for custom code.
*
* @param <T> type of expected response
* @param request The request to execute.
* @return the corresponding response
*/
public default <T extends Response> T execute(Request<T> request) {
return execute(null, request);
}


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
package de.fraunhofer.iosb.ilt.faaast.service.assetconnection;

import de.fraunhofer.iosb.ilt.faaast.service.ServiceContext;
import de.fraunhofer.iosb.ilt.faaast.service.Service;
import de.fraunhofer.iosb.ilt.faaast.service.assetconnection.lambda.LambdaAssetConnection;
import de.fraunhofer.iosb.ilt.faaast.service.assetconnection.lambda.provider.LambdaOperationProvider;
import de.fraunhofer.iosb.ilt.faaast.service.assetconnection.lambda.provider.LambdaSubscriptionProvider;
Expand Down Expand Up @@ -56,18 +56,23 @@ public class AssetConnectionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(AssetConnectionManager.class);
private final List<AssetConnection> connections;
private final CoreConfig coreConfig;
private final ServiceContext serviceContext;
private final ScheduledExecutorService scheduledExecutorService;
private final LambdaAssetConnection lambdaAssetConnection;
private final Service service;
private ScheduledExecutorService scheduledExecutorService;
private LambdaAssetConnection lambdaAssetConnection;
private volatile boolean active;

public AssetConnectionManager(CoreConfig coreConfig, List<AssetConnection> connections, ServiceContext context) throws ConfigurationException {
public AssetConnectionManager(CoreConfig coreConfig, List<AssetConnection> connections, Service service) throws ConfigurationException {
this.active = true;
this.coreConfig = coreConfig;
this.connections = connections != null ? new ArrayList<>(connections) : new ArrayList<>();
this.serviceContext = context;
this.lambdaAssetConnection = new LambdaAssetConnection();
this.service = service;
validateConnections();
init();
}


private void init() {
lambdaAssetConnection = new LambdaAssetConnection();
ThreadFactory threadFactory = new ThreadFactory() {
AtomicLong count = new AtomicLong(0);

Expand Down Expand Up @@ -149,6 +154,20 @@ public void registerLambdaOperationProvider(Reference reference, LambdaOperation
}


/**
* Reset the AssetConnectionManager by first stopping the manager if active, then removing all connections and
* restarting the manager.
*/
public void reset() {
if (active) {
stop();
}
connections.clear();
init();
start();
}


/**
* Unregister a {@link LambdaOperationProvider}.
*
Expand Down Expand Up @@ -202,7 +221,7 @@ private void setupSubscription(Reference reference, AssetSubscriptionProvider pr
}
try {
provider.addNewDataListener((DataElementValue data) -> {
Response response = serviceContext.execute(SetSubmodelElementValueByPathRequest.builder()
Response response = service.execute(SetSubmodelElementValueByPathRequest.builder()
.submodelId(ReferenceHelper.findFirstKeyType(reference, KeyTypes.SUBMODEL))
.path(ReferenceHelper.toPath(reference))
.disableSyncWithAsset()
Expand Down Expand Up @@ -253,7 +272,7 @@ private void setupConnectionAsync(AssetConnection connection) {
*/
public void add(AssetConnectionConfig<? extends AssetConnection, ? extends AssetValueProviderConfig, ? extends AssetOperationProviderConfig, ? extends AssetSubscriptionProviderConfig> connectionConfig)
throws ConfigurationException, AssetConnectionException {
AssetConnection newConnection = connectionConfig.newInstance(coreConfig, serviceContext);
AssetConnection newConnection = connectionConfig.newInstance(coreConfig, service);
Optional<AssetConnection> connection = connections.stream().filter(x -> Objects.equals(x, newConnection)).findFirst();
if (connection.isPresent()) {
connectionConfig.getValueProviders().forEach(LambdaExceptionHelper.rethrowBiConsumer(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2021 Fraunhofer IOSB, eine rechtlich nicht selbstaendige
* Einrichtung der Fraunhofer-Gesellschaft zur Foerderung der angewandten
* Forschung e.V.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.fraunhofer.iosb.ilt.faaast.service.endpoint;

import de.fraunhofer.iosb.ilt.faaast.service.ServiceContext;
import de.fraunhofer.iosb.ilt.faaast.service.config.CoreConfig;
import de.fraunhofer.iosb.ilt.faaast.service.model.ServiceSpecificationProfile;
import de.fraunhofer.iosb.ilt.faaast.service.util.Ensure;
import java.util.List;
import java.util.Objects;


/**
* Abstract base class for {@link Endpoint} implementations.
*
* @param <T> type of the configuration
*/
public abstract class AbstractEndpoint<T extends EndpointConfig> implements Endpoint<T> {

protected T config;
protected ServiceContext serviceContext;

@Override
public T asConfig() {
return config;
}


/**
* {@inheritDoc}
*
* @throws IllegalArgumentException is config is null
*/
@Override
public void init(CoreConfig coreConfig, T config, ServiceContext serviceContext) {
Ensure.requireNonNull(config, "config must be non-null");
Ensure.requireNonNull(serviceContext, "serviceContext must be non-null");
this.config = config;
this.serviceContext = serviceContext;
}


@Override
public List<ServiceSpecificationProfile> getProfiles() {
return config.getProfiles();
}


@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AbstractEndpoint<T> that = (AbstractEndpoint<T>) o;
return Objects.equals(config, that.config)
&& Objects.equals(serviceContext, that.serviceContext);
}


@Override
public int hashCode() {
return Objects.hash(config, serviceContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import de.fraunhofer.iosb.ilt.faaast.service.config.Configurable;
import de.fraunhofer.iosb.ilt.faaast.service.exception.EndpointException;
import de.fraunhofer.iosb.ilt.faaast.service.model.ServiceSpecificationProfile;
import java.util.List;


Expand All @@ -41,6 +42,14 @@ public interface Endpoint<T extends EndpointConfig> extends Configurable<T> {
public void stop();


/**
* Gets a list of supported service profiles.
*
* @return list of supported service profiles.
*/
public List<ServiceSpecificationProfile> getProfiles();


/**
* Gets endpoint information for an AAS. This is used for automatic registration with a registry. The returned result
* may include multiple endpoints, e.g. with different interfaces like AAS-REPOSITORY and AAS.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
package de.fraunhofer.iosb.ilt.faaast.service.endpoint;

import de.fraunhofer.iosb.ilt.faaast.service.config.Config;
import de.fraunhofer.iosb.ilt.faaast.service.model.ServiceSpecificationProfile;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.eclipse.digitaltwin.aas4j.v3.model.builder.ExtendableBuilder;


Expand All @@ -26,6 +30,36 @@
*/
public class EndpointConfig<T extends Endpoint> extends Config<T> {

protected List<ServiceSpecificationProfile> profiles = new ArrayList<>();

public List<ServiceSpecificationProfile> getProfiles() {
return profiles;
}


public void setProfiles(List<ServiceSpecificationProfile> profiles) {
this.profiles = profiles;
}


@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EndpointConfig that = (EndpointConfig) o;
return Objects.equals(profiles, that.profiles);
}


@Override
public int hashCode() {
return Objects.hash(profiles);
}

/**
* Abstract builder class that should be used for builders of inheriting classes.
*
Expand All @@ -34,7 +68,16 @@ public class EndpointConfig<T extends Endpoint> extends Config<T> {
* @param <B> type of this builder, needed for inheritance builder pattern
*/
public abstract static class AbstractBuilder<T extends Endpoint, C extends EndpointConfig<T>, B extends AbstractBuilder<T, C, B>> extends ExtendableBuilder<C, B> {
public B profiles(List<ServiceSpecificationProfile> value) {
getBuildingInstance().setProfiles(value);
return getSelf();
}


public B profile(ServiceSpecificationProfile value) {
getBuildingInstance().getProfiles().add(value);
return getSelf();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ public interface FileStorage<C extends FileStorageConfig> extends Configurable<C
public void delete(String path) throws ResourceNotFoundException, PersistenceException;


/**
* Deletes all files present in the storage.
*
* @throws PersistenceException if storage error occurs
*/
public void deleteAll() throws PersistenceException;


/**
* Saves the file to given path.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,14 @@ public void insert(SubmodelElementIdentifier parentIdentifier, SubmodelElement s
public void deleteSubmodelElement(SubmodelElementIdentifier identifier) throws ResourceNotFoundException, PersistenceException;


/**
* Deletes all data in the persistence.
*
* @throws PersistenceException if there was an error with the storage.
*/
public void deleteAll() throws PersistenceException;


/**
* Deletes an {@code org.eclipse.digitaltwin.aas4j.v3.model.AssetAdministrationShell}.
*
Expand Down
Loading

0 comments on commit ed5e896

Please sign in to comment.