Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
adrien2p committed Jan 24, 2025
1 parent 84cd431 commit c6861e3
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
import {
configLoader,
container,
logger,
MedusaAppLoader,
} from "@medusajs/framework"
import { MedusaAppOutput, MedusaModule } from "@medusajs/framework/modules-sdk"
import { EventBusTypes, IndexTypes } from "@medusajs/framework/types"
import {
ContainerRegistrationKeys,
ModuleRegistrationName,
Modules,
} from "@medusajs/framework/utils"
import { initDb, TestDatabaseUtils } from "@medusajs/test-utils"
import { EntityManager } from "@mikro-orm/postgresql"
import { asValue } from "awilix"
import * as path from "path"
import { EventBusServiceMock } from "../__fixtures__"
import { dbName } from "../__fixtures__/medusa-config"
import { DataSynchronizer } from "../../src/utils/sync/data-synchronizer"

// Shared test doubles and fixtures for the DataSynchronizer suite.
const eventBusMock = new EventBusServiceMock()

// Stand-in for the remote query entry point; each test programs `graph`
// with the response it needs for its scenario.
const queryMock = {
  graph: jest.fn(),
}

const dbUtils = TestDatabaseUtils.dbTestUtilFactory()

// Bootstrapping a full Medusa app (config load + migrations) can be slow.
jest.setTimeout(30000)

const testProductId = "test_prod_1"
const testVariantId = "test_var_1"

// Rows served by the mocked query: one product, and one variant linked to it.
const mockData = [
  { id: testProductId, title: "Test Product", updated_at: new Date() },
  {
    id: testVariantId,
    title: "Test Variant",
    product_id: testProductId,
    updated_at: new Date(),
  },
]

