diff --git a/src/cli.ts b/src/cli.ts index 0062ea9e..ef95c7e3 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -1,8 +1,12 @@ import * as repl from 'node:repl' import { dbClient } from '@/database/client' import { dispatch } from '@/queue/default-queue' +import { getPineconeClient } from '@/lib/pinecone/pinecone-client' const replServer = repl.start('> ') replServer.context.dbClient = dbClient replServer.context.dispatch = dispatch +replServer.context.pineconeIndex = getPineconeClient().Index( + process.env.PINECONE_INDEX_MAIN!, +) diff --git a/src/queue/handlers/handle-migrate-organization-embeddings.ts b/src/queue/handlers/handle-migrate-organization-embeddings.ts index b3907bf6..ae1b8296 100644 --- a/src/queue/handlers/handle-migrate-organization-embeddings.ts +++ b/src/queue/handlers/handle-migrate-organization-embeddings.ts @@ -1,4 +1,5 @@ import { dbClient } from '@/database/client' +import { logger } from '@/lib/log/logger' import { createOpenAIEmbedder } from '@/lib/open-ai/create-open-ai-embedder' import { getPineconeClient } from '@/lib/pinecone/pinecone-client' import { MigrateOrganizationEmbeddings } from '@/queue/jobs' @@ -54,6 +55,12 @@ export async function handleMigrateOrganizationEmbeddings( }) const namespace = `organization_${organization.id}` + const namespaceIndex = index.namespace(namespace) + + logger.debug(`Migration organization ${organization.id} embeddings`, { + event: 'migrate_org_embeddings:start', + organization_id: organization.id, + }) // Pull Request Diffs - straight copy @@ -71,8 +78,14 @@ export async function handleMigrateOrganizationEmbeddings( }, }) + logger.debug('Got pull_request_diff(s)', { + event: 'migrate_org_embeddings:got_pr_diff', + organization_id: organization.id, + num_pull_request_diffs: pullRequestDiffs.matches.length, + }) + for (const pullRequestDiff of pullRequestDiffs.matches) { - index.namespace(namespace).upsert([ + namespaceIndex.upsert([ { id: pullRequestDiff.id, values: pullRequestDiff.values, @@ -81,6 +94,12 @@ export async function handleMigrateOrganizationEmbeddings( ]) } + logger.debug('Finished migrating PR diffs', { + event: 'migrate_org_embeddings:finish_pr_diffs', + organization_id: organization.id, + num_pull_request_diffs: pullRequestDiffs.matches.length, + }) + // PR change - straight copy const pullRequestChanges = await index.query({ @@ -97,8 +116,15 @@ export async function handleMigrateOrganizationEmbeddings( }, }) + logger.debug('Got pr changes', { + event: 'migrate_org_embeddings:got_pr_changes', + organization_id: organization.id, + num_pull_request_diffs: pullRequestDiffs.matches.length, + num_pull_request_changes: pullRequestChanges.matches.length, + }) + for (const pullRequestChange of pullRequestChanges.matches) { - index.namespace(namespace).upsert([ + namespaceIndex.upsert([ { id: pullRequestChange.id, values: pullRequestChange.values, @@ -107,6 +133,13 @@ export async function handleMigrateOrganizationEmbeddings( ]) } + logger.debug('Finished PR changes', { + event: 'migrate_org_embeddings:finished_pr_changes', + organization_id: organization.id, + num_pull_request_diffs: pullRequestDiffs.matches.length, + num_pull_request_changes: pullRequestChanges.matches.length, + }) + // Conversation - straight copy const conversations = await index.query({ @@ -123,8 +156,16 @@ export async function handleMigrateOrganizationEmbeddings( }, }) + logger.debug('Got conversations', { + event: 'migrate_org_embeddings:got_conversations', + organization_id: organization.id, + num_pull_request_diffs: pullRequestDiffs.matches.length, + num_pull_request_changes: pullRequestChanges.matches.length, + num_conversations: conversations.matches.length, + }) + for (const conversation of conversations.matches) { - index.namespace(namespace).upsert([ + namespaceIndex.upsert([ { id: conversation.id, values: conversation.values, @@ -133,6 +174,14 @@ export async function handleMigrateOrganizationEmbeddings( ]) } + logger.debug('Finished conversations', { + event: 'migrate_org_embeddings:finished_conversations', + organization_id: organization.id, + num_pull_request_diffs: pullRequestDiffs.matches.length, + num_pull_request_changes: pullRequestChanges.matches.length, + num_conversations: conversations.matches.length, + }) + // PR Summary const prSummaries = await index.query({ @@ -149,6 +198,15 @@ export async function handleMigrateOrganizationEmbeddings( }, }) + logger.debug('Got PR summaries', { + event: 'migrate_org_embeddings:got_pr_summaries', + organization_id: organization.id, + num_pull_request_diffs: pullRequestDiffs.matches.length, + num_pull_request_changes: pullRequestChanges.matches.length, + num_conversations: conversations.matches.length, + num_pr_summaries: prSummaries.matches.length, + }) + for (const prSummary of prSummaries.matches) { const githubPrId = prSummary.metadata?.ext_gh_pull_request_id if (!githubPrId) { @@ -178,7 +236,7 @@ export async function handleMigrateOrganizationEmbeddings( const newChangeText = `${pr.title}\n${change}` const newChangeEmbedding = await embedder.embedQuery(newChangeText) - index.namespace(namespace).upsert([ + namespaceIndex.upsert([ { id: prSummary.id, values: newChangeEmbedding, @@ -198,6 +256,15 @@ export async function handleMigrateOrganizationEmbeddings( ]) } + logger.debug('Finished PR summaries', { + event: 'migrate_org_embeddings:finished_pr_summaries', + organization_id: organization.id, + num_pull_request_diffs: pullRequestDiffs.matches.length, + num_pull_request_changes: pullRequestChanges.matches.length, + num_conversations: conversations.matches.length, + num_pr_summaries: prSummaries.matches.length, + }) + const mrSummaries = await index.query({ vector: randomSearchVector, topK: 10000, @@ -212,6 +279,16 @@ export async function handleMigrateOrganizationEmbeddings( }, }) + logger.debug('Got MR summaries', { + event: 'migrate_org_embeddings:got_mr_summaries', + organization_id: organization.id, + num_pull_request_diffs: pullRequestDiffs.matches.length, + num_pull_request_changes: pullRequestChanges.matches.length, + num_conversations: conversations.matches.length, + num_pr_summaries: prSummaries.matches.length, + num_mr_summaries: mrSummaries.matches.length, + }) + for (const mrSummary of mrSummaries.matches) { const gitlabMrId = mrSummary.metadata?.ext_gitlab_merge_request_id if (!gitlabMrId) { @@ -241,7 +318,7 @@ export async function handleMigrateOrganizationEmbeddings( const newChangeText = `${pr.title}\n${change}` const newChangeEmbedding = await embedder.embedQuery(newChangeText) - index.namespace(namespace).upsert([ + namespaceIndex.upsert([ { id: mrSummary.id, values: newChangeEmbedding, @@ -261,6 +338,16 @@ export async function handleMigrateOrganizationEmbeddings( ]) } + logger.debug('Finished MR summaries', { + event: 'migrate_org_embeddings:finished_mr_summaries', + organization_id: organization.id, + num_pull_request_diffs: pullRequestDiffs.matches.length, + num_pull_request_changes: pullRequestChanges.matches.length, + num_conversations: conversations.matches.length, + num_pr_summaries: prSummaries.matches.length, + num_mr_summaries: mrSummaries.matches.length, + }) + // Diffs const diffs = await index.query({ vector: randomSearchVector, @@ -276,6 +363,17 @@ export async function handleMigrateOrganizationEmbeddings( }, }) + logger.debug('Got Diffs', { + event: 'migrate_org_embeddings:got_diffs', + organization_id: organization.id, + num_pull_request_diffs: pullRequestDiffs.matches.length, + num_pull_request_changes: pullRequestChanges.matches.length, + num_conversations: conversations.matches.length, + num_pr_summaries: prSummaries.matches.length, + num_mr_summaries: mrSummaries.matches.length, + num_diffs: diffs.matches.length, + }) + for (const diff of diffs.matches) { const prId = diff.metadata?.pull_request_id if (!prId) { @@ -305,7 +403,7 @@ export async function handleMigrateOrganizationEmbeddings( const newSnippetText = `${pr.title}\n${snippet}` const newSnippetEmbedding = await embedder.embedQuery(newSnippetText) - index.namespace(namespace).upsert([ + namespaceIndex.upsert([ { id: diff.id, values: newSnippetEmbedding, @@ -323,6 +421,17 @@ export async function handleMigrateOrganizationEmbeddings( }, ]) } + + logger.debug('Finished Diffs', { + event: 'migrate_org_embeddings:got_diffs', + organization_id: organization.id, + num_pull_request_diffs: pullRequestDiffs.matches.length, + num_pull_request_changes: pullRequestChanges.matches.length, + num_conversations: conversations.matches.length, + num_pr_summaries: prSummaries.matches.length, + num_mr_summaries: mrSummaries.matches.length, + num_diffs: diffs.matches.length, + }) } async function findHomiePr(organizationId: number, id: any) {