Skip to content

Commit

Permalink
Merge branch 'release-3.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
SzymonPobiega committed Dec 19, 2018
2 parents f7e042e + c97a491 commit 49152c6
Show file tree
Hide file tree
Showing 220 changed files with 9,990 additions and 1,082 deletions.
8 changes: 8 additions & 0 deletions src/AcceptanceTesting/AcceptanceTesting.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<LangVersion>7.1</LangVersion>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<LangVersion>7.1</LangVersion>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NServiceBus" Version="7.0.0-*" />
<PackageReference Include="NServiceBus.Newtonsoft.Json" Version="2.0.0-*" />
Expand Down
5 changes: 5 additions & 0 deletions src/AcceptanceTesting/Infrastructure/IPoisonSpyContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
public interface IPoisonSpyContext
{
string ExceptionMessage { get; set; }
bool PoisonMessageDetected { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using NServiceBus.Unicast.Subscriptions;
using NServiceBus.Unicast.Subscriptions.MessageDrivenSubscriptions;

class InMemorySubscriptionStorage : ISubscriptionStorage
public class InMemorySubscriptionStorage : ISubscriptionStorage
{
public Task Subscribe(Subscriber subscriber, MessageType messageType, ContextBag context)
{
Expand Down
77 changes: 77 additions & 0 deletions src/AcceptanceTesting/Infrastructure/PoisonSpyComponent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.AcceptanceTesting.Support;
using NServiceBus.Raw;
using NServiceBus.Transport;

class PoisonSpyComponent : IComponentBehavior
{
Action<TransportExtensions<TestTransport>> transportConfiguration;

public PoisonSpyComponent(Action<TransportExtensions<TestTransport>> transportConfiguration)
{
this.transportConfiguration = transportConfiguration;
}

public Task<ComponentRunner> CreateRunner(RunDescriptor run)
{
return Task.FromResult<ComponentRunner>(new Runner(transportConfiguration, (IPoisonSpyContext)run.ScenarioContext));
}

class Runner : ComponentRunner
{
Action<TransportExtensions<TestTransport>> transportConfiguration;
IPoisonSpyContext scenarioContext;
IReceivingRawEndpoint endpoint;
int failureDetected;

public Runner(Action<TransportExtensions<TestTransport>> transportConfiguration, IPoisonSpyContext scenarioContext)
{
this.transportConfiguration = transportConfiguration;
this.scenarioContext = scenarioContext;
}

public override string Name => "PoisonQueueSpy";

public override async Task Start(CancellationToken token)
{
var config = RawEndpointConfiguration.Create("poison", OnMessage, "poison");
config.AutoCreateQueue();
config.CustomErrorHandlingPolicy(new IgnoreErrorsPolicy());
var transport = config.UseTransport<TestTransport>();
transportConfiguration(transport);

endpoint = await RawEndpoint.Start(config);
}

Task OnMessage(MessageContext messageContext, IDispatchMessages dispatcher)
{
if (Interlocked.CompareExchange(ref failureDetected, 1, 0) == 0)
{
if (messageContext.Headers.TryGetValue("NServiceBus.ExceptionInfo.Message", out var exceptionMessage))
{
scenarioContext.ExceptionMessage = exceptionMessage;
}
scenarioContext.PoisonMessageDetected = true;
}
return Task.CompletedTask;
}

public override Task Stop()
{
return endpoint != null
? endpoint.Stop()
: Task.CompletedTask;
}

class IgnoreErrorsPolicy : IErrorHandlingPolicy
{
public Task<ErrorHandleResult> OnError(IErrorHandlingPolicyContext handlingContext, IDispatchMessages dispatcher)
{
return Task.FromResult(ErrorHandleResult.Handled);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTesting.Support;

public static class PoisonSpyComponentExtensions
{
public static IScenarioWithEndpointBehavior<TContext> WithPosionSpyComponent<TContext>(this IScenarioWithEndpointBehavior<TContext> scenario, Action<TransportExtensions<TestTransport>> transportConfiguration)
where TContext : ScenarioContext
{
return scenario.WithComponent(new PoisonSpyComponent(transportConfiguration));
}
}
5 changes: 2 additions & 3 deletions src/AcceptanceTesting/Infrastructure/RouterComponent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using NServiceBus.AcceptanceTesting.Support;
using NServiceBus.Router;


class RouterComponent : IComponentBehavior
{
Func<ScenarioContext, RouterConfiguration> configCallback;
Expand All @@ -23,7 +22,7 @@ public Task<ComponentRunner> CreateRunner(RunDescriptor run)
config.AutoCreateQueues();
var newFactories = new List<Func<Interface>>();

foreach (var factory in config.PortFactories)
foreach (var factory in config.InterfaceFactories)
{
Interface NewFactory()
{
Expand All @@ -33,7 +32,7 @@ Interface NewFactory()
newFactories.Add(NewFactory);
}

config.PortFactories = newFactories;
config.InterfaceFactories = newFactories;
var @switch = Router.Create(config);
return Task.FromResult<ComponentRunner>(new Runner(@switch, "Router"));
}
Expand Down
3 changes: 2 additions & 1 deletion src/AcceptanceTesting/TestTransport/TestTransport.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
namespace NServiceBus
{
using Routing;
using Settings;
using Transport;

/// <summary>
/// A transport optimized for development and learning use. DO NOT use in production.
/// </summary>
public class TestTransport : TransportDefinition
public class TestTransport : TransportDefinition, IMessageDrivenSubscriptionTransport
{
/// <summary>
/// Used by implementations to control if a connection string is necessary.
Expand Down
6 changes: 6 additions & 0 deletions src/LoadTests.Receiver.Router/App.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings>
<add key="Deduplicate" value="true"/>
</appSettings>
</configuration>
16 changes: 16 additions & 0 deletions src/LoadTests.Receiver.Router/LoadTests.Receiver.Router.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net461</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\NServiceBus.Router.SqlServer\NServiceBus.Router.SqlServer.csproj" />
<ProjectReference Include="..\NServiceBus.Router\NServiceBus.Router.csproj" />
<PackageReference Include="NServiceBus.SqlServer" Version="[4.1.0, 5.0.0)" />
<PackageReference Include="NServiceBus.RabbitMQ" Version="[5.0.0, 6.0.0)" />
<PackageReference Include="Metrics.NET" Version="0.5.5" />
</ItemGroup>
</Project>
56 changes: 56 additions & 0 deletions src/LoadTests.Receiver.Router/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
namespace LoadTests.Sender.Router
{
using System;
using System.Data.SqlClient;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Router;

class Program
{
static void Main(string[] args)
{
Start().GetAwaiter().GetResult();
}

static async Task Start()
{
var sqlConnectionString = SettingsReader<string>.Read("SqlConnectionString", "data source=(local); initial catalog=loadtest; integrated security=true");
var rabbitConnectionString = SettingsReader<string>.Read("RabbitConnectionString", "host=localhost");
var epochSize = SettingsReader<int>.Read("EpochSize", 10000);

var routerConfig = new RouterConfiguration("Receiver.Router");
routerConfig.AutoCreateQueues();
var deduplicationConfig = routerConfig.ConfigureDeduplication();
#pragma warning disable 618
deduplicationConfig.EnableInstaller(true);
#pragma warning restore 618

var linkInterface = routerConfig.AddInterface<RabbitMQTransport>("Rabbit", t =>
{
t.ConnectionString(rabbitConnectionString);
t.UseConventionalRoutingTopology();
});

var sqlInterface = routerConfig.AddInterface<SqlServerTransport>("SQL", t =>
{
t.ConnectionString(sqlConnectionString);
t.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
});
sqlInterface.UseSubscriptionPersistence(new SqlSubscriptionStorage(() => new SqlConnection(sqlConnectionString), "ReceiverRouter", new SqlDialect.MsSqlServer(), null));
sqlInterface.EnableDeduplication(linkInterface.Name, "Sender.Router", () => new SqlConnection(sqlConnectionString), epochSize);

var routingProtocol = routerConfig.UseStaticRoutingProtocol();
routingProtocol.AddForwardRoute("Rabbit", "SQL");

var router = Router.Create(routerConfig);

await router.Start();

Console.WriteLine("Press <enter> to exit.");
Console.ReadLine();

await router.Stop();
}
}
}
27 changes: 27 additions & 0 deletions src/LoadTests.Receiver.Router/SettingsReader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;
using System.Configuration;

public static class SettingsReader<T>
{
public static T Read(string name, T defaultValue = default)
{
if (TryRead(name, out var value))
{
return value;
}

return defaultValue;
}

public static bool TryRead(string name, out T value)
{
if (ConfigurationManager.AppSettings[name] != null)
{
value = (T)Convert.ChangeType(ConfigurationManager.AppSettings[name], typeof(T));
return true;
}

value = default;
return false;
}
}
18 changes: 18 additions & 0 deletions src/LoadTests.Receiver/LoadTests.Receiver.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net461</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\NServiceBus.Router.Connector\NServiceBus.Router.Connector.csproj" />
<ProjectReference Include="..\LoadTests.Shared\LoadTests.Shared.csproj" />

<PackageReference Include="NServiceBus" Version="[7.0.0, 8.0.0)" />
<PackageReference Include="NServiceBus.SqlServer" Version="[4.1.0, 5.0.0)" />
<PackageReference Include="NServiceBus.Newtonsoft.Json" Version="[2.0.0, 3.0.0)" />
<PackageReference Include="Metrics.NET" Version="0.5.5" />
</ItemGroup>
</Project>
10 changes: 10 additions & 0 deletions src/LoadTests.Receiver/MyMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Threading.Tasks;
using NServiceBus;

class MyMessageHandler : IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, IMessageHandlerContext context)
{
return Task.CompletedTask;
}
}
49 changes: 49 additions & 0 deletions src/LoadTests.Receiver/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
namespace LoadTests.Sender
{
using System;
using System.Threading.Tasks;
using Metrics;
using NServiceBus;
using NServiceBus.Transport.SQLServer;

class Program
{
static IEndpointInstance endpointInstance;

static void Main(string[] args)
{
Start().GetAwaiter().GetResult();
}

static async Task Start()
{
var config = new EndpointConfiguration("Receiver");
config.UseSerialization<NewtonsoftSerializer>();
config.SendFailedMessagesTo("error");
config.EnableInstallers();
config.UsePersistence<InMemoryPersistence>();

Metric.Config.WithReporting(r =>
{
r.WithCSVReports(".", TimeSpan.FromSeconds(5));
});

config.RegisterComponents(c => c.RegisterSingleton(new Statistics()));

var connectionString = SettingsReader<string>.Read("SqlConnectionString", "data source=(local); initial catalog=loadtest; integrated security=true");

var senderTransport = config.UseTransport<SqlServerTransport>();
senderTransport.UseNativeDelayedDelivery().DisableTimeoutManagerCompatibility();
senderTransport.ConnectionString(connectionString);
senderTransport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);

senderTransport.Routing().RouteToEndpoint(typeof(ProcessingReport), "Sender");

endpointInstance = await Endpoint.Start(config);
Console.WriteLine("Press <enter> to exit.");
Console.ReadLine();

await endpointInstance.Stop();
}
}
}
Loading

0 comments on commit 49152c6

Please sign in to comment.