diff --git a/gradle/libs.0.10.0.versions.toml b/gradle/libs.0.10.0.versions.toml
new file mode 100644
index 0000000..a54eb48
--- /dev/null
+++ b/gradle/libs.0.10.0.versions.toml
@@ -0,0 +1,122 @@
+[metadata]
+format.version = "1.1"
+
+[versions]
+edc = "0.10.0"
+postgres = "42.7.4"
+
+# add here
+
+[libraries]
+edc-runtime-metamodel = { module = "org.eclipse.edc:runtime-metamodel", version.ref = "edc" }
+# SPI modules
+edc-spi-core = { module = "org.eclipse.edc:core-spi", version.ref = "edc" }
+edc-spi-identity-trust = { module = "org.eclipse.edc:identity-trust-spi", version.ref = "edc" }
+edc-spi-jwt = { module = "org.eclipse.edc:jwt-spi", version.ref = "edc" }
+edc-boot-spi = { module = "org.eclipse.edc:boot-spi", version.ref = "edc" }
+
+
+# Common dependencies
+edc-boot = { module = "org.eclipse.edc:boot", version.ref = "edc" }
+edc-core-connector = { module = "org.eclipse.edc:connector-core", version.ref = "edc" }
+edc-auth-tokenbased = { module = "org.eclipse.edc:auth-tokenbased", version.ref = "edc" }
+edc-auth-configuration = { module = "org.eclipse.edc:auth-configuration", version.ref = "edc" }
+edc-did-web = { module = "org.eclipse.edc:identity-did-web", version.ref = "edc" }
+edc-identity-core-did = { module = "org.eclipse.edc:identity-did-core", version.ref = "edc" }
+edc-identity-vc-ldp = { module = "org.eclipse.edc:ldp-verifiable-credentials", version.ref = "edc" }
+edc-identity-vc-jwt = { module = "org.eclipse.edc:jwt-verifiable-credentials", version.ref = "edc" }
+edc-vault-hashicorp = { module = "org.eclipse.edc:vault-hashicorp", version.ref = "edc" }
+edc-jsonld = { module = "org.eclipse.edc:json-ld", version.ref = "edc" }
+edc-junit = { module = "org.eclipse.edc:junit", version.ref = "edc" }
+edc-jersey-core = { module = "org.eclipse.edc:jersey-core", version.ref = "edc" }
+edc-jetty-core = { module = "org.eclipse.edc:jetty-core", version.ref = "edc" }
+edc-configuration-filesystem = { module = "org.eclipse.edc:configuration-filesystem", version.ref = "edc" }
+edc-token-core = { module = "org.eclipse.edc:token-core", version.ref = "edc" }
+
+# Control Plane-specific dependencies
+edc-controlplane-core = { module = "org.eclipse.edc:control-plane-core", version.ref = "edc" }
+edc-dsp = { module = "org.eclipse.edc:dsp", version.ref = "edc" }
+edc-api-management-dataplaneselector = { module = "org.eclipse.edc:data-plane-selector-api", version.ref = "edc" }
+edc-http = { module = "org.eclipse.edc:http", version.ref = "edc" }
+edc-controlplane-callback-dispatcher-event = { module = "org.eclipse.edc:callback-event-dispatcher", version.ref = "edc" }
+edc-controlplane-callback-dispatcher-http = { module = "org.eclipse.edc:callback-http-dispatcher", version.ref = "edc" }
+edc-iam-mock = { module = "org.eclipse.edc:iam-mock", version.ref = "edc" }
+edc-identity-trust-transform = { module = "org.eclipse.edc:identity-trust-transform", version.ref = "edc" }
+edc-api-control-configuration = { module = "org.eclipse.edc:control-api-configuration", version.ref = "edc" }
+edc-core-edrstore = { module = "org.eclipse.edc:edr-store-core", version.ref = "edc" }
+edc-edr-storereceiver = { module = "org.eclipse.edc:edr-store-receiver", version.ref = "edc" }
+edc-policy-monitor-core = { module = "org.eclipse.edc:policy-monitor-core", version.ref = "edc" }
+
+# Dataplane Framework dependencies
+edc-dpf-transfer-signaling = { module = "org.eclipse.edc:transfer-data-plane-signaling", version.ref = "edc" }
+edc-spi-dataplane-selector = { module = "org.eclipse.edc:data-plane-selector-spi", version.ref = "edc" }
+edc-dpf-selector-control-api = { module = "org.eclipse.edc:data-plane-selector-control-api", version.ref = "edc" }
+edc-dpf-signaling-client = { module = "org.eclipse.edc:data-plane-signaling-client", version.ref = "edc" }
+edc-dpf-selector-core = { module = "org.eclipse.edc:data-plane-selector-core", version.ref = "edc" }
+edc-dpf-selector-client = { module = "org.eclipse.edc:data-plane-selector-client", version.ref = "edc" }
+
+# Data Plane specific dependencies
+edc-dataplane-core = { module = "org.eclipse.edc:data-plane-core", version.ref = "edc" }
+edc-dataplane-api-control-config = { module = "org.eclipse.edc:control-api-configuration", version.ref = "edc" }
+edc-dataplane-api-control-client = { module = "org.eclipse.edc:control-plane-api-client", version.ref = "edc" }
+edc-dataplane-selfregistration = { module = "org.eclipse.edc:data-plane-self-registration", version.ref = "edc" }
+edc-dataplane-http = { module = "org.eclipse.edc:data-plane-http", version.ref = "edc" }
+edc-dataplane-http-oauth2 = { module = "org.eclipse.edc:data-plane-http-oauth2", version.ref = "edc" }
+edc-dataplane-api-public = { module = "org.eclipse.edc:data-plane-public-api-v2", version.ref = "edc" }
+edc-dataplane-api-signaling = { module = "org.eclipse.edc:data-plane-signaling-api", version.ref = "edc" }
+edc-dataplane-iam = { module = "org.eclipse.edc:data-plane-iam", version.ref = "edc" }
+
+# API modules
+edc-api-version = { module = "org.eclipse.edc:version-api", version.ref = "edc" }
+edc-api-management = { module = "org.eclipse.edc:management-api", version.ref = "edc" }
+edc-api-management-test-fixtures = { module = "org.eclipse.edc:management-api-test-fixtures", version.ref = "edc" }
+edc-api-management-config = { module = "org.eclipse.edc:management-api-configuration", version.ref = "edc" }
+edc-api-management-edr = { module = "org.eclipse.edc:edr-cache-api", version.ref = "edc" }
+edc-api-observability = { module = "org.eclipse.edc:api-observability", version.ref = "edc" }
+
+# Lib dependencies
+edc-lib-transform = { module = "org.eclipse.edc:transform-lib", version.ref = "edc" }
+edc-lib-jws2020 = { module = "org.eclipse.edc:jws2020-lib", version.ref = "edc" }
+
+# SQL Dependencies
+edc-sql-assetindex = { module = "org.eclipse.edc:asset-index-sql", version.ref = "edc" }
+edc-sql-edrcache = { module = "org.eclipse.edc:edr-index-sql", version.ref = "edc" }
+edc-sql-contractdef = { module = "org.eclipse.edc:contract-definition-store-sql", version.ref = "edc" }
+edc-sql-contractneg = { module = "org.eclipse.edc:contract-negotiation-store-sql", version.ref = "edc" }
+edc-sql-policydef = { module = "org.eclipse.edc:policy-definition-store-sql", version.ref = "edc" }
+edc-sql-policymonitor = { module = "org.eclipse.edc:policy-monitor-store-sql", version.ref = "edc" }
+edc-sql-transferprocess = { module = "org.eclipse.edc:transfer-process-store-sql", version.ref = "edc" }
+edc-sql-core = { module = "org.eclipse.edc:sql-core", version.ref = "edc" }
+edc-sql-lease = { module = "org.eclipse.edc:sql-lease", version.ref = "edc" }
+edc-sql-pool = { module = "org.eclipse.edc:sql-pool-apache-commons", version.ref = "edc" }
+edc-sql-transactionlocal = { module = "org.eclipse.edc:transaction-local", version.ref = "edc" }
+edc-sql-accesstokendata = { module = "org.eclipse.edc:accesstokendata-store-sql", version.ref = "edc" }
+edc-sql-dataplane = { module = "org.eclipse.edc:data-plane-store-sql", version.ref = "edc" }
+edc-sql-dataplane-instancestore = { module = "org.eclipse.edc:data-plane-instance-store-sql", version.ref = "edc" }
+
+# Third-part dependencies
+postgres = { module = "org.postgresql:postgresql", version.ref = "postgres" }
+
+# add some
+
+[bundles]
+dpf = ["edc-dpf-selector-core", "edc-spi-dataplane-selector", "edc-dpf-selector-control-api", "edc-dpf-signaling-client", "edc-dpf-transfer-signaling"]
+
+controlplane = ["edc-configuration-filesystem", "edc-controlplane-core", "edc-auth-tokenbased", "edc-auth-configuration", "edc-policy-monitor-core",
+    "edc-api-management", "edc-api-management-config", "edc-api-management-edr", "edc-api-management-dataplaneselector",
+    "edc-api-observability", "edc-dsp", "edc-spi-jwt", "edc-http", "edc-controlplane-callback-dispatcher-event",
+    "edc-controlplane-callback-dispatcher-http", "edc-identity-core-did", "edc-iam-mock", "edc-identity-trust-transform",
+    "edc-api-control-configuration", "edc-lib-transform", "edc-identity-vc-ldp", "edc-did-web", "edc-lib-jws2020", "edc-core-edrstore",
+    "edc-edr-storereceiver", "edc-token-core"]
+
+dataplane = ["edc-configuration-filesystem", "edc-jersey-core", "edc-jetty-core", "edc-dataplane-core", "edc-dataplane-api-control-config", "edc-dataplane-api-control-client", "edc-dataplane-selfregistration",
+    "edc-dataplane-http", "edc-dataplane-http-oauth2", "edc-dataplane-api-public", "edc-dataplane-api-signaling", "edc-dataplane-iam", "edc-token-core"]
+
+sql-controlplane = ["edc-sql-assetindex", "edc-sql-contractdef", "edc-sql-contractneg", "edc-sql-policydef",
+    "edc-sql-edrcache", "edc-sql-transferprocess", "edc-sql-dataplane-instancestore", "edc-sql-core", "edc-sql-lease", "edc-sql-policymonitor",
+    "edc-sql-pool", "edc-sql-transactionlocal", "postgres"]
+sql-dataplane = ["edc-sql-accesstokendata", "edc-sql-dataplane", "edc-sql-core", "edc-sql-lease", "edc-sql-pool", "edc-sql-transactionlocal", "edc-sql-dataplane-instancestore", "postgres"]
+
+
+[plugins]
+docker = { id = "com.bmuschko.docker-remote-api", version = "9.4.0" }
diff --git a/runtimes/010/controlplane-010/build.gradle.kts b/runtimes/010/controlplane-010/build.gradle.kts
new file mode 100644
index 0000000..f29f723
--- /dev/null
+++ b/runtimes/010/controlplane-010/build.gradle.kts
@@ -0,0 +1,64 @@
+/*
+ *  Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+ *
+ *  This program and the accompanying materials are made available under the
+ *  terms of the Apache License, Version 2.0 which is available at
+ *  https://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *
+ *  Contributors:
+ *       Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
+ *
+ */
+
+import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
+import com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin
+
+plugins {
+    id("application")
+    alias(libs.plugins.shadow)
+    alias(libs.plugins.docker)
+}
+
+dependencies {
+    runtimeOnly(libs010.edc.spi.core) // we need some constants
+
+    runtimeOnly(libs010.bundles.controlplane)
+    runtimeOnly(libs010.edc.core.connector)
+
+    runtimeOnly(libs010.bundles.dpf)
+    runtimeOnly(libs010.edc.api.version)
+    // uncomment the following lines to compile with Hashicorp Vault and Postgres persistence
+    // runtimeOnly(libs010.edc.vault.hashicorp)
+    runtimeOnly(libs010.bundles.sql.controlplane)
+
+}
+
+tasks.withType<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar> {
+    exclude("**/pom.properties", "**/pom.xml")
+    mergeServiceFiles()
+    archiveFileName.set("${project.name}.jar")
+}
+
+application {
+    mainClass.set("org.eclipse.edc.boot.system.runtime.BaseRuntime")
+}
+
+edcBuild {
+    publish.set(false)
+}
+
+// configure the "dockerize" task
+tasks.register("dockerize", DockerBuildImage::class) {
+    val dockerContextDir = project.projectDir
+    dockerFile.set(file("$dockerContextDir/src/main/docker/Dockerfile"))
+    images.add("${project.name}:${libs010.versions.edc.get()}")
+    images.add("${project.name}:latest")
+    // specify platform with the -Dplatform flag:
+    if (System.getProperty("platform") != null)
+        platform.set(System.getProperty("platform"))
+    buildArgs.put("JAR", "build/libs/${project.name}.jar")
+    inputDir.set(file(dockerContextDir))
+    dependsOn(tasks.named(ShadowJavaPlugin.SHADOW_JAR_TASK_NAME))
+}
diff --git a/runtimes/010/controlplane-010/src/main/docker/Dockerfile b/runtimes/010/controlplane-010/src/main/docker/Dockerfile
new file mode 100644
index 0000000..9abb670
--- /dev/null
+++ b/runtimes/010/controlplane-010/src/main/docker/Dockerfile
@@ -0,0 +1,18 @@
+# -buster is required to have apt available
+FROM eclipse-temurin:23.0.1_11-jre-alpine
+
+# Optional JVM arguments, such as memory settings
+ARG JVM_ARGS=""
+ARG JAR
+
+RUN apk --no-cache add curl
+
+WORKDIR /app
+
+COPY ${JAR} edc-controlplane.jar
+
+# Use "exec" for graceful termination (SIGINT) to reach JVM.
+# ARG can not be used in ENTRYPOINT so storing value in an ENV variable
+ENV ENV_JVM_ARGS=$JVM_ARGS
+# use the "exec" syntax so that SIGINT reaches the JVM -> graceful termination
+CMD ["sh", "-c", "exec java -Djava.security.egd=file:/dev/urandom -jar edc-controlplane.jar"]
diff --git a/runtimes/010/dataplane-010/build.gradle.kts b/runtimes/010/dataplane-010/build.gradle.kts
new file mode 100644
index 0000000..33f1084
--- /dev/null
+++ b/runtimes/010/dataplane-010/build.gradle.kts
@@ -0,0 +1,67 @@
+/*
+ *  Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+ *
+ *  This program and the accompanying materials are made available under the
+ *  terms of the Apache License, Version 2.0 which is available at
+ *  https://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *
+ *  Contributors:
+ *       Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
+ *
+ */
+
+import com.bmuschko.gradle.docker.tasks.image.DockerBuildImage
+import com.github.jengelman.gradle.plugins.shadow.ShadowJavaPlugin
+
+plugins {
+    id("application")
+    alias(libs.plugins.shadow)
+    alias(libs.plugins.docker)
+}
+
+dependencies {
+    implementation(libs010.edc.runtime.metamodel)
+    implementation(libs010.edc.boot.spi)
+    runtimeOnly(libs010.edc.spi.core)
+    runtimeOnly(libs010.edc.api.observability)
+    runtimeOnly(libs010.edc.core.connector)
+    runtimeOnly(libs010.edc.boot)
+    runtimeOnly(libs010.bundles.dataplane)
+    runtimeOnly(libs010.edc.jsonld) // needed by the DataPlaneSignalingApi
+    runtimeOnly(libs010.edc.dpf.selector.client) // for the selector service -> self registration
+
+    // uncomment the following lines to compile with Hashicorp Vault and Postgres persistence
+    // runtimeOnly(libs.edc.vault.hashicorp)
+    runtimeOnly(libs010.bundles.sql.dataplane)
+
+}
+
+tasks.withType<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar> {
+    exclude("**/pom.properties", "**/pom.xml")
+    mergeServiceFiles()
+    archiveFileName.set("${project.name}.jar")
+}
+
+application {
+    mainClass.set("org.eclipse.edc.boot.system.runtime.BaseRuntime")
+}
+
+edcBuild {
+    publish.set(false)
+}
+
+// configure the "dockerize" task
+tasks.register("dockerize", DockerBuildImage::class) {
+    val dockerContextDir = project.projectDir
+    dockerFile.set(file("$dockerContextDir/src/main/docker/Dockerfile"))
+    images.add("${project.name}:${libs010.versions.edc.get()}")
+    images.add("${project.name}:latest")
+    // specify platform with the -Dplatform flag:
+    if (System.getProperty("platform") != null)
+        platform.set(System.getProperty("platform"))
+    buildArgs.put("JAR", "build/libs/${project.name}.jar")
+    inputDir.set(file(dockerContextDir))
+    dependsOn(tasks.named(ShadowJavaPlugin.SHADOW_JAR_TASK_NAME))
+}
\ No newline at end of file
diff --git a/runtimes/010/dataplane-010/src/main/docker/Dockerfile b/runtimes/010/dataplane-010/src/main/docker/Dockerfile
new file mode 100644
index 0000000..c8e1ac6
--- /dev/null
+++ b/runtimes/010/dataplane-010/src/main/docker/Dockerfile
@@ -0,0 +1,18 @@
+# -buster is required to have apt available
+FROM eclipse-temurin:23.0.1_11-jre-alpine
+
+# Optional JVM arguments, such as memory settings
+ARG JVM_ARGS=""
+ARG JAR
+
+RUN apk --no-cache add curl
+
+WORKDIR /app
+
+COPY ${JAR} edc-dataplane.jar
+
+# Use "exec" for graceful termination (SIGINT) to reach JVM.
+# ARG can not be used in ENTRYPOINT so storing value in an ENV variable
+ENV ENV_JVM_ARGS=$JVM_ARGS
+# use the "exec" syntax so that SIGINT reaches the JVM -> graceful termination
+CMD ["sh", "-c", "exec java -Djava.security.egd=file:/dev/urandom -jar edc-dataplane.jar"]
diff --git a/runtimes/010/dataplane-010/src/main/java/org/eclipse/edc/compatibility/tests/VaultSeedExtension.java b/runtimes/010/dataplane-010/src/main/java/org/eclipse/edc/compatibility/tests/VaultSeedExtension.java
new file mode 100644
index 0000000..4b55527
--- /dev/null
+++ b/runtimes/010/dataplane-010/src/main/java/org/eclipse/edc/compatibility/tests/VaultSeedExtension.java
@@ -0,0 +1,48 @@
+/*
+ *  Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+ *
+ *  This program and the accompanying materials are made available under the
+ *  terms of the Apache License, Version 2.0 which is available at
+ *  https://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *
+ *  Contributors:
+ *       Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
+ *
+ */
+
+package org.eclipse.edc.compatibility.tests;
+
+import org.eclipse.edc.runtime.metamodel.annotation.Inject;
+import org.eclipse.edc.spi.security.Vault;
+import org.eclipse.edc.spi.system.ServiceExtension;
+import org.eclipse.edc.spi.system.ServiceExtensionContext;
+
+import java.util.Map;
+
+public class VaultSeedExtension implements ServiceExtension {
+
+
+    public static final String VAULT_TESTING_PREFIX = "testing.edc.vaults";
+
+    public static final String VAULT_TESTING_KEY = "key";
+    public static final String VAULT_TESTING_VALUE = "value";
+
+    @Inject
+    private Vault vault;
+    
+    @Override
+    public void initialize(ServiceExtensionContext context) {
+        ServiceExtension.super.initialize(context);
+
+        var config = context.getConfig(VAULT_TESTING_PREFIX);
+        var secrets = config.partition().map((partition) -> {
+            var key = partition.getString(VAULT_TESTING_KEY);
+            var value = partition.getString(VAULT_TESTING_VALUE);
+            return Map.entry(key, value);
+        }).toList();
+
+        secrets.forEach(secret -> vault.storeSecret(secret.getKey(), secret.getValue()));
+    }
+}
\ No newline at end of file
diff --git a/runtimes/010/dataplane-010/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/runtimes/010/dataplane-010/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension
new file mode 100644
index 0000000..56d825c
--- /dev/null
+++ b/runtimes/010/dataplane-010/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension
@@ -0,0 +1,15 @@
+#
+#  Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+#
+#  This program and the accompanying materials are made available under the
+#  terms of the Apache License, Version 2.0 which is available at
+#  https://www.apache.org/licenses/LICENSE-2.0
+#
+#  SPDX-License-Identifier: Apache-2.0
+#
+#  Contributors:
+#       Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
+#
+#
+
+org.eclipse.edc.compatibility.tests.VaultSeedExtension
\ No newline at end of file
diff --git a/settings.gradle.kts b/settings.gradle.kts
index d34cbd6..faa8462 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -38,6 +38,9 @@ dependencyResolutionManagement {
         create("stableLibs") {
             from(files("./gradle/libs.stable.versions.toml"))
         }
+        create("libs010") {
+            from(files("./gradle/libs.0.10.0.versions.toml"))
+        }
     }
 }
 
