Skip to content

Commit

Permalink
Merge pull request #43 from by-pinja/feature/37-additional-topic-sett…
Browse files Browse the repository at this point in the history
…ings

Feature/37 additional topic settings
  • Loading branch information
toka-p authored Jan 9, 2023
2 parents 0a7f811 + 773ed8b commit b8542a6
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,10 @@ public interface ITopicItem
{
string TopicName { get; }
}

public interface IConfigurableTopicItem : ITopicItem
{
int PrefetchCount { get; }
string ReceiveMode { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,26 @@ await listener
.FirstAsync();
}

[Fact]
public void WhenTopicTypeContainsExtraSettings_SubscriptionClientUsesThoseSettings()
{
var settings = TestSettings.TopicSettingsOptions();
var subscriber = new AzureTopicSubscriber(settings, new AzureBusTopicManagement(settings), Substitute.For<ILogger<AzureTopicSubscriber>>());
var clientWithoutMode = subscriber.Client<TestMessageForTopic>();
var clientWithMode = subscriber.Client<ConfigurableTestMessageForTopic>();

Assert.Equal(ReceiveMode.PeekLock, clientWithoutMode.ReceiveMode);
Assert.Equal(ReceiveMode.ReceiveAndDelete, clientWithMode.ReceiveMode);
Assert.Equal(0, clientWithoutMode.PrefetchCount);
Assert.Equal(100, clientWithMode.PrefetchCount);
}

[Fact]
public async void WhenSettingFalse_MessagesDoesNotContainArrivalTimeStamp()
{
var settings = TestSettings.TopicSettingsOptions();
settings.Value.AddArrival = false;

var subscriber = new AzureTopicSubscriber(settings, new AzureBusTopicManagement(settings), Substitute.For<ILogger<AzureTopicSubscriber>>());
var publisher = new AzureTopicPublisher(settings, new AzureBusTopicManagement(settings), Substitute.For<ILogger<AzureTopicPublisher>>());

Expand All @@ -138,15 +152,16 @@ await publisher.SendAsync(new TestMessageForTopic
.Where(x => x.ExampleId == id)
.Timeout(TimeSpan.FromSeconds(10))
.FirstAsync();

Assert.Null(receivedMessage.RxMqArrival);
}

[Fact]
public async void WhenSettingTrue_MessagesContainArrivalTimeStamp()
{
var settings = TestSettings.TopicSettingsOptions();
settings.Value.AddArrival = true;

var subscriber = new AzureTopicSubscriber(settings, new AzureBusTopicManagement(settings), Substitute.For<ILogger<AzureTopicSubscriber>>());
var publisher = new AzureTopicPublisher(settings, new AzureBusTopicManagement(settings), Substitute.For<ILogger<AzureTopicPublisher>>());

Expand All @@ -162,7 +177,7 @@ await publisher.SendAsync(new TestMessageForTopic
.Where(x => x.ExampleId == id)
.Timeout(TimeSpan.FromSeconds(10))
.FirstAsync();

var yesterday = DateTimeOffset.UtcNow.AddDays(-1).ToUnixTimeMilliseconds();
Assert.NotNull(receivedMessage.RxMqArrival);
Assert.True(receivedMessage.RxMqArrival > yesterday);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Protacon.RxMq.Abstractions.DefaultMessageRouting;

namespace Protacon.RxMq.AzureServiceBus.Tests.Messages
{
public class ConfigurableTestMessageForTopic: IConfigurableTopicItem
{
public string TopicName => "v1.ctesttopic";
public int PrefetchCount => 100;
public string ReceiveMode => "ReceiveAndDelete";
}
}
2 changes: 2 additions & 0 deletions Protacon.RxMq.AzureServiceBus/Queue/AzureBusSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ internal Binding(AzureBusQueueSettings settings, ILogger<AzureQueueSubscriber> l
queueManagement.CreateQueIfMissing(queueName, typeof(T));

var queueClient = new QueueClient(settings.ConnectionString, queueName);
queueClient.PrefetchCount = settings.DefaultPrefetchCount;

_excludeQueuesFromLogging = new LoggingConfiguration().ExcludeQueuesFromLogging();

queueClient.RegisterMessageHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace Protacon.RxMq.AzureServiceBus.Queue
{
public class AzureBusQueueSettings: AzureMqSettingsBase
{
public int DefaultPrefetchCount { get; set; } = 0;
public Func<Type, string> QueueNameBuilderForSubscriber { get; set; } = type =>
{
var instance = Activator.CreateInstance(type);
Expand Down
21 changes: 20 additions & 1 deletion Protacon.RxMq.AzureServiceBus/Topic/AzureTopicMqSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ namespace Protacon.RxMq.AzureServiceBus.Topic
public class AzureBusTopicSettings : AzureMqSettingsBase
{
public string TopicSubscriberId { get; set; } = Environment.MachineName;
public int DefaultPrefetchCount { get; set; } = 0;
public string DefaultReceiveMode { get; set; } = "PeekLock";
public bool AddArrival { get; set; }

public Func<Type, string> TopicNameBuilder { get; set; } = type =>
Expand All @@ -19,7 +21,24 @@ public class AzureBusTopicSettings : AzureMqSettingsBase
return t.TopicName;
}

throw new InvalidOperationException($"Default implementation of queue name builder expects used objects to extend '{nameof(ITopicItem)}'");
throw new InvalidOperationException($"Default implementation of topic name builder expects used objects to extend '{nameof(ITopicItem)}'");
};

public Func<Type, Tuple<string, int?, string>> TopicConfigBuilder { get; set; } = type =>
{
var instance = Activator.CreateInstance(type);

if (instance is IConfigurableTopicItem cti)
{
return new Tuple<string, int?, string>(cti.TopicName, cti.PrefetchCount, cti.ReceiveMode);
}

if (instance is ITopicItem t)
{
return new Tuple<string, int?, string>(t.TopicName, null, "");
}

throw new InvalidOperationException($"Default implementation of topic configuration builder expects used objects to extend '{nameof(ITopicItem)}' or '{nameof(IConfigurableTopicItem)}'");
};

public Action<Microsoft.Azure.Management.ServiceBus.Fluent.Topic.Definition.IBlank, Type> AzureTopicBuilder { get; set; } = (create, messageType) =>
Expand Down
96 changes: 81 additions & 15 deletions Protacon.RxMq.AzureServiceBus/Topic/AzureTopicSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,44 @@ namespace Protacon.RxMq.AzureServiceBus.Topic
{
public class AzureTopicSubscriber : IMqTopicSubscriber
{
internal class SubscriptionClientOptions {
public string TopicName { get; set; }
public string ConnectionString { get; set; }
public string SubscriptionName { get; set; }
public int PrefetchCount { get; set; }
public ReceiveMode ReceiveMode { get; set; }
}

private readonly AzureBusTopicSettings _settings;
private readonly AzureBusTopicManagement _queueManagement;
private readonly AzureBusTopicManagement _topicManagement;
private readonly ILogger<AzureTopicSubscriber> _logging;
private readonly ConcurrentDictionary<Type, IDisposable> _bindings = new ConcurrentDictionary<Type, IDisposable>();

private readonly BlockingCollection<IBinding> _errorActions = new BlockingCollection<IBinding>(1);
private readonly CancellationTokenSource _source;

private class Binding<T> : IDisposable, IBinding where T : new()
{
private readonly IList<string> _excludeTopicsFromLogging;

internal Binding(AzureBusTopicSettings settings, ILogger<AzureTopicSubscriber> logging,
AzureBusTopicManagement queueManagement, BlockingCollection<IBinding> errorActions)
AzureBusTopicManagement topicManagement, BlockingCollection<IBinding> errorActions)
{
_excludeTopicsFromLogging = new LoggingConfiguration().ExcludeTopicsFromLogging();
var topicName = settings.TopicNameBuilder(typeof(T));
var (topicName, prefetchCount, receiveModeCode) = settings.TopicConfigBuilder(typeof(T));
var subscriptionName = $"{topicName}.{settings.TopicSubscriberId}";

queueManagement.CreateSubscriptionIfMissing(topicName, subscriptionName, typeof(T));
topicManagement.CreateSubscriptionIfMissing(topicName, subscriptionName, typeof(T));

var subscriptionClient = new SubscriptionClient(settings.ConnectionString, topicName, subscriptionName);
var subscriptionClient = CreateClient(new SubscriptionClientOptions
{
ConnectionString = settings.ConnectionString,
TopicName = topicName,
SubscriptionName = subscriptionName,
PrefetchCount = prefetchCount ?? settings.DefaultPrefetchCount,
ReceiveMode = ParseReceiveMode(receiveModeCode, settings.DefaultReceiveMode, logging)
});

UpdateRules(subscriptionClient, settings);

subscriptionClient.RegisterMessageHandler(
Expand Down Expand Up @@ -69,17 +85,30 @@ internal Binding(AzureBusTopicSettings settings, ILogger<AzureTopicSubscriber> l
errorActions.Add(this);
}
}));

Client = subscriptionClient;
logging.LogInformation("Created SubscriptionClient for '{topicName}' with prefetchCount '{prefetchCount}' and receiveMode '{receiveMode}'",
topicName, subscriptionClient.PrefetchCount, subscriptionClient.ReceiveMode);
}

public void ReCreate(AzureBusTopicSettings settings, AzureBusTopicManagement queueManagement)
public void ReCreate(AzureBusTopicSettings settings, AzureBusTopicManagement topicManagement)
{
var topicName = settings.TopicNameBuilder(typeof(T));
var (topicName, prefetchCount, receiveModeCode) = settings.TopicConfigBuilder(typeof(T));
var subscriptionName = $"{topicName}.{settings.TopicSubscriberId}";

queueManagement.CreateSubscriptionIfMissing(topicName, subscriptionName, typeof(T));
topicManagement.CreateSubscriptionIfMissing(topicName, subscriptionName, typeof(T));

var subscriptionClient = new SubscriptionClient(settings.ConnectionString, topicName, subscriptionName);
var subscriptionClient = CreateClient(new SubscriptionClientOptions
{
ConnectionString = settings.ConnectionString,
TopicName = topicName,
SubscriptionName = subscriptionName,
PrefetchCount = prefetchCount ?? settings.DefaultPrefetchCount,
ReceiveMode = ParseReceiveMode(receiveModeCode, settings.DefaultReceiveMode, null)
});

UpdateRules(subscriptionClient, settings);
Client = subscriptionClient;
}

private void UpdateRules(SubscriptionClient subscriptionClient, AzureBusTopicSettings settings)
Expand Down Expand Up @@ -108,18 +137,45 @@ private static T AsObject(string body, long arrival, bool addArrival = false)
return parsed["data"].ToObject<T>();
}

private static SubscriptionClient CreateClient(SubscriptionClientOptions options)
{
var client = new SubscriptionClient(options.ConnectionString, options.TopicName, options.SubscriptionName, options.ReceiveMode);
client.PrefetchCount = options.PrefetchCount;

return client;
}

private static ReceiveMode ParseReceiveMode(string topicReceiveModeCode, string defaultReceiveModeCode, ILogger<AzureTopicSubscriber> logging = null)
{
if (Enum.TryParse<ReceiveMode>(topicReceiveModeCode, out var topicMode))
{
return topicMode;
}
logging?.LogDebug("Invalid receive mode '{topicReceiveModeCode}' provided from topic. Trying settings.", topicReceiveModeCode);

if (Enum.TryParse<ReceiveMode>(defaultReceiveModeCode, out var defaultMode))
{
return defaultMode;
}
logging?.LogWarning("Invalid receive mode '{defaultReceiveModeCode}' provided from settings. Defaulting to 'PeekLock'", defaultReceiveModeCode);

return ReceiveMode.PeekLock;
}

public ReplaySubject<T> Subject { get; } = new ReplaySubject<T>(TimeSpan.FromSeconds(30));
public SubscriptionClient Client { get; set; }

public void Dispose()
{
Subject?.Dispose();
Client = null;
}
}

public AzureTopicSubscriber(IOptions<AzureBusTopicSettings> settings, AzureBusTopicManagement queueManagement, ILogger<AzureTopicSubscriber> logging)
public AzureTopicSubscriber(IOptions<AzureBusTopicSettings> settings, AzureBusTopicManagement topicManagement, ILogger<AzureTopicSubscriber> logging)
{
_settings = settings.Value;
_queueManagement = queueManagement;
_topicManagement = topicManagement;
_logging = logging;

_source = new CancellationTokenSource();
Expand All @@ -132,7 +188,7 @@ public AzureTopicSubscriber(IOptions<AzureBusTopicSettings> settings, AzureBusTo
var action = _errorActions.Take(_source.Token);
try
{
action.ReCreate(_settings, _queueManagement);
action.ReCreate(_settings, _topicManagement);
}
catch (Exception exception)
{
Expand All @@ -155,11 +211,21 @@ public AzureTopicSubscriber(IOptions<AzureBusTopicSettings> settings, AzureBusTo
{
if (!_bindings.ContainsKey(typeof(T)))
{
_bindings.TryAdd(typeof(T), new Binding<T>(_settings, _logging, _queueManagement, _errorActions));
_bindings.TryAdd(typeof(T), new Binding<T>(_settings, _logging, _topicManagement, _errorActions));
}

return ((Binding<T>)_bindings[typeof(T)]).Subject;
}

public SubscriptionClient Client<T>() where T : new()
{
if (!_bindings.ContainsKey(typeof(T)))
{
_bindings.TryAdd(typeof(T), new Binding<T>(_settings, _logging, _topicManagement, _errorActions));
}

return ((Binding<T>)_bindings[typeof(T)]).Client;
}

public void Dispose()
{
Expand All @@ -171,7 +237,7 @@ public void Dispose()

private interface IBinding
{
void ReCreate(AzureBusTopicSettings settings, AzureBusTopicManagement queueManagement);
void ReCreate(AzureBusTopicSettings settings, AzureBusTopicManagement topicManagement);
}
}
}
4 changes: 3 additions & 1 deletion Protacon.RxMq.ConsoleExample/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
"ConnectionString": "",
"AzureSpAppId": "",
"AzureSpPassword": "",
"AzureNamespace": ""
"AzureNamespace": "",
"DefaultPrefetchCount": 0,
"DefaultReceiveMode": "PeekLock"
}
}

0 comments on commit b8542a6

Please sign in to comment.