Skip to content

Commit

Permalink
Amended person sync to create events if there are any QTLS related in…
Browse files Browse the repository at this point in the history
…duction changes (#1835)

### Context

Teachers with QTLS can potentially have an induction status without any
dfeta_induction records associated with a Contact.

We are currently only live syncing any induction status changes for
changes to the dfeta_induction table.

We need to also capture any changes to the induction status on the
Contact table when the QTLS date changes alone as part of the Person
live sync.

### Changes proposed in this pull request

Amend the SyncPersons code in `TrsDataSyncHelper` to also generate any
`DqtContactInductionStatusChangedEvent` events if there are any changes
to the `Contact.dfeta_InductionStatus` field but only where the
`Contact.dfeta_qtlsdate` field has changed too (i.e. the induction
status has changed as a result of the QTLS date having changed).

### Checklist

-   [ ] Attach to Trello card
-   [ ] Rebased master
-   [ ] Cleaned commit history
-   [ ] Tested by running locally
-   [ ] Run DQT integration tests locally (if appropriate)
  • Loading branch information
hortha authored Jan 27, 2025
1 parent 314de62 commit 9556ea3
Show file tree
Hide file tree
Showing 4 changed files with 267 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,25 +243,42 @@ public async Task<IReadOnlyCollection<Guid>> SyncPersonsAsync(
bool dryRun,
CancellationToken cancellationToken = default)
{
// For the moment we are just making sure we have the audit history in TRS - we will amend to generate events as part of the contact migration work
if (syncAudit)
{
await SyncAuditAsync(Contact.EntityLogicalName, entities.Select(q => q.ContactId!.Value), skipIfExists: false, cancellationToken);
}

// We're syncing all contacts for now.
// Keep this in sync with the filter in the SyncAllContactsFromCrmJob job.
IEnumerable<Contact> toSync = entities;
var auditDetails = await GetAuditRecordsFromAuditRepositoryAsync(Contact.EntityLogicalName, Contact.PrimaryIdAttribute, entities.Select(q => q.ContactId!.Value), cancellationToken);
return await SyncPersonsAsync(entities, auditDetails, ignoreInvalid, dryRun, cancellationToken);
}

public async Task<IReadOnlyCollection<Guid>> SyncPersonsAsync(
IReadOnlyCollection<Contact> entities,
IReadOnlyDictionary<Guid, AuditDetailCollection> auditDetails,
bool ignoreInvalid,
bool dryRun,
CancellationToken cancellationToken = default)
{
var (persons, events) = MapPersonsAndAudits(entities, auditDetails, ignoreInvalid);
return await SyncPersonsAsync(persons, events, ignoreInvalid, dryRun, cancellationToken);
}

public async Task<IReadOnlyCollection<Guid>> SyncPersonsAsync(
IReadOnlyCollection<Person> persons,
IReadOnlyCollection<EventBase> events,
bool ignoreInvalid,
bool dryRun,
CancellationToken cancellationToken = default)
{
var toSync = persons.ToList();

if (ignoreInvalid)
{
// Some bad data in the build environment has a TRN that's longer than 7 digits
toSync = toSync.Where(e => string.IsNullOrEmpty(e.dfeta_TRN) || e.dfeta_TRN.Length == 7);
toSync = toSync.Where(p => string.IsNullOrEmpty(p.Trn) || p.Trn.Length == 7).ToList();
}

var people = MapPersons(toSync);

if (people.Count == 0)
if (!toSync.Any())
{
return [];
}
Expand All @@ -280,7 +297,7 @@ public async Task<IReadOnlyCollection<Guid>> SyncPersonsAsync(

using var writer = await connection.BeginBinaryImportAsync(modelTypeSyncInfo.CopyStatement!, cancellationToken);

foreach (var person in people)
foreach (var person in toSync)
{
writer.StartRow();
modelTypeSyncInfo.WriteRecord!(writer, person);
Expand Down Expand Up @@ -313,28 +330,31 @@ public async Task<IReadOnlyCollection<Guid>> SyncPersonsAsync(
throw;
}

var entitiesExceptFailedOne = entities.Where(e => e.dfeta_TRN != trn).ToArray();
var personsExceptFailedOne = persons.Where(e => e.Trn != trn).ToArray();
var eventsExceptFailedOne = events.Where(e => e is not IEventWithPersonId || ((IEventWithPersonId)e).PersonId != personsExceptFailedOne[0].PersonId).ToArray();

// Be extra sure we've actually removed a record (otherwise we'll go in an endless loop and stack overflow)
if (!(entitiesExceptFailedOne.Length < entities.Count))
if (!(personsExceptFailedOne.Length < persons.Count))
{
Debug.Fail("No entities removed from collection.");
Debug.Fail("No persons removed from collection.");
throw;
}

await txn.DisposeAsync();
await connection.DisposeAsync();

return await SyncPersonsAsync(entitiesExceptFailedOne, syncAudit, ignoreInvalid, dryRun, cancellationToken);
return await SyncPersonsAsync(personsExceptFailedOne, eventsExceptFailedOne, ignoreInvalid, dryRun, cancellationToken);
}

await txn.SaveEventsAsync(events, "events_person_import", clock, cancellationToken, timeoutSeconds: 120);

if (!dryRun)
{
await txn.CommitAsync(cancellationToken);
}

_syncedEntitiesSubject.OnNext(people.ToArray());
return people.Select(p => p.PersonId).ToArray();
_syncedEntitiesSubject.OnNext([.. toSync, .. events]);
return toSync.Select(p => p.PersonId).ToArray();
}

public async Task<int> SyncInductionsAsync(
Expand Down Expand Up @@ -389,17 +409,10 @@ public async Task<int> SyncInductionsAsync(
{
if (syncAudit)
{
await SyncAuditAsync(Contact.EntityLogicalName, contacts.Select(q => q.ContactId!.Value), skipIfExists: false, cancellationToken);
await SyncAuditAsync(dfeta_induction.EntityLogicalName, entities.Select(q => q.Id), skipIfExists: false, cancellationToken);
}

var contactAuditDetails = await GetAuditRecordsFromAuditRepositoryAsync(Contact.EntityLogicalName, Contact.PrimaryIdAttribute, contacts.Select(q => q.ContactId!.Value), cancellationToken);
var inductionAuditDetails = await GetAuditRecordsFromAuditRepositoryAsync(dfeta_induction.EntityLogicalName, dfeta_induction.PrimaryIdAttribute, entities.Select(q => q.Id), cancellationToken);

var auditDetails = contactAuditDetails
.Concat(inductionAuditDetails)
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);

var auditDetails = await GetAuditRecordsFromAuditRepositoryAsync(dfeta_induction.EntityLogicalName, dfeta_induction.PrimaryIdAttribute, entities.Select(q => q.Id), cancellationToken);
return await SyncInductionsAsync(contacts, entities, auditDetails, ignoreInvalid, createMigratedEvent, dryRun, cancellationToken);
}

Expand Down Expand Up @@ -500,7 +513,7 @@ private async Task<int> SyncInductionsAsync(
.Where(e => e is IEventWithPersonId && !unsyncedContactIds.Any(c => c == ((IEventWithPersonId)e).PersonId))
.ToArray();

await txn.SaveEventsAsync(eventsForSyncedContacts, "events_import", clock, cancellationToken, timeoutSeconds: 120);
await txn.SaveEventsAsync(eventsForSyncedContacts, "events_induction_import", clock, cancellationToken, timeoutSeconds: 120);

if (!dryRun)
{
Expand Down Expand Up @@ -1080,6 +1093,105 @@ private static ModelTypeSyncInfo GetModelTypeSyncInfoForEvent()
};
}

private (List<Person> Inductions, List<EventBase> Events) MapPersonsAndAudits(
IReadOnlyCollection<Contact> contacts,
IReadOnlyDictionary<Guid, AuditDetailCollection> auditDetails,
bool ignoreInvalid)
{
var events = new List<EventBase>();
var persons = MapPersons(contacts);

foreach (var contact in contacts)
{
if (auditDetails.TryGetValue(contact.ContactId!.Value, out var contactAudits))
{
// At the moment we are only interested in induction related changes
var contactAttributeNames = new[]
{
Contact.Fields.dfeta_qtlsdate,
Contact.Fields.dfeta_InductionStatus
};

var orderedContactAuditDetails = contactAudits.AuditDetails
.OfType<AttributeAuditDetail>()
.Where(a => a.AuditRecord.ToEntity<Audit>().Action != Audit_Action.Merge)
.Select(a =>
{
var allChangedAttributes = a.NewValue.Attributes.Keys.Union(a.OldValue.Attributes.Keys).ToArray();
var relevantChangedAttributes = allChangedAttributes.Where(k => contactAttributeNames.Contains(k)).ToArray();
var newValue = a.NewValue.ToEntity<Contact>().SparseClone(contactAttributeNames);
newValue.Id = contact.ContactId!.Value;
var oldValue = a.OldValue.ToEntity<Contact>().SparseClone(contactAttributeNames);
oldValue.Id = contact.ContactId!.Value;

return new AuditInfo<Contact>
{
AllChangedAttributes = allChangedAttributes,
RelevantChangedAttributes = relevantChangedAttributes,
NewValue = newValue,
OldValue = oldValue,
AuditRecord = a.AuditRecord.ToEntity<Audit>()
};
})
.OrderBy(a => a.AuditRecord.CreatedOn)
.ThenBy(a => a.AuditRecord.Action == Audit_Action.Create ? 0 : 1)
.ToArray();

foreach (var item in orderedContactAuditDetails)
{
// For the moment (until we migrate contacts) we are not interested in the created event
if (item.AuditRecord.Action == Audit_Action.Create)
{
continue;
}

if (item.AllChangedAttributes.Contains(Contact.Fields.StateCode) && item.AuditRecord.Action != Audit_Action.Create)
{
var nonStateAttributes = item.AllChangedAttributes
.Where(a => !(a is Contact.Fields.StateCode or Contact.Fields.StatusCode))
.ToArray();

if (nonStateAttributes.Length > 0)
{
throw new InvalidOperationException(
$"Expected state and status attributes to change in isolation but also received: {string.Join(", ", nonStateAttributes)}.");
}
}

var mappedEvent = MapContactInductionStatusChangedEvent(contact.ContactId!.Value, item.RelevantChangedAttributes, item.NewValue, item.OldValue, item.AuditRecord, orderedContactAuditDetails);
if (mappedEvent is not null)
{
events.Add(mappedEvent);
}
}
}
}

return (persons, events);
}

private static EventBase? MapContactInductionStatusChangedEvent(Guid contactId, string[] changedAttributes, Contact newValue, Contact oldValue, Audit audit, AuditInfo<Contact>[] allAuditDetails)
{
// Needs to have changed because of a QTLS date change which would be in a separate audit record shortly before the induction status change one (allow 10 seconds)
if (!changedAttributes.Contains(Contact.Fields.dfeta_InductionStatus) ||
(changedAttributes.Contains(Contact.Fields.dfeta_InductionStatus) &&
!allAuditDetails.Any(a => a.RelevantChangedAttributes.Contains(Contact.Fields.dfeta_qtlsdate) && audit.CreatedOn!.Value.Subtract(a.AuditRecord.CreatedOn!.Value).TotalSeconds < 10)))
{
return null;
}

return new DqtContactInductionStatusChangedEvent()
{
EventId = Guid.NewGuid(),
Key = $"{audit.Id}", // The CRM Audit ID
CreatedUtc = audit.CreatedOn!.Value,
RaisedBy = EventModels.RaisedByUserInfo.FromDqtUser(audit.UserId.Id, audit.UserId.Name),
PersonId = contactId,
InductionStatus = newValue.dfeta_InductionStatus.ToString(),
OldInductionStatus = oldValue.dfeta_InductionStatus.ToString()
};
}

private static List<Person> MapPersons(IEnumerable<Contact> contacts) => contacts
.Select(c => new Person()
{
Expand Down Expand Up @@ -1171,7 +1283,14 @@ private static List<Person> MapPersons(IEnumerable<Contact> contacts) => contact
var oldValue = a.OldValue.ToEntity<dfeta_induction>().SparseClone(inductionAttributeNames);
oldValue.Id = induction.Id;

return (AllChangedAttributes: allChangedAttributes, RelevantChangedAttributes: relevantChangedAttributes, NewValue: newValue, OldValue: oldValue, AuditRecord: a.AuditRecord.ToEntity<Audit>());
return new AuditInfo<dfeta_induction>
{
AllChangedAttributes = allChangedAttributes,
RelevantChangedAttributes = relevantChangedAttributes,
NewValue = newValue,
OldValue = oldValue,
AuditRecord = a.AuditRecord.ToEntity<Audit>()
};
})
.OrderBy(a => a.AuditRecord.CreatedOn)
.ThenBy(a => a.AuditRecord.Action == Audit_Action.Create ? 0 : 1)
Expand Down Expand Up @@ -1221,54 +1340,6 @@ private static List<Person> MapPersons(IEnumerable<Contact> contacts) => contact
}
}

if (auditDetails.TryGetValue(contact.ContactId!.Value, out var contactAudits))
{
var contactAttributeNames = new[]
{
Contact.Fields.dfeta_InductionStatus
};

var orderedContactAuditDetails = contactAudits.AuditDetails
.OfType<AttributeAuditDetail>()
.Where(a => a.AuditRecord.ToEntity<Audit>().Action != Audit_Action.Merge)
.Select(a =>
{
var allChangedAttributes = a.NewValue.Attributes.Keys.Union(a.OldValue.Attributes.Keys).ToArray();
var relevantChangedAttributes = allChangedAttributes.Where(k => contactAttributeNames.Contains(k)).ToArray();
var newValue = a.NewValue.ToEntity<Contact>().SparseClone(contactAttributeNames);
newValue.Id = contact.ContactId!.Value;
var oldValue = a.OldValue.ToEntity<Contact>().SparseClone(contactAttributeNames);
oldValue.Id = contact.ContactId!.Value;

return (AllChangedAttributes: allChangedAttributes, RelevantChangedAttributes: relevantChangedAttributes, NewValue: newValue, OldValue: oldValue, AuditRecord: a.AuditRecord.ToEntity<Audit>());
})
.OrderBy(a => a.AuditRecord.CreatedOn)
.ThenBy(a => a.AuditRecord.Action == Audit_Action.Create ? 0 : 1)
.ToArray();

foreach (var item in orderedContactAuditDetails)
{
if (item.AllChangedAttributes.Contains(Contact.Fields.StateCode) && item.AuditRecord.Action != Audit_Action.Create)
{
var nonStateAttributes = item.AllChangedAttributes
.Where(a => !(a is Contact.Fields.StateCode or Contact.Fields.StatusCode))
.ToArray();

if (nonStateAttributes.Length > 0)
{
throw new InvalidOperationException(
$"Expected state and status attributes to change in isolation but also received: {string.Join(", ", nonStateAttributes)}.");
}
}

var mappedEvent = MapContactInductionStatusChangedEvent(contact.ContactId!.Value, item.RelevantChangedAttributes, item.NewValue, item.OldValue, item.AuditRecord);
if (mappedEvent is not null)
{
events.Add(mappedEvent);
}
}
}

inductions.Add(mapped);
}