@@ -46,4 +49,6 @@ include(":runtimes:snapshot:controlplane-snapshot")
 include(":runtimes:snapshot:dataplane-snapshot")
 include(":runtimes:stable:controlplane-stable")
 include(":runtimes:stable:dataplane-stable")
+include(":runtimes:010:controlplane-010")
+include(":runtimes:010:dataplane-010")
 include(":tests:compatibility-tests")
\ No newline at end of file
diff --git a/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/AbstractTest.java b/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/AbstractTest.java
new file mode 100644
index 0000000..5354718
--- /dev/null
+++ b/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/AbstractTest.java
@@ -0,0 +1,128 @@
+package org.eclipse.edc.compatibility.tests;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE;
+import static org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance.createDatabase;
+import static org.eclipse.edc.util.io.Ports.getFreePort;
+import static org.mockserver.integration.ClientAndServer.startClientAndServer;
+
+import org.eclipse.edc.compatibility.tests.fixtures.BaseParticipant;
+import org.eclipse.edc.compatibility.tests.fixtures.EdcDockerRuntimes;
+import org.eclipse.edc.compatibility.tests.fixtures.LocalParticipant;
+import org.eclipse.edc.compatibility.tests.fixtures.RemoteParticipant;
+import org.eclipse.edc.compatibility.tests.fixtures.Runtimes;
+import org.eclipse.edc.connector.controlplane.test.system.utils.Participant;
+import org.eclipse.edc.junit.extensions.RuntimeExtension;
+import org.eclipse.edc.junit.extensions.RuntimePerClassExtension;
+import org.eclipse.edc.spi.security.Vault;
+import org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.testcontainers.containers.PostgreSQLContainer;
+
+public abstract class AbstractTest {
+    private static final List<String> PROTOCOLS_TO_TEST = List.of("dataspace-protocol-http", "dataspace-protocol-http:2024/1");
+    private static final LocalParticipant LOCAL_PARTICIPANT = LocalParticipant.Builder.newInstance().name("local").id("local").build();
+    private static final PostgreSQLContainer<?> PG = new PostgreSQLContainer<>("postgres:16.4").withUsername("postgres").withPassword("password").withCreateContainerCmdModifier(cmd -> cmd.withName("postgres"));
+    protected static List<Participant> participants = new ArrayList<>();
+    protected ClientAndServer providerDataSource = startClientAndServer(getFreePort());
+
+    @BeforeAll
+    static void beforeAll() {
+
+    }
+
+    @BeforeEach
+    void beforeEach() {
+        providerDataSource.reset();
+    }
+
+    @Order(0)
+    @RegisterExtension
+    static final BeforeAllCallback INIT_DATABASE = context -> {
+        PG.setPortBindings(List.of("5432:5432"));
+        PG.start();
+        createDatabase(LOCAL_PARTICIPANT.getName());
+
+    };
+    @Order(1)
+    @RegisterExtension
+    static final BeforeAllCallback INIT_CONTAINERS = context -> {
+        participants.addAll(List.of(LOCAL_PARTICIPANT, createRemoteParticipant(EdcDockerRuntimes.STABLE_CONNECTOR, PostgresqlEndToEndInstance::createDatabase),
+                createRemoteParticipant(EdcDockerRuntimes.STABLE_CONNECTOR_0_10_0, PostgresqlEndToEndInstance::createDatabase)));
+    };
+
+    private static RemoteParticipant createRemoteParticipant(EdcDockerRuntimes runtime, Consumer<String> callback) {
+        var remoteParticipant = RemoteParticipant.Builder.newInstance().name(runtime.name().toLowerCase()).id(runtime.name().toLowerCase()).build();
+        callback.accept(runtime.name().toLowerCase());
+        runtime.start(remoteParticipant.controlPlaneEnv(), remoteParticipant.dataPlaneEnv());
+        return remoteParticipant;
+    }
+
+    @Order(2)
+    @RegisterExtension
+    protected static final RuntimeExtension LOCAL_CONTROL_PLANE = new RuntimePerClassExtension(Runtimes.CONTROL_PLANE.create("local-control-plane", LOCAL_PARTICIPANT.controlPlanePostgresConfiguration()));
+
+    @Order(3)
+    @RegisterExtension
+    protected static final RuntimeExtension LOCAL_DATA_PLANE = new RuntimePerClassExtension(Runtimes.DATA_PLANE.create("local-data-plane", LOCAL_PARTICIPANT.dataPlanePostgresConfiguration()));
+
+
+    @BeforeAll
+    static void storeKeys() {
+        var vault = LOCAL_DATA_PLANE.getService(Vault.class);
+        vault.storeSecret("private-key", LOCAL_PARTICIPANT.getPrivateKey());
+        vault.storeSecret("public-key", LOCAL_PARTICIPANT.getPublicKey());
+    }
+
+    protected void initialise(BaseParticipant consumer, BaseParticipant provider) {
+        initialise(consumer, provider, "dataspace-protocol-http");
+    }
+
+    protected void initialise(BaseParticipant consumer, BaseParticipant provider, String protocol) {
+        provider.setProtocol(protocol);
+        consumer.setProtocol(protocol);
+        provider.waitForDataPlane();
+        providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response("/source").withBody("data"));
+    }
+
+    protected Map<String, Object> createDataAddress(ClientAndServer server, String dataAddressPath) {
+        return Map.of(EDC_NAMESPACE + "name", "testing", EDC_NAMESPACE + "baseUrl", "http://localhost:" + server.getPort() + dataAddressPath, EDC_NAMESPACE + "type", "HttpData", EDC_NAMESPACE + "proxyQueryParams", "true");
+    }
+
+    protected static class ParticipantsArgProvider implements ArgumentsProvider {
+        @Override
+        public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
+            return createArgumentMatrix().stream();
+        }
+
+        private List<Arguments> createArgumentMatrix() {
+            List<Arguments> testArguments = new ArrayList<>();
+            for (int i = 0; i < participants.size(); i++) {
+                for (int j = i + 1; j < participants.size(); j++) {
+                    for (String protocol : PROTOCOLS_TO_TEST) {
+                        testArguments.add(Arguments.of(participants.get(i), participants.get(j), protocol));
+                        testArguments.add(Arguments.of(participants.get(j), participants.get(i), protocol));
+                    }
+                }
+            }
+            return testArguments;
+        }
+    }
+
+
+}
diff --git a/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/BaseParticipant.java b/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/BaseParticipant.java
index c2fbea4..b74f2f5 100644
--- a/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/BaseParticipant.java
+++ b/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/BaseParticipant.java
@@ -14,15 +14,10 @@
 
 package org.eclipse.edc.compatibility.tests.fixtures;
 
