Skip to content

Commit

Permalink
Merge pull request #31 from Particular/conventional-usage-refactor
Browse files Browse the repository at this point in the history
Changed structure/usage to adhere to default conventions
  • Loading branch information
WilliamBZA authored Sep 20, 2024
2 parents bdaf8f4 + 6f1d71d commit a584b3e
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 59 deletions.
20 changes: 10 additions & 10 deletions src/Billing/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ public static IHostBuilder CreateHostBuilder(string[] args)
{
services.AddMassTransit(x =>
{
x.AddConsumers(Assembly.GetExecutingAssembly());

x.AddConfigureEndpointsCallback((name, cfg) =>
{
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
{
rmq.SetQuorumQueue();
}
});

x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
Expand All @@ -28,16 +38,6 @@ public static IHostBuilder CreateHostBuilder(string[] args)

cfg.ConfigureEndpoints(context);
});

x.AddConfigureEndpointsCallback((name, cfg) =>
{
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
{
rmq.SetQuorumQueue();
}
});

x.AddConsumers(Assembly.GetExecutingAssembly());
});

services.AddSingleton<SimulationEffects>();
Expand Down
22 changes: 11 additions & 11 deletions src/ClientUI/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ public static IHostBuilder CreateHostBuilder(string[] args)
{
services.AddMassTransit(x =>
{
x.AddConsumers(Assembly.GetExecutingAssembly());

x.AddConfigureEndpointsCallback((name, cfg) =>
{
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
{
rmq.SetQuorumQueue();
}
});

x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
Expand All @@ -28,20 +38,10 @@ public static IHostBuilder CreateHostBuilder(string[] args)

cfg.ConfigureEndpoints(context);
});

x.AddConfigureEndpointsCallback((name, cfg) =>
{
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
{
rmq.SetQuorumQueue();
}
});

x.AddConsumers(Assembly.GetExecutingAssembly());
});

services.AddSingleton<SimulatedCustomers>();
services.AddHostedService(p => p.GetRequiredService<SimulatedCustomers>());
services.AddHostedService<SimulatedCustomers>();
services.AddHostedService<ConsoleBackgroundService>();
});

Expand Down
27 changes: 15 additions & 12 deletions src/ClientUI/SimulatedCustomers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,40 @@

using MassTransit;
using Messages;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

class SimulatedCustomers(IBus _bus) : BackgroundService
class SimulatedCustomers(IServiceScopeFactory factory) : BackgroundService
{
int _rate = 1;

public void WriteState(TextWriter output)
{
output.WriteLine($"Sending {rate} orders / second");
output.WriteLine($"Sending {_rate} orders / second");
}

public void IncreaseTraffic()
{
rate++;
_rate++;
}

public void DecreaseTraffic()
{
if (rate > 0)
if (_rate > 0)
{
rate--;
_rate--;
}
}

Task PlaceSingleOrder(CancellationToken cancellationToken)
async Task PlaceSingleOrder(CancellationToken cancellationToken)
{
await using var scope = factory.CreateAsyncScope();

var placeOrderCommand = new PlaceOrder { OrderId = Guid.NewGuid().ToString() };

Console.Write("!");
return _bus.Publish(placeOrderCommand, cancellationToken);

await scope.ServiceProvider.GetRequiredService<IPublishEndpoint>().Publish(placeOrderCommand, cancellationToken);
}

protected override async Task ExecuteAsync(CancellationToken cancellationToken = default)
Expand All @@ -42,7 +48,6 @@ await Task.WhenAll(
SendBatch(cancellationToken),
Task.Delay(1000, cancellationToken)
);

}
catch (TaskCanceledException)
{
Expand All @@ -53,8 +58,8 @@ await Task.WhenAll(

async Task SendBatch(CancellationToken cancellationToken)
{
var x = rate;
if (rate > 0)
int x = _rate;
if (_rate > 0)
{
var tasks = new List<Task>(x);

Expand All @@ -66,6 +71,4 @@ async Task SendBatch(CancellationToken cancellationToken)
await Task.WhenAll(tasks);
}
}

int rate = 1;
}
4 changes: 2 additions & 2 deletions src/Messages/OrderBilled.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Messages;

public class OrderBilled
public record OrderBilled
{
public string OrderId { get; set; }
public string OrderId { get; init; }
}
4 changes: 2 additions & 2 deletions src/Messages/OrderPlaced.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Messages;

public class OrderPlaced
public record OrderPlaced
{
public string OrderId { get; set; }
public string OrderId { get; init; }
}
4 changes: 2 additions & 2 deletions src/Messages/PlaceOrder.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Messages;

public class PlaceOrder
public record PlaceOrder
{
public string OrderId { get; set; }
public string OrderId { get; init; }
}
20 changes: 10 additions & 10 deletions src/Sales/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ public static IHostBuilder CreateHostBuilder(string[] args)
{
services.AddMassTransit(x =>
{
x.AddConsumers(Assembly.GetExecutingAssembly());

x.AddConfigureEndpointsCallback((name, cfg) =>
{
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
{
rmq.SetQuorumQueue();
}
});

x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
Expand All @@ -29,16 +39,6 @@ public static IHostBuilder CreateHostBuilder(string[] args)

cfg.ConfigureEndpoints(context);
});

x.AddConsumers(Assembly.GetExecutingAssembly());

x.AddConfigureEndpointsCallback((name, cfg) =>
{
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
{
rmq.SetQuorumQueue();
}
});
});

services.AddSingleton<SimulationEffects>();
Expand Down
20 changes: 10 additions & 10 deletions src/Shipping/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ public static IHostBuilder CreateHostBuilder(string[] args)
{
services.AddMassTransit(x =>
{
x.AddConsumers(Assembly.GetExecutingAssembly());

x.AddConfigureEndpointsCallback((name, cfg) =>
{
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
{
rmq.SetQuorumQueue();
}
});

x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
Expand All @@ -29,16 +39,6 @@ public static IHostBuilder CreateHostBuilder(string[] args)

cfg.ConfigureEndpoints(context);
});

x.AddConsumers(Assembly.GetExecutingAssembly());

x.AddConfigureEndpointsCallback((name, cfg) =>
{
if (cfg is IRabbitMqReceiveEndpointConfigurator rmq)
{
rmq.SetQuorumQueue();
}
});
});

services.AddSingleton<SimulationEffects>();
Expand Down

0 comments on commit a584b3e

Please sign in to comment.