Expand Down Expand Up @@ -1356,25 +1427,6 @@ EventBase MapImportedEvent(Guid contactId, dfeta_induction induction)
};
}

EventBase? MapContactInductionStatusChangedEvent(Guid contactId, string[] changedAttributes, Contact newValue, Contact oldValue, Audit audit)
{
if (!changedAttributes.Contains(Contact.Fields.dfeta_InductionStatus))
{
return null;
}

return new DqtContactInductionStatusChangedEvent()
{
EventId = Guid.NewGuid(),
Key = $"{audit.Id}", // The CRM Audit ID
CreatedUtc = audit.CreatedOn!.Value,
RaisedBy = EventModels.RaisedByUserInfo.FromDqtUser(audit.UserId.Id, audit.UserId.Name),
PersonId = contactId,
InductionStatus = newValue.dfeta_InductionStatus.ToString(),
OldInductionStatus = oldValue.dfeta_InductionStatus.ToString()
};
}

async Task<EventBase> MapMigratedEventAsync(Guid contactId, dfeta_induction dqtInduction, InductionInfo mappedInduction)
{
var inductionExemptionReason = "";
Expand Down Expand Up @@ -1466,6 +1518,15 @@ private record InductionInfo
public required DateTime? DqtModifiedOn { get; init; }
}

private record AuditInfo<TEntity>
{
public required string[] AllChangedAttributes { get; init; }
public required string[] RelevantChangedAttributes { get; init; }
public required TEntity NewValue { get; init; }
public required TEntity OldValue { get; init; }
public required Audit AuditRecord { get; init; }
}

public static class ModelTypes
{
public const string Person = "Person";
Expand Down
Loading

0 comments on commit 9556ea3

Please sign in to comment.