Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nonoptimal database query #1302

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public async Task<string> CreateEventSubscription(EventSubscription subscription
{
subscription.Id = Guid.NewGuid().ToString();
var persistable = subscription.ToPersistable();
var result = db.Set<PersistedSubscription>().Add(persistable);
_ = db.Set<PersistedSubscription>().Add(persistable);
await db.SaveChangesAsync(cancellationToken);
return subscription.Id;
}
Expand All @@ -44,7 +44,7 @@ public async Task<string> CreateNewWorkflow(WorkflowInstance workflow, Cancellat
{
workflow.Id = Guid.NewGuid().ToString();
var persistable = workflow.ToPersistable();
var result = db.Set<PersistedWorkflow>().Add(persistable);
_ = db.Set<PersistedWorkflow>().Add(persistable);
await db.SaveChangesAsync(cancellationToken);
return workflow.Id;
}
Expand Down Expand Up @@ -77,7 +77,7 @@ public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowSt
if (status.HasValue)
query = query.Where(x => x.Status == status.Value);

if (!String.IsNullOrEmpty(type))
if (!string.IsNullOrEmpty(type))
query = query.Where(x => x.WorkflowDefinitionId == type);

if (createdFrom.HasValue)
Expand All @@ -87,12 +87,8 @@ public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowSt
query = query.Where(x => x.CreateTime <= createdTo.Value);

var rawResult = await query.Skip(skip).Take(take).ToListAsync();
List<WorkflowInstance> result = new List<WorkflowInstance>();

foreach (var item in rawResult)
result.Add(item.ToWorkflowInstance());

return result;
return rawResult.Select(item => item.ToWorkflowInstance()).ToList();
}
}

Expand Down Expand Up @@ -147,12 +143,12 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
.AsTracking()
.FirstAsync(cancellationToken);

var persistable = workflow.ToPersistable(existingEntity);
_ = workflow.ToPersistable(existingEntity);
await db.SaveChangesAsync(cancellationToken);
}
}
public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)