// Integration suite for DataSynchronizer: boots a real Medusa app against a
// test database, then drives the synchronizer with a mocked query layer so
// each test controls exactly what data `sync` sees.
describe("DataSynchronizer", () => {
  let medusaApp: MedusaAppOutput
  let medusaAppLoader: MedusaAppLoader
  let index: IndexTypes.IIndexService
  let dataSynchronizer: DataSynchronizer
  // NOTE(review): `manager` is resolved below but never used by any test in
  // this suite — confirm whether upcoming tests need it or it can be dropped.
  let manager: EntityManager

  beforeAll(async () => {
    // Load the fixture medusa-config before any module is instantiated.
    await configLoader(
      path.join(__dirname, "./../__fixtures__"),
      "medusa-config"
    )

    await dbUtils.create(dbName)
    dbUtils.pgConnection_ = await initDb()

    // Query is registered as null here; the index module's internal query is
    // swapped for `queryMock` after the app is loaded (see below).
    container.register({
      [ContainerRegistrationKeys.LOGGER]: asValue(logger),
      [ContainerRegistrationKeys.QUERY]: asValue(null),
      [ContainerRegistrationKeys.PG_CONNECTION]: asValue(dbUtils.pgConnection_),
    })

    medusaAppLoader = new MedusaAppLoader(container as any)
    await medusaAppLoader.runModulesMigrations()

    // Clear any cached module instances so the app loads fresh with the
    // registrations above.
    MedusaModule.clearInstances()
    medusaApp = await medusaAppLoader.load()

    // Replace the index module's event bus and query with test doubles.
    index = container.resolve(Modules.INDEX)
    ;(index as any).eventBusModuleService_ = eventBusMock
    ;(index as any).storageProvider_.query_ = queryMock

    await medusaApp.onApplicationStart()

    manager = (
      medusaApp.sharedContainer!.resolve(ModuleRegistrationName.INDEX) as any
    ).container_.manager as EntityManager

    // Build the synchronizer under test with a mocked storage provider and a
    // minimal schema representation covering the two entities exercised here.
    const mockStorageProvider = {
      consumeEvent: jest.fn().mockImplementation(() => Promise.resolve()),
    }

    const mockSchemaRepresentation = {
      product: {
        fields: ["id", "title", "updated_at"],
        alias: "product",
        moduleConfig: {
          linkableKeys: {
            id: true,
          },
        },
      },
      product_variant: {
        fields: ["id", "title", "product_id", "updated_at"],
        alias: "product_variant",
        moduleConfig: {
          linkableKeys: {
            id: true,
          },
        },
      },
    }

    dataSynchronizer = new DataSynchronizer({
      storageProvider: mockStorageProvider as any,
      schemaObjectRepresentation: mockSchemaRepresentation as any,
      query: queryMock as any,
    })
  })

  afterAll(async () => {
    await medusaApp.onApplicationPrepareShutdown()
    await medusaApp.onApplicationShutdown()
    await dbUtils.shutdown(dbName)
  })

  beforeEach(() => {
    jest.clearAllMocks()
  })

  afterEach(async () => {
    await dbUtils.teardown({ schema: "public" })
  })

  describe("sync", () => {
    it("should sync products data correctly", async () => {
      // Mock query response for products
      queryMock.graph.mockResolvedValueOnce({
        data: [mockData[0]],
      })

      const ackMock = jest.fn()

      const result = await dataSynchronizer.sync({
        entityName: "product",
        pagination: {
          cursor: "0",
          limit: 10,
        },
        ack: ackMock,
      })

      // The synchronizer pages by primary key with a fixed batch size of
      // 1000, regardless of the caller-provided `limit`.
      expect(queryMock.graph).toHaveBeenCalledWith({
        entity: "product",
        fields: ["id"],
        filters: {
          id: { $gt: "0" },
        },
        pagination: {
          order: {
            id: "asc",
          },
          take: 1000,
        },
      })

      expect(result).toEqual({
        lastCursor: testProductId,
        done: true,
      })

      expect(ackMock).toHaveBeenCalledWith({
        lastCursor: testProductId,
      })
    })

    it("should sync product variants data correctly", async () => {
      // Mock query response for variants
      queryMock.graph.mockResolvedValueOnce({
        data: [mockData[1]],
      })

      const ackMock = jest.fn()

      const result = await dataSynchronizer.sync({
        entityName: "product_variant",
        pagination: {
          cursor: "0",
          updated_at: new Date(Date.now() - 24 * 60 * 60 * 1000), // 24 hours ago
        },
        ack: ackMock,
      })

      // A provided `updated_at` is forwarded as an additional $gt filter.
      expect(queryMock.graph).toHaveBeenCalledWith({
        entity: "product_variant",
        fields: ["id"],
        filters: {
          id: { $gt: "0" },
          updated_at: { $gt: expect.any(Date) },
        },
        pagination: {
          order: {
            id: "asc",
          },
          take: 1000,
        },
      })

      expect(result).toEqual({
        lastCursor: testVariantId,
        done: true,
      })

      expect(ackMock).toHaveBeenCalledWith({
        lastCursor: testVariantId,
      })
    })

    it("should handle errors during sync", async () => {
      const error = new Error("Sync failed")
      queryMock.graph.mockRejectedValueOnce(error)

      const ackMock = jest.fn()

      // Return value is intentionally ignored: on failure the contract under
      // test is that the error is surfaced through `ack`.
      await dataSynchronizer.sync({
        entityName: "product",
        pagination: {
          cursor: "0",
        },
        ack: ackMock,
      })

      expect(ackMock).toHaveBeenCalledWith({
        lastCursor: "0",
        err: error,
      })
    })

    it("should handle empty data response", async () => {
      queryMock.graph.mockResolvedValueOnce({
        data: [],
      })

      const ackMock = jest.fn()

      const result = await dataSynchronizer.sync({
        entityName: "product",
        pagination: {
          cursor: "0",
        },
        ack: ackMock,
      })

      // With no rows, the cursor does not advance and the sync reports done.
      expect(result).toEqual({
        lastCursor: "0",
        done: true,
      })
    })
  })
})
2 changes: 1 addition & 1 deletion packages/modules/index/src/services/postgres-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
data: parentData_,
})

const parentIndexRelationEntry = indexRelationRepository.create({
const parentIndexRelationEntry = indexRelationRepository.upsert({
parent_id: (parentData_ as any).id,
parent_name: parentEntity,
child_id: cleanedEntityData.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ export class DataSynchronizer {
}: {
entityName: string
pagination: {
cursor: string
cursor?: string
updated_at?: string | Date
limit?: number
batchSize?: number
}
ack: (ack: {
lastCursor: string
lastCursor: string | null
done?: true
err?: Error
}) => Promise<void>
Expand All @@ -55,7 +55,7 @@ export class DataSynchronizer {

if (!entityPrimaryKey) {
void ack({
lastCursor: pagination.cursor,
lastCursor: pagination.cursor ?? null,
err: new Error(
`Entity ${entityName} does not have a linkable primary key`
),
Expand All @@ -64,14 +64,16 @@ export class DataSynchronizer {
}

let processed = 0
let currentCursor = pagination.cursor
let currentCursor = pagination.cursor!
const batchSize = pagination.batchSize ?? 1000
const limit = pagination.limit ?? Infinity
let done = false

while (processed < limit || !done) {
const filters: Record<string, any> = {
[entityPrimaryKey]: { $gt: currentCursor },
const filters: Record<string, any> = {}

if (currentCursor) {
filters[entityPrimaryKey] = { $gt: currentCursor }
}

if (pagination.updated_at) {
Expand Down

0 comments on commit c6861e3

Please sign in to comment.