diff --git a/src/tasks/sync_stores.ts b/src/tasks/sync_stores.ts index 6426074..9e583c6 100644 --- a/src/tasks/sync_stores.ts +++ b/src/tasks/sync_stores.ts @@ -1,57 +1,71 @@ -import _ from "lodash"; +import fs from "fs"; +import path from 'path'; import { SimpleIntervalJob, Task } from "toad-scheduler"; import { getStoresList, + Wallet, DataStore, DigNetwork, NconfManager, ServerCoin, - StoreMonitorRegistry, } from "@dignetwork/dig-sdk"; import { Mutex } from "async-mutex"; -import { RootHistoryItem } from "@dignetwork/dig-sdk/dist/types"; +import { getStorageLocation } from "../utils/storage"; + +const STORE_PATH = path.join(getStorageLocation(), "stores"); + const mutex = new Mutex(); const PUBLIC_IP_KEY = "publicIp"; const nconfManager = new NconfManager("config.json"); -const missingSyncPool = new Set(); +const syncStore = async (storeId: string): Promise => { + console.log(`Starting sync process for store ${storeId}...`); -// ------------------------- -// Helper Functions -// ------------------------- - -/** - * Attempts to synchronize a store from the network. - * @param storeId - The ID of the store to synchronize. - */ -const syncStoreFromNetwork = async (storeId: string): Promise => { try { - console.log(`Attempting to sync store ${storeId} from the network...`); + const isUpToDate = await isStoreUpToDate(storeId); - const digNetwork = new DigNetwork(storeId); - await digNetwork.syncStoreFromPeers(); + if (isUpToDate) { + console.log(`Store ${storeId} is already up to date.`); + return; + } - const dataStore = await DataStore.from(storeId); - await dataStore.fetchCoinInfo(); + console.log(`Store ${storeId} is out of date. Syncing...`); + await syncStoreFromNetwork(storeId); + } catch (error: any) { + console.trace(`Error processing store ${storeId}: ${error.message}`); + } finally { + await finalizeStoreSync(storeId); + } +}; - const rootHistory = await dataStore.getRootHistory(); +const isStoreUpToDate = async (storeId: string): Promise => { + console.log(`Checking if store ${storeId} is up to date...`); + const dataStore = await DataStore.from(storeId); - const lastRoot = _.last(rootHistory); - if (!lastRoot) { - console.error(`No root history found for store ${storeId}.`); - return; - } + const rootHistory = await dataStore.getRootHistory(); + const storePath = path.join(STORE_PATH, storeId); - // If not synced put in the pool to keep trying - if ((lastRoot as RootHistoryItem).synced) { - missingSyncPool.delete(storeId); - console.log(`Store ${storeId} synchronized successfully.`); - } else { - missingSyncPool.add(storeId); - console.log(`Store ${storeId} could not synchronize, will try again in a few minutes.`); - } + if (!fs.existsSync(storePath)) { + console.log(`Store path not found for store ${storeId}.`); + return false; + } + + // Get the count of .dat files in the store directory + const datFiles = fs.readdirSync(storePath).filter(file => file.endsWith(".dat") && !file.includes('manifest')); + const datFileCount = datFiles.length; + + console.log(`Root history count: ${rootHistory.length}, .dat files count: ${datFileCount}`); + return rootHistory.length === datFileCount; +}; + + +const syncStoreFromNetwork = async (storeId: string): Promise => { + try { + console.log(`Attempting to sync store ${storeId} from the network...`); + const digNetwork = new DigNetwork(storeId); + await digNetwork.syncStoreFromPeers(); } catch (error: any) { console.warn( `Initial sync attempt failed for ${storeId}: ${error.message}` @@ -63,92 +77,31 @@ const syncStoreFromNetwork = async (storeId: string): Promise => { } }; -/** - * Ensures that the server coin exists and is valid for a specific store. - * @param storeId - The ID of the store. - * @param publicIp - The public IP address of the node. - */ -const ensureServerCoin = async ( - storeId: string, - publicIp: string -): Promise => { +const finalizeStoreSync = async (storeId: string): Promise => { try { - const serverCoin = new ServerCoin(storeId); - await serverCoin.ensureServerCoinExists(publicIp); - await serverCoin.meltOutdatedEpochs(publicIp); + console.log(`Finalizing sync for store ${storeId}...`); + const dataStore = await DataStore.from(storeId); + await dataStore.fetchCoinInfo(); + console.log(`Finalization complete for store ${storeId}.`); } catch (error: any) { - console.error( - `Failed to ensure server coin for store ${storeId}: ${error.message}` - ); + console.error(`Error in finalizing store ${storeId}: ${error.message}`); } }; -const processMissingSyncPool = async (): Promise => { - for (const storeId of missingSyncPool) { +const task = new Task("sync-stores", async () => { + if (!mutex.isLocked()) { + const releaseMutex = await mutex.acquire(); + let mnemonic: string | undefined; + try { - await syncStoreFromNetwork(storeId); + const wallet = await Wallet.load("default", false); + mnemonic = await wallet.getMnemonic(); } catch (error: any) { - console.error(`Failed to sync store ${storeId}: ${error.message}`); - } - } -}; - -// ------------------------- -// Initialization Function -// ------------------------- - -/** - * Initializes all stores by registering them with the store monitor and syncing them. - */ -const initializeStoreMonitor = async (): Promise => { - try { - console.log("Initializing stores monitor..."); - const storeMonitor = StoreMonitorRegistry.getInstance(); - - const storeList = getStoresList(); - - const publicIp: string | null | undefined = - await nconfManager.getConfigValue(PUBLIC_IP_KEY); - - // Register each store with the store monitor - storeList.forEach((storeId) => { - const serverCoin = new ServerCoin(storeId); - storeMonitor.registerStore(storeId, async () => { - if (publicIp) { - await serverCoin.ensureServerCoinExists(publicIp); - } - - console.log(`Store update detected for ${storeId}. Syncing...`); - await syncStoreFromNetwork(storeId); - - if (publicIp) { - await serverCoin.ensureServerCoinExists(publicIp); - } - }); - }); - - // Attempt to sync each store initially - for (const storeId of storeList) { - await syncStoreFromNetwork(storeId); + console.error(`Error in sync-stores task: ${error.message}`); + releaseMutex(); + return; } - console.log("All stores have been initialized and synchronized."); - } catch (error: any) { - console.error(`Initialization failed: ${error.message}`); - } -}; - -// ------------------------- -// Scheduler Task -// ------------------------- - -/** - * Defines the scheduled task to sync stores and ensure server coins. - */ -const syncStoresTask = new Task("sync-stores", async () => { - if (!mutex.isLocked()) { - const releaseMutex = await mutex.acquire(); - try { console.log("Starting sync-stores task..."); @@ -168,16 +121,16 @@ const syncStoresTask = new Task("sync-stores", async () => { console.error( `Failed to retrieve public IP from configuration: ${error.message}` ); - releaseMutex(); return; // Exit the task if we can't retrieve the public IP } - await processMissingSyncPool(); - for (const storeId of storeList) { try { + await syncStore(storeId); if (publicIp) { - await ensureServerCoin(storeId, publicIp); + const serverCoin = new ServerCoin(storeId); + await serverCoin.ensureServerCoinExists(publicIp); + await serverCoin.meltOutdatedEpochs(publicIp); } else { console.warn( `Skipping server coin check for store ${storeId} due to missing public IP.` @@ -199,21 +152,13 @@ const syncStoresTask = new Task("sync-stores", async () => { } }); -// ------------------------- -// Scheduler Job Setup -// ------------------------- - const job = new SimpleIntervalJob( { - minutes: 5, + seconds: 60, runImmediately: true, }, - syncStoresTask, + task, { id: "sync-stores", preventOverrun: true } ); -setTimeout(() => { - initializeStoreMonitor(); -}, 5000); - -export default job; +export default job; \ No newline at end of file