public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscription> subscriptions, CancellationToken cancellationToken = default)
{
using (var db = ConstructDbContext())
{
Expand All @@ -165,7 +161,7 @@ public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscript
.AsTracking()
.FirstAsync(cancellationToken);

var workflowPersistable = workflow.ToPersistable(existingEntity);
_ = workflow.ToPersistable(existingEntity);

foreach (var subscription in subscriptions)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
using Microsoft.EntityFrameworkCore;
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading.Tasks;
using WorkflowCore.Persistence.EntityFramework.Models;
using WorkflowCore.Models;
using WorkflowCore.Persistence.EntityFramework.Interfaces;
using System.Threading;
using WorkflowCore.Interface;

namespace WorkflowCore.Persistence.EntityFramework.Services
{
public sealed class LargeDataOptimizedEntityFrameworkPersistenceProvider : EntityFrameworkPersistenceProvider, IPersistenceProvider
{
private readonly IWorkflowDbContextFactory _contextFactory;

public LargeDataOptimizedEntityFrameworkPersistenceProvider(IWorkflowDbContextFactory contextFactory, bool canCreateDb, bool canMigrateDb)
: base(contextFactory, canCreateDb, canMigrateDb)
{
_contextFactory = contextFactory;
}

/// <inheritdoc/>
public new async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take)
{
using (var db = _contextFactory.Build())
{
IQueryable<PersistedWorkflow> query = db.Set<PersistedWorkflow>()
.Include(wf => wf.ExecutionPointers)
.ThenInclude(ep => ep.ExtensionAttributes)
.Include(wf => wf.ExecutionPointers)
.AsSplitQuery()
.AsQueryable();

if (status.HasValue)
{
query = query.Where(x => x.Status == status.Value);
}

if (!string.IsNullOrEmpty(type))
{
query = query.Where(x => x.WorkflowDefinitionId == type);
}

if (createdFrom.HasValue)
{
query = query.Where(x => x.CreateTime >= createdFrom.Value);
}

if (createdTo.HasValue)
{
query = query.Where(x => x.CreateTime <= createdTo.Value);
}

var rawResult = await query.OrderBy(x => x.PersistenceId).Skip(skip).Take(take).ToListAsync();

var result = new List<WorkflowInstance>(rawResult.Count);

foreach (var item in rawResult)
{
result.Add(item.ToWorkflowInstance());
}

return result;
}
}

/// <inheritdoc/>
public new async Task<WorkflowInstance> GetWorkflowInstance(string id, CancellationToken cancellationToken = default)
{
using (var db = _contextFactory.Build())
{
var uid = new Guid(id);
var raw = await db.Set<PersistedWorkflow>()
.Include(wf => wf.ExecutionPointers)
.ThenInclude(ep => ep.ExtensionAttributes)
.Include(wf => wf.ExecutionPointers)
.AsSplitQuery()
.FirstAsync(x => x.InstanceId == uid, cancellationToken);

return raw?.ToWorkflowInstance();
}
}

/// <inheritdoc/>
public new async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids, CancellationToken cancellationToken = default)
{
if (ids == null)
{
return Array.Empty<WorkflowInstance>();
}

using (var db = _contextFactory.Build())
{
var uids = ids.Select(i => new Guid(i));
var raw = db.Set<PersistedWorkflow>()
.Include(wf => wf.ExecutionPointers)
.ThenInclude(ep => ep.ExtensionAttributes)
.Include(wf => wf.ExecutionPointers)
.AsSplitQuery()
.Where(x => uids.Contains(x.InstanceId));

var persistedWorkflows = await raw.ToListAsync(cancellationToken);

return persistedWorkflows.Select(i => i.ToWorkflowInstance());
}
}

/// <inheritdoc/>
public new async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
{
using (var db = _contextFactory.Build())
using (var transaction = await db.Database.BeginTransactionAsync(IsolationLevel.RepeatableRead, cancellationToken))
{
var uid = new Guid(workflow.Id);
var existingEntity = await db.Set<PersistedWorkflow>()
.Where(x => x.InstanceId == uid)
.Include(wf => wf.ExecutionPointers)
.ThenInclude(ep => ep.ExtensionAttributes)
.Include(wf => wf.ExecutionPointers)
.AsSplitQuery()
.AsTracking()
.FirstAsync(cancellationToken);

_ = workflow.ToPersistable(existingEntity);

await db.SaveChangesAsync(cancellationToken);

await transaction.CommitAsync(cancellationToken);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard2.1' ">
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="3.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="5.0.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="5.0.1" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,22 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static class ServiceCollectionExtensions
{
public static WorkflowOptions UsePostgreSQL(this WorkflowOptions options,
string connectionString, bool canCreateDB, bool canMigrateDB, string schemaName="wfc")
private static readonly Func<PostgresContextFactory, bool, bool, IPersistenceProvider> DefaultProviderFactory =
(sqlContextFactory, canCreateDb, canMigrateDb) =>
new EntityFrameworkPersistenceProvider(sqlContextFactory, canCreateDb, canMigrateDb);

private static readonly Func<PostgresContextFactory, bool, bool, IPersistenceProvider> OptimizedProviderFactory =
(sqlContextFactory, canCreateDb, canMigrateDb) =>
new LargeDataOptimizedEntityFrameworkPersistenceProvider(sqlContextFactory, canCreateDb, canMigrateDb);

public static WorkflowOptions UsePostgreSQL(this WorkflowOptions options, string connectionString, bool canCreateDB, bool canMigrateDB, string schemaName = "wfc") =>
options.UsePostgreSQL(connectionString, canCreateDB, canMigrateDB, false, schemaName);

public static WorkflowOptions UsePostgreSQL(this WorkflowOptions options, string connectionString, bool canCreateDB, bool canMigrateDB, bool largeDataOptimized, string schemaName="wfc")
{
options.UsePersistence(sp => new EntityFrameworkPersistenceProvider(new PostgresContextFactory(connectionString, schemaName), canCreateDB, canMigrateDB));
var providerFactory = largeDataOptimized ? OptimizedProviderFactory : DefaultProviderFactory;

options.UsePersistence(_ => providerFactory(new PostgresContextFactory(connectionString, schemaName), canCreateDB, canMigrateDB));
options.Services.AddTransient<IWorkflowPurger>(sp => new WorkflowPurger(new PostgresContextFactory(connectionString, schemaName)));
return options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,24 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static class ServiceCollectionExtensions
{
public static WorkflowOptions UseSqlServer(this WorkflowOptions options, string connectionString, bool canCreateDB, bool canMigrateDB, Action<DbConnection> initAction = null)
private static readonly Func<SqlContextFactory, bool, bool, IPersistenceProvider> DefaultProviderFactory =
(sqlContextFactory, canCreateDb, canMigrateDb) =>
new EntityFrameworkPersistenceProvider(sqlContextFactory, canCreateDb, canMigrateDb);

private static readonly Func<SqlContextFactory, bool, bool, IPersistenceProvider> OptimizedProviderFactory =
(sqlContextFactory, canCreateDb, canMigrateDb) =>
new LargeDataOptimizedEntityFrameworkPersistenceProvider(sqlContextFactory, canCreateDb, canMigrateDb);

public static WorkflowOptions UseSqlServer(this WorkflowOptions options, string connectionString, bool canCreateDB, bool canMigrateDB, Action<DbConnection> initAction = null) =>
options.UseSqlServer(connectionString, canCreateDB, canMigrateDB, false, initAction);

public static WorkflowOptions UseSqlServer(this WorkflowOptions options, string connectionString, bool canCreateDB, bool canMigrateDB, bool largeDataOptimized, Action<DbConnection> initAction = null)
{
options.UsePersistence(sp => new EntityFrameworkPersistenceProvider(new SqlContextFactory(connectionString, initAction), canCreateDB, canMigrateDB));
var providerFactory = largeDataOptimized ? OptimizedProviderFactory : DefaultProviderFactory;

options.UsePersistence(_ => providerFactory(new SqlContextFactory(connectionString, initAction), canCreateDB, canMigrateDB));
options.Services.AddTransient<IWorkflowPurger>(sp => new WorkflowPurger(new SqlContextFactory(connectionString, initAction)));
return options;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace WorkflowCore.IntegrationTests.Scenarios
{
public class ForkScenario : BaseScenario<ForkScenario.OutcomeFork, Object>
public class ForkScenario<TSelf> : BaseScenario<ForkScenario<TSelf>.OutcomeFork, Object>
{
static int TaskATicker = 0;
static int TaskBTicker = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace WorkflowCore.IntegrationTests.Scenarios
{
public class IfScenario : WorkflowTest<IfScenario.IfWorkflow, IfScenario.MyDataClass>
public class IfScenario<TSelf> : WorkflowTest<IfScenario<TSelf>.IfWorkflow, IfScenario<TSelf>.MyDataClass>
{
internal static int Step1Ticker = 0;
internal static int Step2Ticker = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace WorkflowCore.IntegrationTests.Scenarios
{
public class UserScenario : WorkflowTest<UserScenario.HumanWorkflow, Object>
public class UserScenario<TSelf> : WorkflowTest<UserScenario<TSelf>.HumanWorkflow, Object>
{
internal static int ApproveStepTicker = 0;
internal static int DisapproveStepTicker = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace WorkflowCore.IntegrationTests.Scenarios
{
public class WhenScenario : WorkflowTest<WhenScenario.WhenWorkflow, WhenScenario.MyDataClass>
public class WhenScenario<TSelf> : WorkflowTest<WhenScenario<TSelf>.WhenWorkflow, WhenScenario<TSelf>.MyDataClass>
{
internal static int Case1Ticker = 0;
internal static int Case2Ticker = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace WorkflowCore.IntegrationTests.Scenarios
{
public class WhileScenario : WorkflowTest<WhileScenario.WhileWorkflow, WhileScenario.MyDataClass>
public class WhileScenario<TSelf> : WorkflowTest<WhileScenario<TSelf>.WhileWorkflow, WhileScenario<TSelf>.MyDataClass>
{
internal static int Step1Ticker = 0;
internal static int Step2Ticker = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
namespace WorkflowCore.Tests.DynamoDB.Scenarios
{
[Collection("DynamoDb collection")]
public class DynamoIfScenario : IfScenario
{
public class DynamoIfScenario : IfScenario<DynamoIfScenario>
{
protected override void ConfigureServices(IServiceCollection services)
{
var cfg = new AmazonDynamoDBConfig {ServiceURL = DynamoDbDockerSetup.ConnectionString};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
namespace WorkflowCore.Tests.DynamoDB.Scenarios
{
[Collection("DynamoDb collection")]
public class DynamoWhileScenario : WhileScenario
{
public class DynamoWhileScenario : WhileScenario<DynamoWhileScenario>
{
protected override void ConfigureServices(IServiceCollection services)
{
var cfg = new AmazonDynamoDBConfig {ServiceURL = DynamoDbDockerSetup.ConnectionString};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
namespace WorkflowCore.Tests.MongoDB.Scenarios
{
[Collection("Mongo collection")]
public class MongoForkScenario : ForkScenario
{
public class MongoForkScenario : ForkScenario<MongoForkScenario>
{
protected override void Configure(IServiceCollection services)
{
services.AddWorkflow(x => x.UseMongoDB(MongoDockerSetup.ConnectionString, nameof(MongoForkScenario)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
namespace WorkflowCore.Tests.MongoDB.Scenarios
{
[Collection("Mongo collection")]
public class MongoIfScenario : IfScenario
{
public class MongoIfScenario : IfScenario<MongoIfScenario>
{
protected override void ConfigureServices(IServiceCollection services)
{
services.AddWorkflow(x => x.UseMongoDB(MongoDockerSetup.ConnectionString, nameof(MongoIfScenario)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace WorkflowCore.Tests.MongoDB.Scenarios
{
[Collection("Mongo collection")]
public class MongoUserScenario : UserScenario
public class MongoUserScenario : UserScenario<MongoUserScenario>
{
protected override void ConfigureServices(IServiceCollection services)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace WorkflowCore.Tests.MongoDB.Scenarios
{
[Collection("Mongo collection")]
public class MongoWhenScenario : WhenScenario
public class MongoWhenScenario : WhenScenario<MongoWhenScenario>
{
protected override void ConfigureServices(IServiceCollection services)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace WorkflowCore.Tests.MongoDB.Scenarios
{
[Collection("Mongo collection")]
public class MongoWhileScenario : WhileScenario
public class MongoWhileScenario : WhileScenario<MongoWhileScenario>
{
protected override void ConfigureServices(IServiceCollection services)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
namespace WorkflowCore.Tests.MySQL.Scenarios
{
[Collection("Mysql collection")]
public class MysqlForkScenario : ForkScenario
{
public class MysqlForkScenario : ForkScenario<MysqlForkScenario>
{
protected override void Configure(IServiceCollection services)
{
services.AddWorkflow(x => x.UseMySQL(MysqlDockerSetup.ScenarioConnectionString, true, true));
Expand Down
Loading