-import io.restassured.common.mapper.TypeRef;
-import org.assertj.core.api.ThrowingConsumer;
-import org.eclipse.edc.connector.controlplane.test.system.utils.Participant;
-import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates;
-import org.eclipse.edc.spi.types.domain.DataAddress;
-
 import java.net.URI;
 import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 
 import static io.restassured.RestAssured.given;
 import static io.restassured.http.ContentType.JSON;
@@ -31,6 +26,16 @@
 import static org.eclipse.edc.junit.testfixtures.TestUtils.getResourceFileContentAsString;
 import static org.eclipse.edc.util.io.Ports.getFreePort;
 
+import io.restassured.common.mapper.TypeRef;
+import io.restassured.http.ContentType;
+import jakarta.json.Json;
+import jakarta.json.JsonObject;
+import jakarta.json.JsonObjectBuilder;
+import org.assertj.core.api.ThrowingConsumer;
+import org.eclipse.edc.connector.controlplane.test.system.utils.Participant;
+import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates;
+import org.eclipse.edc.spi.types.domain.DataAddress;
+
 public abstract class BaseParticipant extends Participant {
 
     protected static String privateKey = getResourceFileContentAsString("certs/key.pem");
@@ -43,6 +48,7 @@ public abstract class BaseParticipant extends Participant {
     protected final URI dataPlanePublic = URI.create("http://localhost:" + getFreePort() + "/public");
     protected final URI controlPlaneVersion = URI.create("http://localhost:" + getFreePort() + "/version");
     protected final URI dataPlaneVersion = URI.create("http://localhost:" + getFreePort() + "/version");
+    protected final int httpProvisionerPort = getFreePort();
 
     public String getPrivateKey() {
         return privateKey;
@@ -60,33 +66,18 @@ public String getPublicKey() {
      * @param bodyAssertion assertion to be verified on the body
      */
     public void pullData(DataAddress edr, Map<String, String> queryParams, ThrowingConsumer<String> bodyAssertion) {
-        var data = given()
-                .baseUri(edr.getStringProperty("endpoint"))
-                .header("Authorization", edr.getStringProperty("authorization"))
-                .queryParams(queryParams)
-                .when()
-                .get()
-                .then()
-                .log().ifError()
-                .statusCode(200)
-                .extract().body().asString();
+        var data = given().baseUri(edr.getStringProperty("endpoint")).header("Authorization", edr.getStringProperty("authorization")).queryParams(queryParams).when().get().then().log().ifError().statusCode(200).extract().body().asString();
 
         assertThat(data).satisfies(bodyAssertion);
     }
 
     public void waitForDataPlane() {
-        await().atMost(timeout)
-                .untilAsserted(() -> {
-                    var jp = baseManagementRequest()
-                            .get("/v3/dataplanes")
-                            .then()
-                            .statusCode(200)
-                            .log().ifValidationFails()
-                            .extract().body().jsonPath();
-
-                    var state = jp.getString("state");
-                    assertThat(state).isEqualTo("[AVAILABLE]");
-                });
+        await().atMost(timeout).untilAsserted(() -> {
+            var jp = baseManagementRequest().get("/v3/dataplanes").then().statusCode(200).log().ifValidationFails().extract().body().jsonPath();
+
+            var state = jp.getString("state");
+            assertThat(state).isEqualTo("[AVAILABLE]");
+        });
 
     }
 
@@ -94,19 +85,12 @@ public void waitForDataPlane() {
      * Get the EDR from the EDR cache by transfer process id.
      *
      * @param transferProcessId The transfer process id
+     *
      * @return The cached {@link DataAddress}
      */
     public DataAddress getEdr(String transferProcessId) {
-        var dataAddressRaw = baseManagementRequest()
-                .contentType(JSON)
-                .when()
-                .get("/v3/edrs/{id}/dataaddress", transferProcessId)
-                .then()
-                .log().ifError()
-                .statusCode(200)
-                .contentType(JSON)
-                .extract().body().as(new TypeRef<Map<String, Object>>() {
-                });
+        var dataAddressRaw = baseManagementRequest().contentType(JSON).when().get("/v3/edrs/{id}/dataaddress", transferProcessId).then().log().ifError().statusCode(200).contentType(JSON).extract().body().as(new TypeRef<Map<String, Object>>() {
+        });
 
 
         var builder = DataAddress.Builder.newInstance();
@@ -116,10 +100,38 @@ public DataAddress getEdr(String transferProcessId) {
     }
 
     public void awaitTransferToBeInState(String transferProcessId, TransferProcessStates state) {
-        await().atMost(timeout).until(
-                () -> getTransferProcessState(transferProcessId),
-                it -> Objects.equals(it, state.name())
-        );
+        await().atMost(timeout).until(() -> getTransferProcessState(transferProcessId), it -> Objects.equals(it, state.name()));
+    }
+
+    public void deprovisionTransfer(String transferProcessId) {
+        JsonObjectBuilder requestBodyBuilder = Json.createObjectBuilder().add("@context", Json.createObjectBuilder().add("@vocab", "https://w3id.org/edc/v0.0.1/ns/")).add("@type", "DeprovisionTransfer").add("reason", "any reason");
+        this.baseManagementRequest().contentType(ContentType.JSON).body(requestBodyBuilder.build()).when().post("/v3/transferprocesses/{id}/deprovision", new Object[] {transferProcessId}).then().log().ifError().statusCode(204);
+    }
+
+    public void terminateContractNegotiation(String contractNegotiationId) {
+        JsonObjectBuilder requestBodyBuilder =
+                Json.createObjectBuilder().add("@context", Json.createObjectBuilder().add("@vocab", "https://w3id.org/edc/v0.0.1/ns/")).add("@type", "TerminateNegotiation").add("@id", contractNegotiationId).add("reason", "any reason");
+        this.baseManagementRequest().contentType(ContentType.JSON).body(requestBodyBuilder.build()).when().post("/v3/contractnegotiations/{id}/terminate", new Object[] {contractNegotiationId}).then().log().ifError().statusCode(204);
+    }
+
+    public String createResource(Map<String, Object> dataAddress, JsonObject policy) {
+        String assetId = UUID.randomUUID().toString();
+        this.createAsset(assetId, Map.of("description", "description"), dataAddress);
+        var contractPolicyId = this.createPolicyDefinition(policy);
+        var noConstraintPolicyId = this.createPolicyDefinition(policy);
+        this.createContractDefinition(assetId, UUID.randomUUID().toString(), noConstraintPolicyId, contractPolicyId);
+        return assetId;
+    }
+
+    public JsonObject getOfferForAsset(Participant provider, String assetId) {
+        JsonObject dataset = this.getDatasetForAsset(provider, assetId);
+        JsonObject policy = dataset.getJsonArray("http://www.w3.org/ns/odrl/2/hasPolicy").get(0).asJsonObject();
+        return Json.createObjectBuilder(policy).add("http://www.w3.org/ns/odrl/2/assigner", Json.createObjectBuilder().add("@id", provider.getId())).add("http://www.w3.org/ns/odrl/2/target", Json.createObjectBuilder().add("@id", dataset.get("@id")))
+                .build();
+    }
+
+    public String getContractNegotiationField(String negotiationId, String fieldName) {
+        return this.baseManagementRequest().contentType(ContentType.JSON).when().get("/v3/contractnegotiations/{id}", new Object[] {negotiationId}).then().statusCode(200).extract().body().jsonPath().getString(fieldName);
     }
 
     public static class Builder<P extends BaseParticipant, B extends Participant.Builder<P, B>> extends Participant.Builder<P, B> {
diff --git a/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/EdcDockerRuntimes.java b/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/EdcDockerRuntimes.java
index 3790958..96335eb 100644
--- a/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/EdcDockerRuntimes.java
+++ b/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/EdcDockerRuntimes.java
@@ -14,32 +14,31 @@
 
 package org.eclipse.edc.compatibility.tests.fixtures;
 
+import java.util.Map;
+
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.wait.strategy.Wait;
-
-import java.util.Map;
+import org.yaml.snakeyaml.util.Tuple;
 
 public enum EdcDockerRuntimes {
+    STABLE_CONNECTOR_0_10_0("controlplane-010:latest", "dataplane-010:latest"),
 
-    CONTROL_PLANE(
-            "controlplane-stable:latest"
-    ),
-
-    DATA_PLANE(
-            "dataplane-stable:latest"
-    );
+    STABLE_CONNECTOR("controlplane-stable:latest", "dataplane-stable:latest");
 
-    private final String image;
+    private final String controlPlaneImage;
+    private final String dataPlaneImage;
 
-    EdcDockerRuntimes(String image) {
-        this.image = image;
+    EdcDockerRuntimes(String controlPlaneImage, String dataPlaneImage) {
+        this.controlPlaneImage = controlPlaneImage;
+        this.dataPlaneImage = dataPlaneImage;
     }
 
-    public GenericContainer<?> create(String name, Map<String, String> env) {
-        return new GenericContainer<>(image)
-                .withCreateContainerCmdModifier(cmd -> cmd.withName(name))
-                .withNetworkMode("host")
-                .waitingFor(Wait.forLogMessage(".*Runtime .* ready.*", 1))
-                .withEnv(env);
+    public Tuple<GenericContainer<?>, GenericContainer<?>> start(Map<String, String> controlPlaneEnv, Map<String, String> dataPlaneEnv) {
+        var controlPlane =
+                new GenericContainer<>(controlPlaneImage).withCreateContainerCmdModifier(cmd -> cmd.withName(this.name() + "-controlplane")).withNetworkMode("host").waitingFor(Wait.forLogMessage(".*Runtime .* ready.*", 1)).withEnv(controlPlaneEnv);
+        var dataPlane = new GenericContainer<>(dataPlaneImage).withCreateContainerCmdModifier(cmd -> cmd.withName(this.name() + "-dataplane")).withNetworkMode("host").waitingFor(Wait.forLogMessage(".*Runtime .* ready.*", 1)).withEnv(dataPlaneEnv);
+        controlPlane.start();
+        dataPlane.start();
+        return new Tuple<>(controlPlane, dataPlane);
     }
 }
diff --git a/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/LocalParticipant.java b/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/LocalParticipant.java
index b7d6102..89fa718 100644
--- a/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/LocalParticipant.java
+++ b/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/LocalParticipant.java
@@ -19,26 +19,24 @@
 
 import static org.eclipse.edc.boot.BootServicesExtension.PARTICIPANT_ID;
 import static org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance.defaultDatasourceConfiguration;
-import static org.eclipse.edc.util.io.Ports.getFreePort;
 
 public class LocalParticipant extends BaseParticipant {
 
     private static final String API_KEY = "password";
 
-    private final int httpProvisionerPort = getFreePort();
-
     public Map<String, String> controlPlaneConfiguration() {
         return new HashMap<>() {
             {
                 put(PARTICIPANT_ID, id);
+                put("web.http.management.auth.key", API_KEY);
+                put("web.http.management.auth.type", "tokenbased");
+                put("web.http.management.auth.context", "management-api");
                 put("web.http.port", String.valueOf(controlPlaneDefault.getPort()));
                 put("web.http.path", "/api");
                 put("web.http.protocol.port", String.valueOf(controlPlaneProtocol.get().getPort()));
                 put("web.http.protocol.path", controlPlaneProtocol.get().getPath());
                 put("web.http.management.port", String.valueOf(controlPlaneManagement.get().getPort()));
                 put("web.http.management.path", controlPlaneManagement.get().getPath());
-                put("web.http.management.auth.type", "tokenbased");
-                put("web.http.management.auth.key", API_KEY);
                 put("web.http.version.port", String.valueOf(controlPlaneVersion.getPort()));
                 put("web.http.version.path", controlPlaneVersion.getPath());
                 put("web.http.control.port", String.valueOf(controlPlaneControl.getPort()));
diff --git a/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/RemoteParticipant.java b/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/RemoteParticipant.java
index 01c7d52..b2d3f2b 100644
--- a/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/RemoteParticipant.java
+++ b/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/RemoteParticipant.java
@@ -37,6 +37,7 @@ public Map<String, String> controlPlaneEnv() {
                 put("WEB_HTTP_CONTROL_PORT", String.valueOf(controlPlaneControl.getPort()));
                 put("WEB_HTTP_CONTROL_PATH", controlPlaneControl.getPath());
                 put("EDC_DSP_CALLBACK_ADDRESS", controlPlaneProtocol.get().toString());
+                put("EDC_TRANSFER_PROXY_ENDPOINT", dataPlanePublic.toString());
                 put("EDC_DATASOURCE_DEFAULT_URL", "jdbc:postgresql://localhost:5432/%s".formatted(getId()));
                 put("EDC_DATASOURCE_DEFAULT_USER", "postgres");
                 put("EDC_DATASOURCE_DEFAULT_PASSWORD", "password");
@@ -48,13 +49,18 @@ public Map<String, String> controlPlaneEnv() {
     public Map<String, String> dataPlaneEnv() {
         return new HashMap<>() {
             {
+                put("WEB_HTTP_MANAGEMENT_AUTH_KEY", API_KEY);
+                put("WEB_HTTP_MANAGEMENT_AUTH_TYPE", "tokenbased");
+                put("WEB_HTTP_MANAGEMENT_AUTH_CONTEXT", "management-api");
                 put("EDC_PARTICIPANT_ID", id);
                 put("EDC_COMPONENT_ID", id);
-                put("EDC_API_AUTH_KEY", API_KEY);
                 put("WEB_HTTP_PORT", String.valueOf(dataPlaneDefault.getPort()));
                 put("WEB_HTTP_PATH", "/api");
                 put("WEB_HTTP_VERSION_PORT", String.valueOf(dataPlaneVersion.getPort()));
                 put("WEB_HTTP_VERSION_PATH", dataPlaneVersion.getPath());
+                put("WEB_HTTP_PUBLIC_PORT", String.valueOf(dataPlanePublic.getPort()));
+                put("WEB_HTTP_PUBLIC_PATH", "/public");
+                put("EDC_DATAPLANE_API_PUBLIC_BASEURL", dataPlanePublic + "/v2/");
                 put("WEB_HTTP_CONTROL_PORT", String.valueOf(dataPlaneControl.getPort()));
                 put("WEB_HTTP_CONTROL_PATH", dataPlaneControl.getPath());
                 put("EDC_DATASOURCE_DEFAULT_URL", "jdbc:postgresql://localhost:5432/%s".formatted(getId()));
diff --git a/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/Runtimes.java b/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/Runtimes.java
index 2357c3c..98bceab 100644
--- a/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/Runtimes.java
+++ b/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/fixtures/Runtimes.java
@@ -14,21 +14,17 @@
 
 package org.eclipse.edc.compatibility.tests.fixtures;
 
-import org.eclipse.edc.junit.extensions.ClasspathReader;
-import org.eclipse.edc.junit.extensions.EmbeddedRuntime;
-
 import java.net.URL;
 import java.util.Map;
 
+import org.eclipse.edc.junit.extensions.ClasspathReader;
+import org.eclipse.edc.junit.extensions.EmbeddedRuntime;
+
 public enum Runtimes {
 
-    CONTROL_PLANE(
-            ":runtimes:snapshot:controlplane-snapshot"
-    ),
+    CONTROL_PLANE(":runtimes:snapshot:controlplane-snapshot"),
 
-    DATA_PLANE(
-            ":runtimes:snapshot:dataplane-snapshot"
-    );
+    DATA_PLANE(":runtimes:snapshot:dataplane-snapshot");
 
     private final String[] modules;
     private URL[] classpathEntries;
diff --git a/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/transfer/TransferEndToEndTest.java b/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/transfer/TransferEndToEndTest.java
index 03261f4..8a60c49 100644
--- a/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/transfer/TransferEndToEndTest.java
+++ b/tests/compatibility-tests/src/test/java/org/eclipse/edc/compatibility/tests/transfer/TransferEndToEndTest.java
@@ -14,169 +14,83 @@
 
 package org.eclipse.edc.compatibility.tests.transfer;
 
-import jakarta.json.JsonObject;
-import org.eclipse.edc.compatibility.tests.fixtures.BaseParticipant;
-import org.eclipse.edc.compatibility.tests.fixtures.EdcDockerRuntimes;
-import org.eclipse.edc.compatibility.tests.fixtures.LocalParticipant;
-import org.eclipse.edc.compatibility.tests.fixtures.RemoteParticipant;
-import org.eclipse.edc.compatibility.tests.fixtures.Runtimes;
-import org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures;
-import org.eclipse.edc.junit.annotations.EndToEndTest;
-import org.eclipse.edc.junit.extensions.RuntimeExtension;
-import org.eclipse.edc.junit.extensions.RuntimePerClassExtension;
-import org.eclipse.edc.spi.security.Vault;
-import org.jetbrains.annotations.NotNull;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Order;
-import org.junit.jupiter.api.extension.BeforeAllCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.ArgumentsProvider;
-import org.junit.jupiter.params.provider.ArgumentsSource;
-import org.mockserver.integration.ClientAndServer;
-import org.mockserver.model.HttpRequest;
-import org.mockserver.model.HttpResponse;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.PostgreSQLContainer;
-
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
-import java.util.stream.Stream;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.awaitility.Awaitility.await;
-import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.noConstraintPolicy;
+import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.DEPROVISIONED;
 import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTED;
 import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDED;
+import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE;
 import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE;
-import static org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance.createDatabase;
 import static org.eclipse.edc.util.io.Ports.getFreePort;
 import static org.mockserver.integration.ClientAndServer.startClientAndServer;
-
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import jakarta.json.Json;
+import jakarta.json.JsonArrayBuilder;
+import jakarta.json.JsonObject;
+import org.eclipse.edc.compatibility.tests.AbstractTest;
+import org.eclipse.edc.compatibility.tests.fixtures.BaseParticipant;
+import org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures;
+import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessStarted;
+import org.eclipse.edc.junit.annotations.EndToEndTest;
+import org.eclipse.edc.spi.event.EventEnvelope;
+import org.eclipse.edc.spi.types.domain.DataAddress;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ArgumentsSource;
+import org.mockserver.model.HttpRequest;
+import org.mockserver.model.HttpResponse;
+import org.mockserver.model.HttpStatusCode;
+import org.mockserver.model.MediaType;
 
 @EndToEndTest
-public class TransferEndToEndTest {
-
-    protected static final LocalParticipant LOCAL_PARTICIPANT = LocalParticipant.Builder.newInstance()
-            .name("local")
-            .id("local")
-            .build();
-
-    protected static final RemoteParticipant REMOTE_PARTICIPANT = RemoteParticipant.Builder.newInstance()
-            .name("remote")
-            .id("remote")
-            .build();
-
-    @Order(1)
-    @RegisterExtension
-    static final RuntimeExtension LOCAL_CONTROL_PLANE = new RuntimePerClassExtension(
-            Runtimes.CONTROL_PLANE.create("local-control-plane", LOCAL_PARTICIPANT.controlPlanePostgresConfiguration()));
-
-    @Order(2)
-    @RegisterExtension
-    static final RuntimeExtension LOCAL_DATA_PLANE = new RuntimePerClassExtension(
-            Runtimes.DATA_PLANE.create("local-data-plane", LOCAL_PARTICIPANT.dataPlanePostgresConfiguration()));
-
-
-    private static final GenericContainer<?> CONTROL_PLANE = EdcDockerRuntimes.CONTROL_PLANE.create("controlplane", REMOTE_PARTICIPANT.controlPlaneEnv());
-
-    private static final GenericContainer<?> DATA_PLANE = EdcDockerRuntimes.DATA_PLANE.create("dataplane", REMOTE_PARTICIPANT.dataPlaneEnv());
-
-    private static final PostgreSQLContainer<?> PG = new PostgreSQLContainer<>("postgres:16.4")
-            .withUsername("postgres")
-            .withPassword("password")
-            .withCreateContainerCmdModifier(cmd -> cmd.withName("postgres"));
-
-    @Order(0)
-    @RegisterExtension
-    static final BeforeAllCallback CREATE_DATABASES = context -> {
-        PG.setPortBindings(List.of("5432:5432"));
-        PG.start();
-        createDatabase(LOCAL_PARTICIPANT.getName());
-        createDatabase(REMOTE_PARTICIPANT.getName());
-    };
-
-    private static ClientAndServer providerDataSource;
-
-    @BeforeAll
-    static void beforeAll() {
-        CONTROL_PLANE.start();
-        DATA_PLANE.start();
-        providerDataSource = startClientAndServer(getFreePort());
-    }
-
-    private static @NotNull Map<String, Object> httpSourceDataAddress() {
-        return Map.of(
-                EDC_NAMESPACE + "name", "transfer-test",
-                EDC_NAMESPACE + "baseUrl", "http://localhost:" + providerDataSource.getPort() + "/source",
-                EDC_NAMESPACE + "type", "HttpData",
-                EDC_NAMESPACE + "proxyQueryParams", "true"
-        );
-    }
-
-    @BeforeEach
-    void storeKeys() {
-        var vault = LOCAL_DATA_PLANE.getService(Vault.class);
-        vault.storeSecret("private-key", LOCAL_PARTICIPANT.getPrivateKey());
-        vault.storeSecret("public-key", LOCAL_PARTICIPANT.getPublicKey());
-    }
+class TransferEndToEndTest extends AbstractTest {
+    private static final ObjectMapper MAPPER = new ObjectMapper();
 
     @ParameterizedTest
     @ArgumentsSource(ParticipantsArgProvider.class)
-    void httpPullTransfer(BaseParticipant consumer, BaseParticipant provider, String protocol) {
-        consumer.setProtocol(protocol);
-        provider.setProtocol(protocol);
-        provider.waitForDataPlane();
-        providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data"));
-        var assetId = UUID.randomUUID().toString();
-        var sourceDataAddress = httpSourceDataAddress();
-        createResourcesOnProvider(provider, assetId, PolicyFixtures.contractExpiresIn("5s"), sourceDataAddress);
-
-        var transferProcessId = consumer.requestAssetFrom(assetId, provider)
-                .withTransferType("HttpData-PULL")
-                .execute();
+    void httpPullTransferWithExpiry(BaseParticipant consumer, BaseParticipant provider, String protocol) {
+        initialise(consumer, provider, protocol);
+        var assetId = provider.createResource(createDataAddress(providerDataSource, "/source"), PolicyFixtures.contractExpiresIn("5s"));
+        var transferProcessId = consumer.requestAssetFrom(assetId, provider).withTransferType("HttpData-PULL").execute();
 
         consumer.awaitTransferToBeInState(transferProcessId, STARTED);
-
-        var edr = await().atMost(consumer.getTimeout())
-                .until(() -> consumer.getEdr(transferProcessId), Objects::nonNull);
+        var edr = await().atMost(consumer.getTimeout()).until(() -> consumer.getEdr(transferProcessId), Objects::nonNull);
 
         // Do the transfer
         var msg = UUID.randomUUID().toString();
-        await().atMost(consumer.getTimeout())
-                .untilAsserted(() -> consumer.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));
+        await().atMost(consumer.getTimeout()).untilAsserted(() -> consumer.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));
 
         // checks that the EDR is gone once the contract expires
-        await().atMost(consumer.getTimeout())
-                .untilAsserted(() -> assertThatThrownBy(() -> consumer.getEdr(transferProcessId)));
+        await().atMost(consumer.getTimeout()).untilAsserted(() -> assertThatThrownBy(() -> consumer.getEdr(transferProcessId)));
 
         // checks that transfer fails
-        await().atMost(consumer.getTimeout())
-                .untilAsserted(() -> assertThatThrownBy(() -> consumer.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))));
+        await().atMost(consumer.getTimeout()).untilAsserted(() -> assertThatThrownBy(() -> consumer.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))));
 
         providerDataSource.verify(HttpRequest.request("/source").withMethod("GET"));
 
     }
 
-    @ParameterizedTest
+    //@ParameterizedTest
     @ArgumentsSource(ParticipantsArgProvider.class)
     void suspendAndResume_httpPull_dataTransfer(BaseParticipant consumer, BaseParticipant provider, String protocol) {
-        consumer.setProtocol(protocol);
-        provider.setProtocol(protocol);
-        provider.waitForDataPlane();
-        providerDataSource.when(HttpRequest.request()).respond(HttpResponse.response().withBody("data"));
-        var assetId = UUID.randomUUID().toString();
-        createResourcesOnProvider(provider, assetId, PolicyFixtures.noConstraintPolicy(), httpSourceDataAddress());
+        initialise(consumer, provider, protocol);
+        Map<String, Object> dataAddress = createDataAddress(providerDataSource, "/source");
+        var assetId = provider.createResource(dataAddress, PolicyFixtures.noConstraintPolicy());
 
-        var transferProcessId = consumer.requestAssetFrom(assetId, provider)
-                .withTransferType("HttpData-PULL")
-                .execute();
+        var transferProcessId = consumer.requestAssetFrom(assetId, provider).withTransferType("HttpData-PULL").execute();
 
         consumer.awaitTransferToBeInState(transferProcessId, STARTED);
 
@@ -205,24 +119,111 @@ void suspendAndResume_httpPull_dataTransfer(BaseParticipant consumer, BasePartic
         providerDataSource.verify(HttpRequest.request("/source").withMethod("GET"));
     }
 
-    protected void createResourcesOnProvider(BaseParticipant provider, String assetId, JsonObject contractPolicy, Map<String, Object> dataAddressProperties) {
-        provider.createAsset(assetId, Map.of("description", "description"), dataAddressProperties);
-        var contractPolicyId = provider.createPolicyDefinition(contractPolicy);
-        var noConstraintPolicyId = provider.createPolicyDefinition(noConstraintPolicy());
+    //@ParameterizedTest
+    @ArgumentsSource(ParticipantsArgProvider.class)
+    void suspendAndResumeByProvider_httpPull_dataTransfer(BaseParticipant consumer, BaseParticipant provider, String protocol) {
+        initialise(consumer, provider, protocol);
+        Map<String, Object> dataAddress = createDataAddress(providerDataSource, "/source");
+        var assetId = provider.createResource(dataAddress, PolicyFixtures.noConstraintPolicy());
+        var consumerTransferProcessId = consumer.requestAssetFrom(assetId, provider).withTransferType("HttpData-PULL").execute();
+        consumer.awaitTransferToBeInState(consumerTransferProcessId, STARTED);
+        var edr = await().atMost(consumer.getTimeout()).until(() -> consumer.getEdr(consumerTransferProcessId), Objects::nonNull);
+        var msg = UUID.randomUUID().toString();
+        await().atMost(consumer.getTimeout()).untilAsserted(() -> consumer.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));
+        var providerTransferProcessId =
+                provider.getTransferProcesses().stream().filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId)).map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow();
+        provider.suspendTransfer(providerTransferProcessId, "suspension");
+        provider.awaitTransferToBeInState(providerTransferProcessId, SUSPENDED);
+        // checks that the EDR is gone once the transfer has been suspended
+        await().atMost(consumer.getTimeout()).untilAsserted(() -> assertThatThrownBy(() -> consumer.getEdr(consumerTransferProcessId)));
+        // checks that transfer fails
+        await().atMost(consumer.getTimeout()).untilAsserted(() -> assertThatThrownBy(() -> consumer.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))));
+        provider.resumeTransfer(providerTransferProcessId);
+        // check that transfer is available again
+        provider.awaitTransferToBeInState(providerTransferProcessId, STARTED);
+        var secondEdr = await().atMost(consumer.getTimeout()).until(() -> consumer.getEdr(consumerTransferProcessId), Objects::nonNull);
+        var secondMessage = UUID.randomUUID().toString();
+        await().atMost(consumer.getTimeout()).untilAsserted(() -> consumer.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data")));
+        providerDataSource.verify(HttpRequest.request("/source").withMethod("GET"));
+    }
+
+
+    @ParameterizedTest
+    @ArgumentsSource(ParticipantsArgProvider.class)
+    void terminateTransferProcess(BaseParticipant consumer, BaseParticipant provider, String protocol) {
+        initialise(consumer, provider, protocol);
+        Map<String, Object> dataAddress = createDataAddress(providerDataSource, "/source");
+        var assetId = provider.createResource(dataAddress, PolicyFixtures.noConstraintPolicy());
+        String transferProcessId = consumer.requestAssetFrom(assetId, provider).withTransferType("HttpData-PULL").execute();
+        consumer.awaitTransferToBeInState(transferProcessId, STARTED);
+        DataAddress edr = await().atMost(consumer.getTimeout()).until(() -> consumer.getEdr(transferProcessId), Objects::nonNull);
+        await().atMost(consumer.getTimeout()).untilAsserted(() -> consumer.pullData(edr, Map.of(), body -> assertThat(body).isEqualTo("data")));
+        var providerTransferProcessId = provider.getTransferProcesses().stream().filter(filter -> filter.asJsonObject().getString("correlationId").equals(transferProcessId)).map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow();
+        provider.terminateTransfer(providerTransferProcessId);
+        provider.awaitTransferToBeInState(providerTransferProcessId, DEPROVISIONED);
+        // checks that the EDR is gone once the transfer has been terminated
+        await().atMost(consumer.getTimeout()).untilAsserted(() -> assertThatThrownBy(() -> consumer.getEdr(transferProcessId)));
+        await().atMost(consumer.getTimeout()).untilAsserted(() -> assertThatThrownBy(() -> consumer.pullData(edr, Map.of(), body -> assertThat(body).isEqualTo("data"))));
+        providerDataSource.verify(HttpRequest.request("/source").withMethod("GET"));
+    }
+
+    //@ParameterizedTest
+    @ArgumentsSource(ParticipantsArgProvider.class)
+    void deprovisionShouldFailOnceTransferProcessHasStarted(BaseParticipant consumer, BaseParticipant provider, String protocol) {
+        initialise(consumer, provider, protocol);
+        Map<String, Object> dataAddress = createDataAddress(providerDataSource, "/source");
+        var assetId = provider.createResource(dataAddress, PolicyFixtures.noConstraintPolicy());
+        String transferProcessId = consumer.requestAssetFrom(assetId, provider).withTransferType("HttpData-PULL").execute();
+        consumer.awaitTransferToBeInState(transferProcessId, STARTED);
+        assertThatThrownBy(() -> consumer.deprovisionTransfer(transferProcessId)).hasMessageContaining("Expected status code <204> but was <409>");
+        DataAddress edr = await().atMost(consumer.getTimeout()).until(() -> consumer.getEdr(transferProcessId), Objects::nonNull);
+        await().atMost(consumer.getTimeout()).untilAsserted(() -> consumer.pullData(edr, Map.of(), body -> assertThat(body).isEqualTo("data")));
+        providerDataSource.verify(HttpRequest.request("/source").withMethod("GET"));
+    }
+
+    //@ParameterizedTest
+    @ArgumentsSource(ParticipantsArgProvider.class)
+    void httpPull_dataTransfer_withCallbacks(BaseParticipant consumer, BaseParticipant provider, String protocol) {
+        initialise(consumer, provider, protocol);
+        Map<String, Object> dataAddress = createDataAddress(providerDataSource, "/source");
+        var callbacksEndpoint = startClientAndServer(getFreePort());
+        var assetId = provider.createResource(dataAddress, PolicyFixtures.noConstraintPolicy());
+        var callbackUrl = String.format("http://localhost:%d/hooks", callbacksEndpoint.getLocalPort());
+        var callbacks = Json.createArrayBuilder().add(createCallback(callbackUrl, true, Set.of("transfer.process.started"))).build();
+
+        var request = request().withPath("/hooks").withMethod(HttpMethod.POST.name());
 
-        provider.createContractDefinition(assetId, UUID.randomUUID().toString(), noConstraintPolicyId, contractPolicyId);
+        var events = new ConcurrentHashMap<String, TransferProcessStarted>();
+
+        callbacksEndpoint.when(request).respond(req -> this.cacheEdr(req, events));
+
+        var transferProcessId = consumer.requestAssetFrom(assetId, provider).withTransferType("HttpData-PULL").withCallbacks(callbacks).execute();
+
+        consumer.awaitTransferToBeInState(transferProcessId, STARTED);
+
+        await().atMost(consumer.getTimeout()).untilAsserted(() -> assertThat(events.get(transferProcessId)).isNotNull());
+        var event = events.get(transferProcessId);
+        var msg = UUID.randomUUID().toString();
+        await().atMost(consumer.getTimeout()).untilAsserted(() -> consumer.pullData(event.getDataAddress(), Map.of("message", msg), body -> assertThat(body).isEqualTo("data")));
+
+        providerDataSource.verify(request("/source").withMethod("GET"));
     }
 
-    private static class ParticipantsArgProvider implements ArgumentsProvider {
-        @Override
-        public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {
-            return Stream.of(
-                    Arguments.of(REMOTE_PARTICIPANT, LOCAL_PARTICIPANT, "dataspace-protocol-http"),
-                    Arguments.of(LOCAL_PARTICIPANT, REMOTE_PARTICIPANT, "dataspace-protocol-http"),
-                    Arguments.of(REMOTE_PARTICIPANT, LOCAL_PARTICIPANT, "dataspace-protocol-http:2024/1"),
-                    Arguments.of(LOCAL_PARTICIPANT, REMOTE_PARTICIPANT, "dataspace-protocol-http:2024/1")
-            );
-        }
+    private JsonObject createCallback(String url, boolean transactional, Set<String> events) {
+        return Json.createObjectBuilder().add(TYPE, EDC_NAMESPACE + "CallbackAddress").add(EDC_NAMESPACE + "transactional", transactional).add(EDC_NAMESPACE + "uri", url)
+                .add(EDC_NAMESPACE + "events", events.stream().collect(Json::createArrayBuilder, JsonArrayBuilder::add, JsonArrayBuilder::add).build()).build();
     }
 
+    private HttpResponse cacheEdr(HttpRequest request, Map<String, TransferProcessStarted> events) {
+
+        try {
+            var event = MAPPER.readValue(request.getBody().toString(), new TypeReference<EventEnvelope<TransferProcessStarted>>() {
+            });
+            events.put(event.getPayload().getTransferProcessId(), event.getPayload());
+            return response().withStatusCode(HttpStatusCode.OK_200.code()).withHeader(HttpHeaderNames.CONTENT_TYPE.toString(), MediaType.PLAIN_TEXT_UTF_8.toString()).withBody("{}");
+
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }