Skip to content

Commit

Permalink
feat:test bulkevent
Browse files Browse the repository at this point in the history
  • Loading branch information
duiapro committed May 7, 2024
1 parent 449e060 commit ebd7617
Show file tree
Hide file tree
Showing 44 changed files with 579 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@ public IConcurrencyStampProvider? ConcurrencyStampProvider
}
}

protected virtual Dictionary<EntityState, Type> ChangeEventTypes
{
get
{
var eventTypes = new Dictionary<EntityState, Type>
{
{EntityState.Added,typeof(EntityCreatedDomainEvent<>)},
{EntityState.Modified,typeof(EntityModifiedDomainEvent<>)},
{EntityState.Deleted,typeof(EntityDeletedDomainEvent<>)}
};
return eventTypes;
}
}

private IMultiEnvironmentContext? EnvironmentContext => Options?.ServiceProvider?.GetService<IMultiEnvironmentContext>();

protected IMultiTenantContext? TenantContext => Options?.ServiceProvider?.GetService<IMultiTenantContext>();
Expand Down Expand Up @@ -280,21 +294,14 @@ protected virtual Task PublishEntityChangedEventAsync(ChangeTracker changeTracke
if (!domainEntities.Any())
return Task.CompletedTask;

var eventTypes = new Dictionary<EntityState, Type>
{
{EntityState.Added,typeof(EntityCreatedDomainEvent<>)},
{EntityState.Modified,typeof(EntityModifiedDomainEvent<>)},
{EntityState.Deleted,typeof(EntityDeletedDomainEvent<>)}
};

domainEntities.ForEach(item =>
{
var entityType = item.Entity.GetType();
var eventType = item.State switch
{
EntityState.Added => eventTypes[EntityState.Added].MakeGenericType(entityType),
EntityState.Modified => eventTypes[EntityState.Modified].MakeGenericType(entityType),
EntityState.Deleted => eventTypes[EntityState.Deleted].MakeGenericType(entityType),
EntityState.Added => ChangeEventTypes[EntityState.Added].MakeGenericType(entityType),
EntityState.Modified => ChangeEventTypes[EntityState.Modified].MakeGenericType(entityType),
EntityState.Deleted => ChangeEventTypes[EntityState.Deleted].MakeGenericType(entityType),
_ => null,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public async Task BulkPublishAsync<T>(
string topicName, List<(T @event, IntegrationEventExpand? eventMessageExpand)> @events,
CancellationToken stoppingToken = default)
{

_logger?.LogDebug("-----BulkPublishEvent Integration event publishing is in progress from {AppId} with DaprAppId as '{DaprAppId}'", _appId,
_daprAppId);

Expand All @@ -81,7 +80,7 @@ public async Task BulkPublishAsync<T>(

MasaArgumentException.ThrowIfNullOrWhiteSpace(_daprAppId);

var masaCloudEvents = new List<MasaCloudEvent<IntegrationEventMessage>>();
var waitMasaCloudEvents = new List<MasaCloudEvent<IntegrationEventMessage>>();
var waitEvents = new List<T>();

@events.ForEach(item =>
Expand All @@ -94,27 +93,27 @@ public async Task BulkPublishAsync<T>(
Source = new Uri(_daprAppId, UriKind.RelativeOrAbsolute)
};
masaCloudEvents.Add(masaCloudEvent);
waitMasaCloudEvents.Add(masaCloudEvent);
}
else
{
waitEvents.Add(item.@event);
}
});

if (masaCloudEvents.Any())
if (waitMasaCloudEvents.Any())
{
await DaprClient.PublishEventAsync(_pubSubName, topicName, masaCloudEvents, stoppingToken);
await DaprClient.BulkPublishEventAsync(_pubSubName, topicName, waitMasaCloudEvents, cancellationToken: stoppingToken);
_logger?.LogDebug(
"-----BulkPublishEvent Publishing integration event from {AppId} succeeded with DaprAppId is {DaprAppId} and Event is {Event}",
"-----BulkPublishEvent MasaCloudEvent Publishing integration event from {AppId} succeeded with DaprAppId is {DaprAppId} and Event is {Event}",
_appId,
_daprAppId,
masaCloudEvents);
waitMasaCloudEvents);
}

if (waitEvents.Any())
{
await DaprClient.BulkPublishEventAsync(_pubSubName, topicName, @events.ToList(), cancellationToken: stoppingToken);
await DaprClient.BulkPublishEventAsync(_pubSubName, topicName, waitEvents, cancellationToken: stoppingToken);
_logger?.LogDebug(
"-----BulkPublishEvent Publishing integration event from {AppId} succeeded with DaprAppId is {DaprAppId} and Event is {Event}",
_appId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public void BulkAddJobs(List<IntegrationEventLogItem> items)
public void RemoveJobs(Guid eventId)
=> _retryEventLogs.TryRemove(eventId, out _);

public void BulkRemoveJobs(IEnumerable<Guid> eventIds)
=> eventIds.ToList().ForEach(eventId => _retryEventLogs.TryRemove(eventId, out _));

public void RetryJobs(Guid eventId)
{
if (_retryEventLogs.TryGetValue(eventId, out IntegrationEventLogItem? item))
Expand All @@ -36,9 +39,34 @@ public void RetryJobs(Guid eventId)
}
}

public void BulkRetryJobs(IEnumerable<Guid> eventIds)
{
foreach (var eventId in eventIds)
{
if (_retryEventLogs.TryGetValue(eventId, out IntegrationEventLogItem? item))
{
item.Retry();
}
}
}

public bool IsExist(Guid eventId)
=> _retryEventLogs.ContainsKey(eventId);

public List<Guid> IsExist(IEnumerable<Guid> eventIds)
{
var notEventIds = new List<Guid>();
foreach (var eventId in eventIds)
{
if (_retryEventLogs.ContainsKey(eventId))
{
notEventIds.Add(eventId);
}
}

return notEventIds;
}

public void Delete(int maxRetryTimes)
{
var eventLogItems = _retryEventLogs.Values.Where(log => log.RetryCount >= maxRetryTimes - 1).ToList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ public class IntegrationEventOptions : IIntegrationEventOptions

public Assembly[] Assemblies { get; }

/// <summary>
/// Send in batches according to Topic
/// </summary>
public bool BatchesGroupSendOrRetry { get; set; } = false;

private int _localRetryTimes = 3;

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ public RetryByDataProcessor(

protected override async Task ExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken)
{
if (_options.Value.BatchesGroupSendOrRetry)
{
await this.BulkExecuteAsync(serviceProvider, stoppingToken);
return;
}

var unitOfWork = serviceProvider.GetService<IUnitOfWork>();
if (unitOfWork != null)
unitOfWork.UseTransaction = false;
Expand All @@ -36,7 +42,7 @@ await eventLogService.RetrieveEventLogsFailedToPublishAsync(
_options.Value.MinimumRetryInterval,
stoppingToken);

if(!retrieveEventLogs.Any())
if (!retrieveEventLogs.Any())
return;

var publisher = serviceProvider.GetRequiredService<IPublisher>();
Expand All @@ -54,7 +60,7 @@ await eventLogService.RetrieveEventLogsFailedToPublishAsync(
eventLog,
eventLog.Topic);

await publisher.PublishAsync(eventLog.Topic, eventLog.Event, eventLog.EventExpand, stoppingToken);
await publisher.PublishAsync(eventLog.Topic, eventLog.Event, eventLog.EventExpand, stoppingToken);

LocalQueueProcessor.Default.RemoveJobs(eventLog.EventId);

Expand All @@ -73,4 +79,71 @@ await eventLogService.RetrieveEventLogsFailedToPublishAsync(
}
}
}

protected async Task BulkExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken)
{
var unitOfWork = serviceProvider.GetService<IUnitOfWork>();
if (unitOfWork != null)
unitOfWork.UseTransaction = false;
var eventLogService = serviceProvider.GetRequiredService<IIntegrationEventLogService>();

var retrieveEventLogs =
await eventLogService.RetrieveEventLogsFailedToPublishAsync(
_options.Value.RetryBatchSize,
_options.Value.MaxRetryTimes,
_options.Value.MinimumRetryInterval,
stoppingToken);

if (!retrieveEventLogs.Any())
return;

var publisher = serviceProvider.GetRequiredService<IPublisher>();
var retrieveEventLogsGroupByTopic = retrieveEventLogs.GroupBy(eventLog => eventLog.Topic)
.Select(eventLog => new
{
TopicName = eventLog.Key,
Events = eventLog.Select(log => new { log.Event, log.EventExpand, log.EventId }).ToList(),
}).ToList();
var allEventIds = retrieveEventLogsGroupByTopic.
SelectMany(eventLog => eventLog.Events.Select(item => item.EventId));
var removeEventIds = LocalQueueProcessor.Default.IsExist(allEventIds);

foreach (var eventLog in retrieveEventLogsGroupByTopic)
{
eventLog.Events.RemoveAll(item => removeEventIds.Contains(item.EventId));

var eventIds = eventLog.Events.Select(item => item.EventId);
var events = eventLog.Events.Select(item => (item.Event, item.EventExpand)).ToList();

try
{
if (!eventIds.Any())
continue; // The local queue is retrying, no need to retry

await eventLogService.BulkMarkEventAsInProgressAsync(eventIds, _options.Value.MinimumRetryInterval, stoppingToken);

_logger?.LogDebug("Publishing integration event {Event} to {TopicName}",
eventLog,
eventLog.TopicName);

await publisher.BulkPublishAsync(eventLog.TopicName, events, stoppingToken);

LocalQueueProcessor.Default.BulkRemoveJobs(eventIds);

await eventLogService.BulkMarkEventAsPublishedAsync(eventIds, stoppingToken);
}
catch (UserFriendlyException)
{
//Update state due to multitasking contention, no processing required
}
catch (Exception ex)
{
_logger?.LogError(ex,
"Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})",
eventIds, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog);
await eventLogService.BulkMarkEventAsFailedAsync(eventIds, stoppingToken);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ public RetryByLocalQueueProcessor(

protected override async Task ExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken)
{
if (_options.Value.BatchesGroupSendOrRetry)
{
await this.BulkExecuteAsync(serviceProvider, stoppingToken);
return;
}

var unitOfWork = serviceProvider.GetService<IUnitOfWork>();
if (unitOfWork != null)
unitOfWork.UseTransaction = false;
Expand All @@ -34,7 +40,7 @@ protected override async Task ExecuteAsync(IServiceProvider serviceProvider, Can
LocalQueueProcessor.Default.RetrieveEventLogsFailedToPublishAsync(_options.Value.LocalRetryTimes,
_options.Value.RetryBatchSize);

if(!retrieveEventLogs.Any())
if (!retrieveEventLogs.Any())
return;

var publisher = serviceProvider.GetRequiredService<IPublisher>();
Expand All @@ -52,7 +58,7 @@ protected override async Task ExecuteAsync(IServiceProvider serviceProvider, Can
eventLog,
eventLog.Topic);

await publisher.PublishAsync(eventLog.Topic, eventLog.Event, eventLog.EventExpand, stoppingToken);
await publisher.PublishAsync(eventLog.Topic, eventLog.Event, eventLog.EventExpand, stoppingToken);

await eventLogService.MarkEventAsPublishedAsync(eventLog.EventId, stoppingToken);

Expand All @@ -72,4 +78,63 @@ protected override async Task ExecuteAsync(IServiceProvider serviceProvider, Can
}
}
}

protected async Task BulkExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken)
{
var unitOfWork = serviceProvider.GetService<IUnitOfWork>();
if (unitOfWork != null)
unitOfWork.UseTransaction = false;

var eventLogService = serviceProvider.GetRequiredService<IIntegrationEventLogService>();

var retrieveEventLogs =
LocalQueueProcessor.Default.RetrieveEventLogsFailedToPublishAsync(_options.Value.LocalRetryTimes,
_options.Value.RetryBatchSize);

if (!retrieveEventLogs.Any())
return;

var publisher = serviceProvider.GetRequiredService<IPublisher>();
var retrieveEventLogsGroupByTopic = retrieveEventLogs.GroupBy(eventLog => eventLog.Topic)
.Select(eventLog => new
{
TopicName = eventLog.Key,
Events = eventLog.Select(log => new { log.Event, log.EventExpand, log.EventId }).ToList(),
}).ToList();

foreach (var eventLog in retrieveEventLogsGroupByTopic)
{
var eventIds = eventLog.Events.Select(item => item.EventId);
var events = eventLog.Events.Select(item => (item.Event, item.EventExpand)).ToList();

try
{
LocalQueueProcessor.Default.BulkRemoveJobs(eventIds);

await eventLogService.BulkMarkEventAsInProgressAsync(eventIds, _options.Value.MinimumRetryInterval, stoppingToken);

_logger?.LogDebug("Publishing integration event {Event} to {TopicName}",
eventLog,
eventLog.TopicName);

await publisher.BulkPublishAsync(eventLog.TopicName, events, stoppingToken);

await eventLogService.BulkMarkEventAsPublishedAsync(eventIds, stoppingToken);

LocalQueueProcessor.Default.BulkRemoveJobs(eventIds);
}
catch (UserFriendlyException)
{
//Update state due to multitasking contention
LocalQueueProcessor.Default.BulkRemoveJobs(eventIds);
}
catch (Exception ex)
{
_logger?.LogError(ex,
"Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})",
eventIds, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog);
await eventLogService.BulkMarkEventAsFailedAsync(eventIds, stoppingToken);
}
}
}
}
Loading

0 comments on commit ebd7617

Please sign in to comment.