diff --git a/.github/workflows/funnel_msvc_tests.yml b/.github/workflows/funnel_msvc_tests.yml
new file mode 100644
index 0000000..514e8e3
--- /dev/null
+++ b/.github/workflows/funnel_msvc_tests.yml
@@ -0,0 +1,47 @@
+name: Funnel in µsvc tests
+
+on:
+ push:
+ branches: [ main ]
+ tags-ignore: [ "*-v*" ]
+ paths:
+ - "publisher/**"
+ pull_request:
+ branches: [ main ]
+ paths:
+ - "publisher/**"
+
+jobs:
+ setup-cluster-and-test:
+ name: Test Funnel on cluster with multiple scaled services
+ runs-on: ubuntu-latest
+ defaults:
+ run:
+ working-directory: ./publisher
+ env:
+ DOTNET_VERSION: 8.0.100-rc.1.23463.5
+ DOTNET_SKIP_FIRST_TIME_EXPERIENCE: 1
+ DOTNET_CLI_TELEMETRY_OPTOUT: 1
+ timeout-minutes: 20
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+ - name: Setup .NET SDK
+ uses: actions/setup-dotnet@v3
+ with:
+ dotnet-version: ${{ env.DOTNET_VERSION }}
+ - name: Setup Tilt
+ uses: yokawasa/action-setup-kube-tools@v0.9.3
+ with:
+ tilt: v0.33.6
+ setup-tools: |
+ tilt
+ - name: Create test cluster
+ uses: AbsaOSS/k3d-action@v2
+ with:
+ cluster-name: testapp
+ args: --config=publisher/funnel_test_cluster/k3d.yaml
+ - name: Make the script files executable
+ run: chmod +x funnel_test_cluster/subst_tpl_envs.sh
+ - name: Run Tilt CI with tests
+ run: tilt ci --context k3d-testapp -f funnel_test_cluster/Tiltfile
diff --git a/.gitignore b/.gitignore
index 853f347..4b0fc68 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
# JetBrains Rider
**/.idea/
+
+**/out/
diff --git a/publisher/Directory.Build.targets b/publisher/Directory.Build.targets
index 081e226..0bc4078 100644
--- a/publisher/Directory.Build.targets
+++ b/publisher/Directory.Build.targets
@@ -11,6 +11,7 @@
+
diff --git a/publisher/LeanCode.Pipe.sln b/publisher/LeanCode.Pipe.sln
index df1cee6..cb5fdb1 100644
--- a/publisher/LeanCode.Pipe.sln
+++ b/publisher/LeanCode.Pipe.sln
@@ -19,6 +19,22 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LeanCode.Pipe.TestClient.Te
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LeanCode.Pipe.IntegrationTests", "test\LeanCode.Pipe.IntegrationTests\LeanCode.Pipe.IntegrationTests.csproj", "{05F2BDCE-B093-4D4A-BC36-B7F6F12103AC}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LeanCode.Pipe.Funnel.TestApp1", "test\LeanCode.Pipe.Funnel.TestApp1\LeanCode.Pipe.Funnel.TestApp1.csproj", "{A95EBB5F-D7CD-48D4-9624-F43A486D0D68}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LeanCode.Pipe.Funnel.TestAppFunnel", "test\LeanCode.Pipe.Funnel.TestAppFunnel\LeanCode.Pipe.Funnel.TestAppFunnel.csproj", "{6D361069-9CD4-44DA-914A-80A62458B04F}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "funnel_msvc", "funnel_msvc", "{2A218F39-75C1-4EE1-8614-D67AB30574F0}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LeanCode.Pipe.Funnel.ScaledTargetServiceTests", "test\LeanCode.Pipe.Funnel.ScaledTargetServiceTests\LeanCode.Pipe.Funnel.ScaledTargetServiceTests.csproj", "{531D5FD5-9C72-4F24-B9E7-E53CFBF20B57}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LeanCode.Pipe.Funnel.TestApp2", "test\LeanCode.Pipe.Funnel.TestApp2\LeanCode.Pipe.Funnel.TestApp2.csproj", "{66FEF33B-4FCB-41BC-B81F-DE20C02F95DB}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LeanCode.Pipe.Funnel.MultipleServicesTests", "test\LeanCode.Pipe.Funnel.MultipleServicesTests\LeanCode.Pipe.Funnel.MultipleServicesTests.csproj", "{B77F5B9B-2FF6-4A86-88B9-9C3BF9FF6427}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LeanCode.Pipe.Funnel.ScaledFunnelTests", "test\LeanCode.Pipe.Funnel.ScaledFunnelTests\LeanCode.Pipe.Funnel.ScaledFunnelTests.csproj", "{8EF7A9C2-B6A0-422A-90B5-76CB2852225D}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LeanCode.Pipe.Funnel.NoScalingTests", "test\LeanCode.Pipe.Funnel.NoScalingTests\LeanCode.Pipe.Funnel.NoScalingTests.csproj", "{1F89EB12-9052-46D8-A72E-2BA6E030D3BD}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -52,6 +68,34 @@ Global
{05F2BDCE-B093-4D4A-BC36-B7F6F12103AC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{05F2BDCE-B093-4D4A-BC36-B7F6F12103AC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{05F2BDCE-B093-4D4A-BC36-B7F6F12103AC}.Release|Any CPU.Build.0 = Release|Any CPU
+ {A95EBB5F-D7CD-48D4-9624-F43A486D0D68}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {A95EBB5F-D7CD-48D4-9624-F43A486D0D68}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A95EBB5F-D7CD-48D4-9624-F43A486D0D68}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {A95EBB5F-D7CD-48D4-9624-F43A486D0D68}.Release|Any CPU.Build.0 = Release|Any CPU
+ {6D361069-9CD4-44DA-914A-80A62458B04F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {6D361069-9CD4-44DA-914A-80A62458B04F}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {6D361069-9CD4-44DA-914A-80A62458B04F}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {6D361069-9CD4-44DA-914A-80A62458B04F}.Release|Any CPU.Build.0 = Release|Any CPU
+ {531D5FD5-9C72-4F24-B9E7-E53CFBF20B57}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {531D5FD5-9C72-4F24-B9E7-E53CFBF20B57}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {531D5FD5-9C72-4F24-B9E7-E53CFBF20B57}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {531D5FD5-9C72-4F24-B9E7-E53CFBF20B57}.Release|Any CPU.Build.0 = Release|Any CPU
+ {66FEF33B-4FCB-41BC-B81F-DE20C02F95DB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {66FEF33B-4FCB-41BC-B81F-DE20C02F95DB}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {66FEF33B-4FCB-41BC-B81F-DE20C02F95DB}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {66FEF33B-4FCB-41BC-B81F-DE20C02F95DB}.Release|Any CPU.Build.0 = Release|Any CPU
+ {B77F5B9B-2FF6-4A86-88B9-9C3BF9FF6427}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {B77F5B9B-2FF6-4A86-88B9-9C3BF9FF6427}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {B77F5B9B-2FF6-4A86-88B9-9C3BF9FF6427}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {B77F5B9B-2FF6-4A86-88B9-9C3BF9FF6427}.Release|Any CPU.Build.0 = Release|Any CPU
+ {8EF7A9C2-B6A0-422A-90B5-76CB2852225D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {8EF7A9C2-B6A0-422A-90B5-76CB2852225D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {8EF7A9C2-B6A0-422A-90B5-76CB2852225D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {8EF7A9C2-B6A0-422A-90B5-76CB2852225D}.Release|Any CPU.Build.0 = Release|Any CPU
+ {1F89EB12-9052-46D8-A72E-2BA6E030D3BD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {1F89EB12-9052-46D8-A72E-2BA6E030D3BD}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {1F89EB12-9052-46D8-A72E-2BA6E030D3BD}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {1F89EB12-9052-46D8-A72E-2BA6E030D3BD}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{75B2B837-B392-4DC1-8CF7-C1B2C4E03312} = {401E8BD4-C7D7-48F2-8899-EA9859247BD7}
@@ -60,5 +104,13 @@ Global
{F1830D31-AA1D-409B-B78E-FC2BB88B7DF8} = {401E8BD4-C7D7-48F2-8899-EA9859247BD7}
{38F1AA50-B16E-47A7-925A-AB574E9120FF} = {CBE053F8-6853-4FFC-A931-84A210C5FF2C}
{05F2BDCE-B093-4D4A-BC36-B7F6F12103AC} = {CBE053F8-6853-4FFC-A931-84A210C5FF2C}
+ {2A218F39-75C1-4EE1-8614-D67AB30574F0} = {CBE053F8-6853-4FFC-A931-84A210C5FF2C}
+ {A95EBB5F-D7CD-48D4-9624-F43A486D0D68} = {2A218F39-75C1-4EE1-8614-D67AB30574F0}
+ {6D361069-9CD4-44DA-914A-80A62458B04F} = {2A218F39-75C1-4EE1-8614-D67AB30574F0}
+ {531D5FD5-9C72-4F24-B9E7-E53CFBF20B57} = {2A218F39-75C1-4EE1-8614-D67AB30574F0}
+ {66FEF33B-4FCB-41BC-B81F-DE20C02F95DB} = {2A218F39-75C1-4EE1-8614-D67AB30574F0}
+ {B77F5B9B-2FF6-4A86-88B9-9C3BF9FF6427} = {2A218F39-75C1-4EE1-8614-D67AB30574F0}
+ {8EF7A9C2-B6A0-422A-90B5-76CB2852225D} = {2A218F39-75C1-4EE1-8614-D67AB30574F0}
+ {1F89EB12-9052-46D8-A72E-2BA6E030D3BD} = {2A218F39-75C1-4EE1-8614-D67AB30574F0}
EndGlobalSection
EndGlobal
diff --git a/publisher/funnel_test_cluster/Tiltfile b/publisher/funnel_test_cluster/Tiltfile
new file mode 100644
index 0000000..3cccd10
--- /dev/null
+++ b/publisher/funnel_test_cluster/Tiltfile
@@ -0,0 +1,238 @@
+load('ext://namespace', 'namespace_create')
+
+allow_k8s_contexts('k3d-testapp')
+default_registry('k3d-testapp-registry.local.lncd.pl:21345')
+
+# All services needed for tests
+
+local_resource(
+ 'build-testappfunnel',
+ 'dotnet publish -o out/testapp_funnel ../test/LeanCode.Pipe.Funnel.TestAppFunnel',
+ deps=['../src', '../test/LeanCode.Pipe.Funnel.TestAppFunnel'],
+ ignore=['../**/obj', '../**/bin'],
+ labels=['test-services'],
+)
+
+local_resource(
+ 'build-testapp1',
+ 'dotnet publish -o out/testapp1 ../test/LeanCode.Pipe.Funnel.TestApp1',
+ deps=['src', '../test/LeanCode.Pipe.Funnel.TestApp1'],
+ ignore=['../**/obj', '../**/bin'],
+ labels=['test-services'],
+)
+
+local_resource(
+ 'build-testapp2',
+ 'dotnet publish -o out/testapp2 ../test/LeanCode.Pipe.Funnel.TestApp2',
+ deps=['src', '../test/LeanCode.Pipe.Funnel.TestApp2'],
+ ignore=['../**/obj', '../**/bin'],
+ labels=['test-services'],
+)
+
+docker_build(
+ 'testapp_funnel',
+ '.',
+ dockerfile='testapp_funnel.dockerfile',
+ only = ['out/testapp_funnel'],
+)
+
+docker_build(
+ 'testapp1',
+ '.',
+ dockerfile='testapp1.dockerfile',
+ only = ['out/testapp1'],
+)
+
+docker_build(
+ 'testapp2',
+ '.',
+ dockerfile='testapp2.dockerfile',
+ only = ['out/testapp2'],
+)
+
+k8s_yaml('rabbitmq.yaml')
+k8s_resource('rabbitmq', labels=['test-services'])
+
+# No scaling tests
+
+namespace_create('no-scaling')
+
+local_resource(
+ 'build-no-scaling-tests',
+ 'dotnet publish -o out/no_scaling_tests ../test/LeanCode.Pipe.Funnel.NoScalingTests',
+ deps=[
+ '../src',
+ '../test/LeanCode.Pipe.Funnel.TestAppFunnel',
+ '../test/LeanCode.Pipe.Funnel.TestApp1',
+ '../test/LeanCode.Pipe.Funnel.NoScalingTests' ],
+ ignore=['../**/obj', '../**/bin'],
+ labels=['no-scaling-tests']
+)
+
+docker_build(
+ 'no_scaling_tests',
+ '.',
+ dockerfile='no_scaling/tests.dockerfile',
+ only = ['out/no_scaling_tests']
+)
+
+k8s_yaml(local('./subst_tpl_envs.sh no_scaling funnel.yaml'))
+k8s_resource(
+ 'no-scaling-funnel',
+ resource_deps=['rabbitmq'],
+ labels=['no-scaling-tests']
+)
+
+k8s_yaml(local('./subst_tpl_envs.sh no_scaling testapp1.yaml'))
+k8s_resource(
+ 'no-scaling-testapp1',
+ resource_deps=['rabbitmq'],
+ labels=['no-scaling-tests']
+)
+
+k8s_yaml(local('./subst_tpl_envs.sh no_scaling tests.yaml'))
+k8s_resource(
+ 'no-scaling-tests',
+ resource_deps=['no-scaling-funnel', 'no-scaling-testapp1'],
+ labels=['no-scaling-tests']
+)
+
+# Scaled target service tests
+
+namespace_create('scaled-target-service')
+
+local_resource(
+ 'build-scaled-target-service-tests',
+ 'dotnet publish -o out/scaled_target_service_tests ../test/LeanCode.Pipe.Funnel.ScaledTargetServiceTests',
+ deps=[
+ '../src',
+ '../test/LeanCode.Pipe.Funnel.TestAppFunnel',
+ '../test/LeanCode.Pipe.Funnel.TestApp1',
+ '../test/LeanCode.Pipe.Funnel.ScaledTargetServiceTests' ],
+ ignore=['../**/obj', '../**/bin'],
+ labels=['scaled-target-service-tests']
+)
+
+docker_build(
+ 'scaled_target_service_tests',
+ '.',
+ dockerfile='scaled_target_service/tests.dockerfile',
+ only = ['out/scaled_target_service_tests']
+)
+
+k8s_yaml(local('./subst_tpl_envs.sh scaled_target_service funnel.yaml'))
+k8s_resource(
+ 'scaled-target-service-funnel',
+ resource_deps=['rabbitmq'],
+ labels=['scaled-target-service-tests']
+)
+
+k8s_yaml(local('./subst_tpl_envs.sh scaled_target_service testapp1.yaml'))
+k8s_resource(
+ 'scaled-target-service-testapp1',
+ resource_deps=['rabbitmq'],
+ labels=['scaled-target-service-tests']
+)
+
+k8s_yaml(local('./subst_tpl_envs.sh scaled_target_service tests.yaml'))
+k8s_resource(
+ 'scaled-target-service-tests',
+ resource_deps=['scaled-target-service-funnel', 'scaled-target-service-testapp1'],
+ labels=['scaled-target-service-tests']
+)
+
+# Scaled funnel tests
+
+namespace_create('scaled-funnel')
+
+local_resource(
+ 'build-scaled-funnel-tests',
+ 'dotnet publish -o out/scaled_funnel_tests ../test/LeanCode.Pipe.Funnel.ScaledFunnelTests',
+ deps=[
+ '../src',
+ '../test/LeanCode.Pipe.Funnel.TestAppFunnel',
+ '../test/LeanCode.Pipe.Funnel.TestApp1',
+ '../test/LeanCode.Pipe.Funnel.ScaledFunnelTests' ],
+ ignore=['../**/obj', '../**/bin'],
+ labels=['scaled-funnel-tests']
+)
+
+docker_build(
+ 'scaled_funnel_tests',
+ '.',
+ dockerfile='scaled_funnel/tests.dockerfile',
+ only = ['out/scaled_funnel_tests']
+)
+
+k8s_yaml(local('./subst_tpl_envs.sh scaled_funnel funnel.yaml'))
+k8s_resource(
+ 'scaled-funnel-funnel',
+ resource_deps=['rabbitmq'],
+ labels=['scaled-funnel-tests']
+)
+
+k8s_yaml(local('./subst_tpl_envs.sh scaled_funnel testapp1.yaml'))
+k8s_resource(
+ 'scaled-funnel-testapp1',
+ resource_deps=['rabbitmq'],
+ labels=['scaled-funnel-tests']
+)
+
+k8s_yaml(local('./subst_tpl_envs.sh scaled_funnel tests.yaml'))
+k8s_resource(
+ 'scaled-funnel-tests',
+ resource_deps=['scaled-funnel-funnel', 'scaled-funnel-testapp1'],
+ labels=['scaled-funnel-tests']
+)
+
+# Multiple services tests
+
+namespace_create('multiple-services')
+
+local_resource(
+ 'build-multiple-services-tests',
+ 'dotnet publish -o out/multiple_services_tests ../test/LeanCode.Pipe.Funnel.MultipleServicesTests',
+ deps=[
+ '../src',
+ '../test/LeanCode.Pipe.Funnel.TestAppFunnel',
+ '../test/LeanCode.Pipe.Funnel.TestApp1',
+ '../test/LeanCode.Pipe.Funnel.TestApp2',
+ '../test/LeanCode.Pipe.Funnel.MultipleServicesTests' ],
+ ignore=['../**/obj', '../**/bin'],
+ labels=['multiple-services-tests']
+)
+
+docker_build(
+ 'multiple_services_tests',
+ '.',
+ dockerfile='multiple_services/tests.dockerfile',
+ only = ['out/multiple_services_tests']
+)
+
+k8s_yaml(local('./subst_tpl_envs.sh multiple_services funnel.yaml'))
+k8s_resource(
+ 'multiple-services-funnel',
+ resource_deps=['rabbitmq'],
+ labels=['multiple-services-tests']
+)
+
+k8s_yaml(local('./subst_tpl_envs.sh multiple_services testapp1.yaml'))
+k8s_resource(
+ 'multiple-services-testapp1',
+ resource_deps=['rabbitmq'],
+ labels=['multiple-services-tests']
+)
+
+k8s_yaml(local('./subst_tpl_envs.sh multiple_services testapp2.yaml'))
+k8s_resource(
+ 'multiple-services-testapp2',
+ resource_deps=['rabbitmq'],
+ labels=['multiple-services-tests']
+)
+
+k8s_yaml(local('./subst_tpl_envs.sh multiple_services tests.yaml'))
+k8s_resource(
+ 'multiple-services-tests',
+ resource_deps=['multiple-services-funnel', 'multiple-services-testapp1', 'multiple-services-testapp2'],
+ labels=['multiple-services-tests']
+)
diff --git a/publisher/funnel_test_cluster/env_templates/funnel.yaml.tpl b/publisher/funnel_test_cluster/env_templates/funnel.yaml.tpl
new file mode 100644
index 0000000..80df15a
--- /dev/null
+++ b/publisher/funnel_test_cluster/env_templates/funnel.yaml.tpl
@@ -0,0 +1,57 @@
+apiVersion: v1
+kind: Service
+metadata:
+ name: $ENV_NAME-funnel-svc
+ namespace: $ENV_NAME
+ labels:
+ app: $ENV_NAME-funnel
+spec:
+ ports:
+ - port: 80
+ targetPort: 8080
+ protocol: TCP
+ clusterIP: None
+ selector:
+ app: $ENV_NAME-funnel
+---
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+ name: $ENV_NAME-funnel
+ namespace: $ENV_NAME
+ labels:
+ app: $ENV_NAME-funnel
+spec:
+ selector:
+ matchLabels:
+ app: $ENV_NAME-funnel
+ serviceName: $ENV_NAME-funnel-svc
+ replicas: $FUNNEL_REPLICAS
+ template:
+ metadata:
+ labels:
+ app: $ENV_NAME-funnel
+ spec:
+ containers:
+ - name: leanpipe-funnel
+ image: testapp_funnel
+ env:
+ - name: MassTransit__RabbitMq__Url
+ value: rabbitmq://guest:guest@rabbitmq-$RABBITMQ_INSTANCE.rabbitmq-svc.default.svc.cluster.local/
+ ports:
+ - containerPort: 8080
+ - containerPort: 22
+ livenessProbe:
+ httpGet:
+ path: /health/live
+ port: 8080
+ initialDelaySeconds: 2
+ periodSeconds: 2
+ timeoutSeconds: 5
+ readinessProbe:
+ httpGet:
+ path: /health/ready
+ port: 8080
+ initialDelaySeconds: 5
+ periodSeconds: 2
+ timeoutSeconds: 5
diff --git a/publisher/funnel_test_cluster/env_templates/testapp1.yaml.tpl b/publisher/funnel_test_cluster/env_templates/testapp1.yaml.tpl
new file mode 100644
index 0000000..5e07d74
--- /dev/null
+++ b/publisher/funnel_test_cluster/env_templates/testapp1.yaml.tpl
@@ -0,0 +1,57 @@
+apiVersion: v1
+kind: Service
+metadata:
+ name: $ENV_NAME-testapp1-svc
+ namespace: $ENV_NAME
+ labels:
+ app: $ENV_NAME-testapp1
+spec:
+ ports:
+ - port: 80
+ targetPort: 8080
+ protocol: TCP
+ clusterIP: None
+ selector:
+ app: $ENV_NAME-testapp1
+---
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+ name: $ENV_NAME-testapp1
+ namespace: $ENV_NAME
+ labels:
+ app: $ENV_NAME-testapp1
+spec:
+ selector:
+ matchLabels:
+ app: $ENV_NAME-testapp1
+ serviceName: $ENV_NAME-testapp1-svc
+ replicas: $TESTAPP1_REPLICAS
+ template:
+ metadata:
+ labels:
+ app: $ENV_NAME-testapp1
+ spec:
+ containers:
+ - name: testapp1
+ image: testapp1
+ env:
+ - name: MassTransit__RabbitMq__Url
+ value: rabbitmq://guest:guest@rabbitmq-$RABBITMQ_INSTANCE.rabbitmq-svc.default.svc.cluster.local/
+ ports:
+ - containerPort: 8080
+ - containerPort: 22
+ livenessProbe:
+ httpGet:
+ path: /health/live
+ port: 8080
+ initialDelaySeconds: 2
+ periodSeconds: 2
+ timeoutSeconds: 5
+ readinessProbe:
+ httpGet:
+ path: /health/ready
+ port: 8080
+ initialDelaySeconds: 5
+ periodSeconds: 2
+ timeoutSeconds: 5
diff --git a/publisher/funnel_test_cluster/env_templates/testapp2.yaml.tpl b/publisher/funnel_test_cluster/env_templates/testapp2.yaml.tpl
new file mode 100644
index 0000000..c12b010
--- /dev/null
+++ b/publisher/funnel_test_cluster/env_templates/testapp2.yaml.tpl
@@ -0,0 +1,57 @@
+apiVersion: v1
+kind: Service
+metadata:
+ name: $ENV_NAME-testapp2-svc
+ namespace: $ENV_NAME
+ labels:
+ app: $ENV_NAME-testapp2
+spec:
+ ports:
+ - port: 80
+ targetPort: 8080
+ protocol: TCP
+ clusterIP: None
+ selector:
+ app: $ENV_NAME-testapp2
+---
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+ name: $ENV_NAME-testapp2
+ namespace: $ENV_NAME
+ labels:
+ app: $ENV_NAME-testapp2
+spec:
+ selector:
+ matchLabels:
+ app: $ENV_NAME-testapp2
+ serviceName: $ENV_NAME-testapp2-svc
+ replicas: 1
+ template:
+ metadata:
+ labels:
+ app: $ENV_NAME-testapp2
+ spec:
+ containers:
+ - name: testapp2
+ image: testapp2
+ env:
+ - name: MassTransit__RabbitMq__Url
+ value: rabbitmq://guest:guest@rabbitmq-$RABBITMQ_INSTANCE.rabbitmq-svc.default.svc.cluster.local/
+ ports:
+ - containerPort: 8080
+ - containerPort: 22
+ livenessProbe:
+ httpGet:
+ path: /health/live
+ port: 8080
+ initialDelaySeconds: 2
+ periodSeconds: 2
+ timeoutSeconds: 5
+ readinessProbe:
+ httpGet:
+ path: /health/ready
+ port: 8080
+ initialDelaySeconds: 5
+ periodSeconds: 2
+ timeoutSeconds: 5
diff --git a/publisher/funnel_test_cluster/env_templates/tests.yaml.tpl b/publisher/funnel_test_cluster/env_templates/tests.yaml.tpl
new file mode 100644
index 0000000..eee295f
--- /dev/null
+++ b/publisher/funnel_test_cluster/env_templates/tests.yaml.tpl
@@ -0,0 +1,16 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+ name: $ENV_NAME-tests
+ namespace: $ENV_NAME
+ labels:
+ app: $ENV_NAME-tests
+spec:
+ template:
+ spec:
+ restartPolicy: Never
+ containers:
+ - name: $ENV_NAME-tests
+ image: $TESTS_NAME
+ ports:
+ - containerPort: 22
diff --git a/publisher/funnel_test_cluster/k3d.yaml b/publisher/funnel_test_cluster/k3d.yaml
new file mode 100644
index 0000000..d94e261
--- /dev/null
+++ b/publisher/funnel_test_cluster/k3d.yaml
@@ -0,0 +1,43 @@
+apiVersion: k3d.io/v1alpha4
+kind: Simple
+metadata:
+ name: testapp
+image: docker.io/rancher/k3s:v1.28.2-k3s1
+servers: 1
+agents: 0
+kubeAPI:
+ host: "testapp-cluster.local.lncd.pl"
+ hostIP: "127.0.0.1"
+ hostPort: "6445"
+ports:
+ - port: 80:80
+ nodeFilters:
+ - loadbalancer
+ - port: 443:443
+ nodeFilters:
+ - loadbalancer
+ - port: 1433:1433
+ nodeFilters:
+ - loadbalancer
+ - port: 5432:5432
+ nodeFilters:
+ - loadbalancer
+ - port: 10000:10000
+ nodeFilters:
+ - loadbalancer
+registries:
+ create:
+ name: k3d-testapp-registry.local.lncd.pl
+ host: "0.0.0.0"
+ hostPort: "21345"
+options:
+ k3d:
+ wait: true
+ k3s:
+ extraArgs:
+ - arg: --disable=traefik,metrics-server
+ nodeFilters:
+ - server:*
+ kubeconfig:
+ updateDefaultKubeconfig: true
+ switchCurrentContext: true
diff --git a/publisher/funnel_test_cluster/multiple_services/env.sh b/publisher/funnel_test_cluster/multiple_services/env.sh
new file mode 100644
index 0000000..cc8b4a8
--- /dev/null
+++ b/publisher/funnel_test_cluster/multiple_services/env.sh
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+
+export ENV_NAME=multiple-services
+export TESTS_NAME=multiple_services_tests
+export RABBITMQ_INSTANCE=3
+export FUNNEL_REPLICAS=2
+export TESTAPP1_REPLICAS=1
diff --git a/publisher/funnel_test_cluster/multiple_services/tests.dockerfile b/publisher/funnel_test_cluster/multiple_services/tests.dockerfile
new file mode 100644
index 0000000..4fa0ae5
--- /dev/null
+++ b/publisher/funnel_test_cluster/multiple_services/tests.dockerfile
@@ -0,0 +1,6 @@
+FROM mcr.microsoft.com/dotnet/sdk:8.0
+
+USER $APP_UID
+
+COPY --chown=$APP_UID out/multiple_services_tests /home/app/bin
+ENTRYPOINT ["dotnet", "test", "/home/app/bin/MultipleServicesTests.dll"]
diff --git a/publisher/funnel_test_cluster/no_scaling/env.sh b/publisher/funnel_test_cluster/no_scaling/env.sh
new file mode 100644
index 0000000..a2a3768
--- /dev/null
+++ b/publisher/funnel_test_cluster/no_scaling/env.sh
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+
+export ENV_NAME=no-scaling
+export TESTS_NAME=no_scaling_tests
+export RABBITMQ_INSTANCE=0
+export FUNNEL_REPLICAS=1
+export TESTAPP1_REPLICAS=1
diff --git a/publisher/funnel_test_cluster/no_scaling/tests.dockerfile b/publisher/funnel_test_cluster/no_scaling/tests.dockerfile
new file mode 100644
index 0000000..3aeb31b
--- /dev/null
+++ b/publisher/funnel_test_cluster/no_scaling/tests.dockerfile
@@ -0,0 +1,6 @@
+FROM mcr.microsoft.com/dotnet/sdk:8.0
+
+USER $APP_UID
+
+COPY --chown=$APP_UID out/no_scaling_tests /home/app/bin
+ENTRYPOINT ["dotnet", "test", "/home/app/bin/NoScalingTests.dll"]
diff --git a/publisher/funnel_test_cluster/rabbitmq.yaml b/publisher/funnel_test_cluster/rabbitmq.yaml
new file mode 100644
index 0000000..b5e6c19
--- /dev/null
+++ b/publisher/funnel_test_cluster/rabbitmq.yaml
@@ -0,0 +1,40 @@
+apiVersion: v1
+kind: Service
+metadata:
+ name: rabbitmq-svc
+ labels:
+ app: rabbitmq
+spec:
+ ports:
+ - port: 5672
+ name: port1
+ - port: 15672
+ name: port2
+ clusterIP: None
+ selector:
+ app: rabbitmq
+---
+apiVersion: apps/v1
+kind: StatefulSet
+metadata:
+ name: rabbitmq
+ labels:
+ app: rabbitmq
+spec:
+ selector:
+ matchLabels:
+ app: rabbitmq
+ serviceName: rabbitmq-svc
+# A replica for different test configuration
+ replicas: 4
+ template:
+ metadata:
+ labels:
+ app: rabbitmq
+ spec:
+ containers:
+ - name: rabbitmq
+ image: rabbitmq:3-management
+ ports:
+ - containerPort: 5672
+ - containerPort: 15672
diff --git a/publisher/funnel_test_cluster/scaled_funnel/env.sh b/publisher/funnel_test_cluster/scaled_funnel/env.sh
new file mode 100644
index 0000000..34bcebc
--- /dev/null
+++ b/publisher/funnel_test_cluster/scaled_funnel/env.sh
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+
+export ENV_NAME=scaled-funnel
+export TESTS_NAME=scaled_funnel_tests
+export RABBITMQ_INSTANCE=2
+export FUNNEL_REPLICAS=2
+export TESTAPP1_REPLICAS=1
diff --git a/publisher/funnel_test_cluster/scaled_funnel/tests.dockerfile b/publisher/funnel_test_cluster/scaled_funnel/tests.dockerfile
new file mode 100644
index 0000000..78770fc
--- /dev/null
+++ b/publisher/funnel_test_cluster/scaled_funnel/tests.dockerfile
@@ -0,0 +1,6 @@
+FROM mcr.microsoft.com/dotnet/sdk:8.0
+
+USER $APP_UID
+
+COPY --chown=$APP_UID out/scaled_funnel_tests /home/app/bin
+ENTRYPOINT ["dotnet", "test", "/home/app/bin/ScaledFunnelTests.dll"]
diff --git a/publisher/funnel_test_cluster/scaled_target_service/env.sh b/publisher/funnel_test_cluster/scaled_target_service/env.sh
new file mode 100644
index 0000000..005d7aa
--- /dev/null
+++ b/publisher/funnel_test_cluster/scaled_target_service/env.sh
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+
+export ENV_NAME=scaled-target-service
+export TESTS_NAME=scaled_target_service_tests
+export RABBITMQ_INSTANCE=3
+export FUNNEL_REPLICAS=1
+export TESTAPP1_REPLICAS=2
diff --git a/publisher/funnel_test_cluster/scaled_target_service/tests.dockerfile b/publisher/funnel_test_cluster/scaled_target_service/tests.dockerfile
new file mode 100644
index 0000000..d34b51e
--- /dev/null
+++ b/publisher/funnel_test_cluster/scaled_target_service/tests.dockerfile
@@ -0,0 +1,6 @@
+FROM mcr.microsoft.com/dotnet/sdk:8.0
+
+USER $APP_UID
+
+COPY --chown=$APP_UID out/scaled_target_service_tests /home/app/bin
+ENTRYPOINT ["dotnet", "test", "/home/app/bin/ScaledTargetServiceTests.dll"]
diff --git a/publisher/funnel_test_cluster/subst_tpl_envs.sh b/publisher/funnel_test_cluster/subst_tpl_envs.sh
new file mode 100755
index 0000000..89e9163
--- /dev/null
+++ b/publisher/funnel_test_cluster/subst_tpl_envs.sh
@@ -0,0 +1,7 @@
+#!/usr/bin/env bash
+
+DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+
+source "$DIR/${1}/env.sh"
+
+cat "$DIR/env_templates/${2}.tpl" | envsubst
diff --git a/publisher/funnel_test_cluster/testapp1.dockerfile b/publisher/funnel_test_cluster/testapp1.dockerfile
new file mode 100644
index 0000000..e52eaeb
--- /dev/null
+++ b/publisher/funnel_test_cluster/testapp1.dockerfile
@@ -0,0 +1,9 @@
+FROM mcr.microsoft.com/dotnet/aspnet:8.0
+
+USER $APP_UID
+
+ENV ASPNETCORE_ENVIRONMENT=Development
+ARG MassTransit__RabbitMq__Url
+
+COPY --chown=$APP_UID out/testapp1 /home/app/bin
+ENTRYPOINT ["dotnet", "/home/app/bin/TestApp1.dll"]
diff --git a/publisher/funnel_test_cluster/testapp2.dockerfile b/publisher/funnel_test_cluster/testapp2.dockerfile
new file mode 100644
index 0000000..8310b57
--- /dev/null
+++ b/publisher/funnel_test_cluster/testapp2.dockerfile
@@ -0,0 +1,9 @@
+FROM mcr.microsoft.com/dotnet/aspnet:8.0
+
+USER $APP_UID
+
+ENV ASPNETCORE_ENVIRONMENT=Development
+ARG MassTransit__RabbitMq__Url
+
+COPY --chown=$APP_UID out/testapp2 /home/app/bin
+ENTRYPOINT ["dotnet", "/home/app/bin/TestApp2.dll"]
diff --git a/publisher/funnel_test_cluster/testapp_funnel.dockerfile b/publisher/funnel_test_cluster/testapp_funnel.dockerfile
new file mode 100644
index 0000000..112d637
--- /dev/null
+++ b/publisher/funnel_test_cluster/testapp_funnel.dockerfile
@@ -0,0 +1,9 @@
+FROM mcr.microsoft.com/dotnet/aspnet:8.0
+
+USER $APP_UID
+
+ENV ASPNETCORE_ENVIRONMENT=Development
+ARG MassTransit__RabbitMq__Url
+
+COPY --chown=$APP_UID out/testapp_funnel /home/app/bin
+ENTRYPOINT ["dotnet", "/home/app/bin/TestAppFunnel.dll"]
diff --git a/publisher/test/LeanCode.Pipe.Funnel.MultipleServicesTests/LeanCode.Pipe.Funnel.MultipleServicesTests.csproj b/publisher/test/LeanCode.Pipe.Funnel.MultipleServicesTests/LeanCode.Pipe.Funnel.MultipleServicesTests.csproj
new file mode 100644
index 0000000..b4a5a2c
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.MultipleServicesTests/LeanCode.Pipe.Funnel.MultipleServicesTests.csproj
@@ -0,0 +1,20 @@
+
+
+
+ MultipleServicesTests
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/publisher/test/LeanCode.Pipe.Funnel.MultipleServicesTests/MultipleTargetServicesTests.cs b/publisher/test/LeanCode.Pipe.Funnel.MultipleServicesTests/MultipleTargetServicesTests.cs
new file mode 100644
index 0000000..f36c2b7
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.MultipleServicesTests/MultipleTargetServicesTests.cs
@@ -0,0 +1,153 @@
+using System.Net.Http.Json;
+using FluentAssertions;
+using LeanCode.Pipe.Funnel.TestApp1;
+using LeanCode.Pipe.Funnel.TestApp2;
+using LeanCode.Pipe.TestClient;
+using Microsoft.AspNetCore.Http.Connections;
+using Xunit;
+
+namespace LeanCode.Pipe.Funnel.MultipleServicesTests;
+
+public class MultipleTargetServicesTests : IAsyncLifetime
+{
+ private readonly LeanPipeTestClient leanPipeClient =
+ new(
+ new(
+ "http://multiple-services-funnel-0.multiple-services-funnel-svc.multiple-services.svc.cluster.local:8080/leanpipe"
+ ),
+ new(typeof(Topic1), typeof(Topic2)),
+ cfg =>
+ {
+ cfg.Transports = HttpTransportType.WebSockets;
+ cfg.SkipNegotiation = true;
+ }
+ );
+
+ private readonly HttpClient testApp1Client =
+ new()
+ {
+ BaseAddress = new(
+ "http://multiple-services-testapp1-0.multiple-services-testapp1-svc.multiple-services.svc.cluster.local:8080"
+ ),
+ };
+
+ private readonly HttpClient testApp2Client =
+ new()
+ {
+ BaseAddress = new(
+ "http://multiple-services-testapp2-0.multiple-services-testapp2-svc.multiple-services.svc.cluster.local:8080"
+ ),
+ };
+
+ [Fact]
+ public async Task Subscribing_and_receiving_notifications_from_any_target_service_works_and_does_not_interfere_with_each_other()
+ {
+ var topic1 = new Topic1
+ {
+ Topic1Id = nameof(
+ Subscribing_and_receiving_notifications_from_any_target_service_works_and_does_not_interfere_with_each_other
+ ),
+ };
+
+ var topic2 = new Topic2
+ {
+ Topic2Id = nameof(
+ Subscribing_and_receiving_notifications_from_any_target_service_works_and_does_not_interfere_with_each_other
+ ),
+ };
+
+ var expectedNotification1 = new Notification1
+ {
+ Greeting = $"Hello from topic1 {topic1.Topic1Id}",
+ };
+
+ var expectedNotification2 = new Notification2
+ {
+ Farewell = $"Goodbye from topic2 {topic2.Topic2Id}",
+ };
+
+ await leanPipeClient.SubscribeSuccessAsync(topic1);
+ await leanPipeClient.SubscribeSuccessAsync(topic2);
+
+ await NotificationsAreReceivedOnlyOnThePublishedTopics();
+
+ await leanPipeClient.UnsubscribeSuccessAsync(topic1);
+
+ await NotificationsAreNotReceivedOnTheUnsubscribedTopic();
+ await NotificationsAreStillReceivedOnTheOtherSubscribedTopic();
+
+ await leanPipeClient.UnsubscribeSuccessAsync(topic2);
+
+ async Task NotificationsAreReceivedOnlyOnThePublishedTopics()
+ {
+ var service1Notification = leanPipeClient.WaitForNextNotificationOn(topic1);
+ var service2Notification = leanPipeClient.WaitForNextNotificationOn(
+ topic2,
+ timeout: TimeSpan.FromMilliseconds(500)
+ );
+
+ await testApp1Client.PostAsJsonAsync("/publish", topic1);
+
+ (await service1Notification)
+ .Should()
+ .BeEquivalentTo(expectedNotification1, opts => opts.RespectingRuntimeTypes());
+
+ await service2Notification
+ .Awaiting(x => x)
+ .Should()
+ .ThrowAsync();
+
+ service2Notification = leanPipeClient.WaitForNextNotificationOn(topic2);
+ service1Notification = leanPipeClient.WaitForNextNotificationOn(
+ topic1,
+ timeout: TimeSpan.FromMilliseconds(500)
+ );
+
+ await testApp2Client.PostAsJsonAsync("/publish", topic2);
+
+ (await service2Notification)
+ .Should()
+ .BeEquivalentTo(expectedNotification2, opts => opts.RespectingRuntimeTypes());
+
+ await service1Notification
+ .Awaiting(x => x)
+ .Should()
+ .ThrowAsync();
+ }
+
+ async Task NotificationsAreNotReceivedOnTheUnsubscribedTopic()
+ {
+ var service1Notification = leanPipeClient.WaitForNextNotificationOn(
+ topic1,
+ timeout: TimeSpan.FromMilliseconds(500)
+ );
+
+ await testApp1Client.PostAsJsonAsync("/publish", topic1);
+
+ await service1Notification
+ .Awaiting(x => x)
+ .Should()
+ .ThrowAsync();
+ }
+
+ async Task NotificationsAreStillReceivedOnTheOtherSubscribedTopic()
+ {
+ var service2Notification = leanPipeClient.WaitForNextNotificationOn(topic2);
+
+ await testApp2Client.PostAsJsonAsync("/publish", topic2);
+
+ (await service2Notification)
+ .Should()
+ .BeEquivalentTo(expectedNotification2, opts => opts.RespectingRuntimeTypes());
+ }
+ }
+
+ public Task InitializeAsync() => Task.CompletedTask;
+
+ public async Task DisposeAsync()
+ {
+ await leanPipeClient.DisposeAsync();
+ testApp1Client.Dispose();
+ testApp2Client.Dispose();
+ }
+}
diff --git a/publisher/test/LeanCode.Pipe.Funnel.NoScalingTests/LeanCode.Pipe.Funnel.NoScalingTests.csproj b/publisher/test/LeanCode.Pipe.Funnel.NoScalingTests/LeanCode.Pipe.Funnel.NoScalingTests.csproj
new file mode 100644
index 0000000..fc0bfa0
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.NoScalingTests/LeanCode.Pipe.Funnel.NoScalingTests.csproj
@@ -0,0 +1,19 @@
+
+
+
+ NoScalingTests
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/publisher/test/LeanCode.Pipe.Funnel.NoScalingTests/NoScalingTests.cs b/publisher/test/LeanCode.Pipe.Funnel.NoScalingTests/NoScalingTests.cs
new file mode 100644
index 0000000..df31953
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.NoScalingTests/NoScalingTests.cs
@@ -0,0 +1,68 @@
+using System.Net.Http.Json;
+using FluentAssertions;
+using LeanCode.Pipe.Funnel.TestApp1;
+using LeanCode.Pipe.TestClient;
+using Microsoft.AspNetCore.Http.Connections;
+using Xunit;
+
+namespace LeanCode.Pipe.Funnel.NoScalingTests;
+
+public class NoScalingTests : IAsyncLifetime
+{
+ private readonly LeanPipeTestClient leanPipeClient =
+ new(
+ new(
+ "http://no-scaling-funnel-0.no-scaling-funnel-svc.no-scaling.svc.cluster.local:8080/leanpipe"
+ ),
+ new(typeof(Topic1)),
+ cfg =>
+ {
+ cfg.Transports = HttpTransportType.WebSockets;
+ cfg.SkipNegotiation = true;
+ }
+ );
+
+ private readonly HttpClient testApp1Client =
+ new()
+ {
+ BaseAddress = new(
+ "http://no-scaling-testapp1-0.no-scaling-testapp1-svc.no-scaling.svc.cluster.local:8080"
+ ),
+ };
+
+ [Fact]
+ public async Task Subscribing_and_receiving_notifications_works()
+ {
+ var topic = new Topic1
+ {
+ Topic1Id = nameof(Subscribing_and_receiving_notifications_works),
+ };
+
+ await leanPipeClient.SubscribeSuccessAsync(topic);
+
+ var expectedNotification = new Notification1
+ {
+ Greeting = $"Hello from topic1 {topic.Topic1Id}",
+ };
+
+ var notification = leanPipeClient.WaitForNextNotificationOn(topic);
+
+ await testApp1Client.PostAsJsonAsync("/publish", topic);
+
+ (await notification)
+ .Should()
+ .BeEquivalentTo(expectedNotification, opts => opts.RespectingRuntimeTypes());
+
+ var instanceBNotification = leanPipeClient.WaitForNextNotificationOn(topic);
+
+ await leanPipeClient.UnsubscribeSuccessAsync(topic);
+ }
+
+ public Task InitializeAsync() => Task.CompletedTask;
+
+ public async Task DisposeAsync()
+ {
+ await leanPipeClient.DisposeAsync();
+ testApp1Client.Dispose();
+ }
+}
diff --git a/publisher/test/LeanCode.Pipe.Funnel.ScaledFunnelTests/LeanCode.Pipe.Funnel.ScaledFunnelTests.csproj b/publisher/test/LeanCode.Pipe.Funnel.ScaledFunnelTests/LeanCode.Pipe.Funnel.ScaledFunnelTests.csproj
new file mode 100644
index 0000000..e6fff6b
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.ScaledFunnelTests/LeanCode.Pipe.Funnel.ScaledFunnelTests.csproj
@@ -0,0 +1,19 @@
+
+
+
+ ScaledFunnelTests
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/publisher/test/LeanCode.Pipe.Funnel.ScaledFunnelTests/ScaledFunnelTests.cs b/publisher/test/LeanCode.Pipe.Funnel.ScaledFunnelTests/ScaledFunnelTests.cs
new file mode 100644
index 0000000..4033dae
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.ScaledFunnelTests/ScaledFunnelTests.cs
@@ -0,0 +1,87 @@
+using System.Net.Http.Json;
+using FluentAssertions;
+using LeanCode.Pipe.Funnel.TestApp1;
+using LeanCode.Pipe.TestClient;
+using Microsoft.AspNetCore.Http.Connections;
+using Xunit;
+
+namespace LeanCode.Pipe.Funnel.ScaledFunnelTests;
+
+public class ScaledFunnelTests : IAsyncLifetime
+{
+ private readonly LeanPipeTestClient leanPipeAClient =
+ new(
+ new(
+ "http://scaled-funnel-funnel-0.scaled-funnel-funnel-svc.scaled-funnel.svc.cluster.local:8080/leanpipe"
+ ),
+ new(typeof(Topic1)),
+ cfg =>
+ {
+ cfg.Transports = HttpTransportType.WebSockets;
+ cfg.SkipNegotiation = true;
+ }
+ );
+
+ private readonly LeanPipeTestClient leanPipeBClient =
+ new(
+ new(
+ "http://scaled-funnel-funnel-1.scaled-funnel-funnel-svc.scaled-funnel.svc.cluster.local:8080/leanpipe"
+ ),
+ new(typeof(Topic1)),
+ cfg =>
+ {
+ cfg.Transports = HttpTransportType.WebSockets;
+ cfg.SkipNegotiation = true;
+ }
+ );
+
+ private readonly HttpClient testApp1Client =
+ new()
+ {
+ BaseAddress = new(
+ "http://scaled-funnel-testapp1-0.scaled-funnel-testapp1-svc.scaled-funnel.svc.cluster.local:8080"
+ ),
+ };
+
+ [Fact]
+ public async Task Client_receives_notifications_while_connected_to_any_Funnel_instance()
+ {
+ var topic = new Topic1
+ {
+ Topic1Id = nameof(Client_receives_notifications_while_connected_to_any_Funnel_instance),
+ };
+
+ await leanPipeAClient.SubscribeSuccessAsync(topic);
+ await leanPipeBClient.SubscribeSuccessAsync(topic);
+
+ var expectedNotification = new Notification1
+ {
+ Greeting = $"Hello from topic1 {topic.Topic1Id}",
+ };
+
+ var funnelANotification = leanPipeAClient.WaitForNextNotificationOn(topic);
+ var funnelBNotification = leanPipeBClient.WaitForNextNotificationOn(topic);
+
+ await testApp1Client.PostAsJsonAsync("/publish", topic);
+
+ (await funnelANotification)
+ .Should()
+ .BeEquivalentTo(expectedNotification, opts => opts.RespectingRuntimeTypes());
+
+ (await funnelBNotification)
+ .Should()
+ .BeEquivalentTo(expectedNotification, opts => opts.RespectingRuntimeTypes());
+
+ await leanPipeAClient.UnsubscribeSuccessAsync(topic);
+ await leanPipeBClient.UnsubscribeSuccessAsync(topic);
+ }
+
+ public Task InitializeAsync() => Task.CompletedTask;
+
+ public async Task DisposeAsync()
+ {
+ await leanPipeAClient.DisposeAsync();
+ await leanPipeBClient.DisposeAsync();
+ testApp1Client.Dispose();
+ }
+}
diff --git a/publisher/test/LeanCode.Pipe.Funnel.ScaledTargetServiceTests/LeanCode.Pipe.Funnel.ScaledTargetServiceTests.csproj b/publisher/test/LeanCode.Pipe.Funnel.ScaledTargetServiceTests/LeanCode.Pipe.Funnel.ScaledTargetServiceTests.csproj
new file mode 100644
index 0000000..2100afd
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.ScaledTargetServiceTests/LeanCode.Pipe.Funnel.ScaledTargetServiceTests.csproj
@@ -0,0 +1,19 @@
+
+
+
+ ScaledTargetServiceTests
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/publisher/test/LeanCode.Pipe.Funnel.ScaledTargetServiceTests/ScaledTargetServiceTests.cs b/publisher/test/LeanCode.Pipe.Funnel.ScaledTargetServiceTests/ScaledTargetServiceTests.cs
new file mode 100644
index 0000000..c33b683
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.ScaledTargetServiceTests/ScaledTargetServiceTests.cs
@@ -0,0 +1,83 @@
+using System.Net.Http.Json;
+using FluentAssertions;
+using LeanCode.Pipe.Funnel.TestApp1;
+using LeanCode.Pipe.TestClient;
+using Microsoft.AspNetCore.Http.Connections;
+using Xunit;
+
+namespace LeanCode.Pipe.Funnel.ScaledTargetServiceTests;
+
+public class ScaledTargetServiceTests : IAsyncLifetime
+{
+ private readonly LeanPipeTestClient leanPipeClient =
+ new(
+ new(
+ "http://scaled-target-service-funnel-0.scaled-target-service-funnel-svc.scaled-target-service.svc.cluster.local:8080/leanpipe"
+ ),
+ new(typeof(Topic1)),
+ cfg =>
+ {
+ cfg.Transports = HttpTransportType.WebSockets;
+ cfg.SkipNegotiation = true;
+ }
+ );
+
+ private readonly HttpClient testApp1AClient =
+ new()
+ {
+ BaseAddress = new(
+ "http://scaled-target-service-testapp1-0.scaled-target-service-testapp1-svc.scaled-target-service.svc.cluster.local:8080"
+ ),
+ };
+
+ private readonly HttpClient testApp1BClient =
+ new()
+ {
+ BaseAddress = new(
+ "http://scaled-target-service-testapp1-1.scaled-target-service-testapp1-svc.scaled-target-service.svc.cluster.local:8080"
+ ),
+ };
+
+ [Fact]
+ public async Task Publishing_notifications_from_any_service_instance_works()
+ {
+ var topic = new Topic1
+ {
+ Topic1Id = nameof(Publishing_notifications_from_any_service_instance_works),
+ };
+
+ await leanPipeClient.SubscribeSuccessAsync(topic);
+
+ var expectedNotification = new Notification1
+ {
+ Greeting = $"Hello from topic1 {topic.Topic1Id}",
+ };
+
+ var instanceANotification = leanPipeClient.WaitForNextNotificationOn(topic);
+
+ await testApp1AClient.PostAsJsonAsync("/publish", topic);
+
+ (await instanceANotification)
+ .Should()
+ .BeEquivalentTo(expectedNotification, opts => opts.RespectingRuntimeTypes());
+
+ var instanceBNotification = leanPipeClient.WaitForNextNotificationOn(topic);
+
+ await testApp1BClient.PostAsJsonAsync("/publish", topic);
+
+ (await instanceBNotification)
+ .Should()
+ .BeEquivalentTo(expectedNotification, opts => opts.RespectingRuntimeTypes());
+
+ await leanPipeClient.UnsubscribeSuccessAsync(topic);
+ }
+
+ public Task InitializeAsync() => Task.CompletedTask;
+
+ public async Task DisposeAsync()
+ {
+ await leanPipeClient.DisposeAsync();
+ testApp1AClient.Dispose();
+ testApp1BClient.Dispose();
+ }
+}
diff --git a/publisher/test/LeanCode.Pipe.Funnel.TestApp1/LeanCode.Pipe.Funnel.TestApp1.csproj b/publisher/test/LeanCode.Pipe.Funnel.TestApp1/LeanCode.Pipe.Funnel.TestApp1.csproj
new file mode 100644
index 0000000..6f56ce7
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.TestApp1/LeanCode.Pipe.Funnel.TestApp1.csproj
@@ -0,0 +1,14 @@
+
+
+
+ Exe
+ TestApp1
+
+
+
+
+
+
+
+
+
diff --git a/publisher/test/LeanCode.Pipe.Funnel.TestApp1/Program.cs b/publisher/test/LeanCode.Pipe.Funnel.TestApp1/Program.cs
new file mode 100644
index 0000000..a0f389c
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.TestApp1/Program.cs
@@ -0,0 +1,49 @@
+using LeanCode.Logging;
+using LeanCode.Pipe;
+using LeanCode.Pipe.Funnel.FunnelledService;
+using LeanCode.Pipe.Funnel.TestApp1;
+using MassTransit;
+
+var appBuilder = WebApplication.CreateBuilder(args);
+var hostBuilder = appBuilder.Host;
+
+hostBuilder.ConfigureDefaultLogging("TestApp1", new[] { typeof(Program).Assembly });
+
+var services = appBuilder.Services;
+
+services.AddFunnelledLeanPipe(new(typeof(Topic1)), new(typeof(Topic1Keys)));
+
+services.AddOptions().Configure(opts => opts.WaitUntilStarted = true);
+services.AddMassTransit(cfg =>
+{
+ cfg.AddFunnelledLeanPipeConsumers("TestApp1", new[] { typeof(Program).Assembly });
+
+ cfg.UsingRabbitMq(
+ (ctx, cfg) =>
+ {
+ cfg.Host(appBuilder.Configuration.GetValue("MassTransit:RabbitMq:Url"));
+ cfg.ConfigureEndpoints(ctx);
+ }
+ );
+});
+
+services.AddHealthChecks();
+
+var app = appBuilder.Build();
+
+app.UseRouting();
+
+app.MapHealthChecks("/health/live");
+app.MapHealthChecks("/health/ready", new() { Predicate = check => check.Tags.Contains("ready") });
+
+app.MapPost(
+ "/publish",
+ async (HttpContext ctx, Topic1 topic, ILeanPipePublisher publisher) =>
+ await publisher.PublishAsync(
+ topic,
+ new Notification1 { Greeting = $"Hello from topic1 {topic.Topic1Id}" },
+ ctx.RequestAborted
+ )
+);
+
+app.Run();
diff --git a/publisher/test/LeanCode.Pipe.Funnel.TestApp1/Topic1.cs b/publisher/test/LeanCode.Pipe.Funnel.TestApp1/Topic1.cs
new file mode 100644
index 0000000..6c5dc69
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.TestApp1/Topic1.cs
@@ -0,0 +1,20 @@
+using LeanCode.Contracts;
+using LeanCode.Contracts.Security;
+
+namespace LeanCode.Pipe.Funnel.TestApp1;
+
+[AllowUnauthorized]
+public class Topic1 : ITopic, IProduceNotification
+{
+ public string Topic1Id { get; set; } = default!;
+}
+
+public class Notification1
+{
+ public string Greeting { get; set; } = default!;
+}
+
+public class Topic1Keys : BasicTopicKeys
+{
+ public override IEnumerable Get(Topic1 topic) => new[] { $"topic1_{topic.Topic1Id}" };
+}
diff --git a/publisher/test/LeanCode.Pipe.Funnel.TestApp2/LeanCode.Pipe.Funnel.TestApp2.csproj b/publisher/test/LeanCode.Pipe.Funnel.TestApp2/LeanCode.Pipe.Funnel.TestApp2.csproj
new file mode 100644
index 0000000..a60c848
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.TestApp2/LeanCode.Pipe.Funnel.TestApp2.csproj
@@ -0,0 +1,14 @@
+
+
+
+ Exe
+ TestApp2
+
+
+
+
+
+
+
+
+
diff --git a/publisher/test/LeanCode.Pipe.Funnel.TestApp2/Program.cs b/publisher/test/LeanCode.Pipe.Funnel.TestApp2/Program.cs
new file mode 100644
index 0000000..5b9d7e4
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.TestApp2/Program.cs
@@ -0,0 +1,49 @@
+using LeanCode.Logging;
+using LeanCode.Pipe;
+using LeanCode.Pipe.Funnel.FunnelledService;
+using LeanCode.Pipe.Funnel.TestApp2;
+using MassTransit;
+
+var appBuilder = WebApplication.CreateBuilder(args);
+var hostBuilder = appBuilder.Host;
+
+hostBuilder.ConfigureDefaultLogging("TestApp2", new[] { typeof(Program).Assembly });
+
+var services = appBuilder.Services;
+
+services.AddFunnelledLeanPipe(new(typeof(Topic2)), new(typeof(Topic2Keys)));
+
+services.AddOptions().Configure(opts => opts.WaitUntilStarted = true);
+services.AddMassTransit(cfg =>
+{
+ cfg.AddFunnelledLeanPipeConsumers("TestApp2", new[] { typeof(Program).Assembly });
+
+ cfg.UsingRabbitMq(
+ (ctx, cfg) =>
+ {
+ cfg.Host(appBuilder.Configuration.GetValue("MassTransit:RabbitMq:Url"));
+ cfg.ConfigureEndpoints(ctx);
+ }
+ );
+});
+
+services.AddHealthChecks();
+
+var app = appBuilder.Build();
+
+app.UseRouting();
+
+app.MapHealthChecks("/health/live");
+app.MapHealthChecks("/health/ready", new() { Predicate = check => check.Tags.Contains("ready") });
+
+app.MapPost(
+ "/publish",
+ async (HttpContext ctx, Topic2 topic, ILeanPipePublisher publisher) =>
+ await publisher.PublishAsync(
+ topic,
+ new Notification2 { Farewell = $"Goodbye from topic2 {topic.Topic2Id}" },
+ ctx.RequestAborted
+ )
+);
+
+app.Run();
diff --git a/publisher/test/LeanCode.Pipe.Funnel.TestApp2/Topic2.cs b/publisher/test/LeanCode.Pipe.Funnel.TestApp2/Topic2.cs
new file mode 100644
index 0000000..a8c7bcb
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.TestApp2/Topic2.cs
@@ -0,0 +1,20 @@
+using LeanCode.Contracts;
+using LeanCode.Contracts.Security;
+
+namespace LeanCode.Pipe.Funnel.TestApp2;
+
+[AllowUnauthorized]
+public class Topic2 : ITopic, IProduceNotification
+{
+ public string Topic2Id { get; set; } = default!;
+}
+
+public class Notification2
+{
+ public string Farewell { get; set; } = default!;
+}
+
+public class Topic2Keys : BasicTopicKeys
+{
+ public override IEnumerable Get(Topic2 topic) => new[] { $"topic2_{topic.Topic2Id}" };
+}
diff --git a/publisher/test/LeanCode.Pipe.Funnel.TestAppFunnel/LeanCode.Pipe.Funnel.TestAppFunnel.csproj b/publisher/test/LeanCode.Pipe.Funnel.TestAppFunnel/LeanCode.Pipe.Funnel.TestAppFunnel.csproj
new file mode 100644
index 0000000..bc31403
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.TestAppFunnel/LeanCode.Pipe.Funnel.TestAppFunnel.csproj
@@ -0,0 +1,14 @@
+
+
+
+ Exe
+ TestAppFunnel
+
+
+
+
+
+
+
+
+
diff --git a/publisher/test/LeanCode.Pipe.Funnel.TestAppFunnel/Program.cs b/publisher/test/LeanCode.Pipe.Funnel.TestAppFunnel/Program.cs
new file mode 100644
index 0000000..f1b9b25
--- /dev/null
+++ b/publisher/test/LeanCode.Pipe.Funnel.TestAppFunnel/Program.cs
@@ -0,0 +1,47 @@
+using LeanCode.Logging;
+using LeanCode.Pipe;
+using LeanCode.Pipe.Funnel.Instance;
+using MassTransit;
+using Microsoft.AspNetCore.Http.Connections;
+
+var appBuilder = WebApplication.CreateBuilder(args);
+var hostBuilder = appBuilder.Host;
+
+hostBuilder.ConfigureDefaultLogging("TestAppFunnel", new[] { typeof(Program).Assembly });
+
+var services = appBuilder.Services;
+
+services.AddLeanPipeFunnel();
+
+services.AddOptions().Configure(opts => opts.WaitUntilStarted = true);
+services.AddMassTransit(cfg =>
+{
+ cfg.ConfigureLeanPipeFunnelConsumers();
+
+ cfg.UsingRabbitMq(
+ (ctx, cfg) =>
+ {
+ cfg.Host(appBuilder.Configuration.GetValue("MassTransit:RabbitMq:Url"));
+ cfg.ConfigureEndpoints(ctx);
+ }
+ );
+});
+
+services.AddHealthChecks();
+
+var app = appBuilder.Build();
+
+app.UseRouting();
+
+app.MapHealthChecks("/health/live");
+app.MapHealthChecks("/health/ready", new() { Predicate = check => check.Tags.Contains("ready"), });
+
+app.MapLeanPipe(
+ "/leanpipe",
+ opts =>
+ {
+ opts.Transports = HttpTransportType.WebSockets;
+ }
+);
+
+app.Run();