Skip to content

Commit

Permalink
Merge pull request #93 from DIG-Network/develop
Browse files Browse the repository at this point in the history
revert: old propagation logic
  • Loading branch information
MichaelTaylor3D authored Oct 4, 2024
2 parents 8f7ed34 + d6f9fbd commit e89e722
Showing 1 changed file with 69 additions and 124 deletions.
193 changes: 69 additions & 124 deletions src/tasks/sync_stores.ts
Original file line number Diff line number Diff line change
@@ -1,57 +1,71 @@
import _ from "lodash";
import fs from "fs";
import path from 'path';
import { SimpleIntervalJob, Task } from "toad-scheduler";
import {
getStoresList,
Wallet,
DataStore,
DigNetwork,
NconfManager,
ServerCoin,
StoreMonitorRegistry,
} from "@dignetwork/dig-sdk";
import { Mutex } from "async-mutex";
import { RootHistoryItem } from "@dignetwork/dig-sdk/dist/types";
import { getStorageLocation } from "../utils/storage";

const STORE_PATH = path.join(getStorageLocation(), "stores");

const mutex = new Mutex();

const PUBLIC_IP_KEY = "publicIp";
const nconfManager = new NconfManager("config.json");

const missingSyncPool = new Set<string>();
const syncStore = async (storeId: string): Promise<void> => {
console.log(`Starting sync process for store ${storeId}...`);

// -------------------------
// Helper Functions
// -------------------------

/**
* Attempts to synchronize a store from the network.
* @param storeId - The ID of the store to synchronize.
*/
const syncStoreFromNetwork = async (storeId: string): Promise<void> => {
try {
console.log(`Attempting to sync store ${storeId} from the network...`);
const isUpToDate = await isStoreUpToDate(storeId);

const digNetwork = new DigNetwork(storeId);
await digNetwork.syncStoreFromPeers();
if (isUpToDate) {
console.log(`Store ${storeId} is already up to date.`);
return;
}

const dataStore = await DataStore.from(storeId);
await dataStore.fetchCoinInfo();
console.log(`Store ${storeId} is out of date. Syncing...`);
await syncStoreFromNetwork(storeId);
} catch (error: any) {
console.trace(`Error processing store ${storeId}: ${error.message}`);
} finally {
await finalizeStoreSync(storeId);
}
};

const rootHistory = await dataStore.getRootHistory();
const isStoreUpToDate = async (storeId: string): Promise<boolean> => {
console.log(`Checking if store ${storeId} is up to date...`);
const dataStore = await DataStore.from(storeId);

const lastRoot = _.last(rootHistory);
if (!lastRoot) {
console.error(`No root history found for store ${storeId}.`);
return;
}
const rootHistory = await dataStore.getRootHistory();
const storePath = path.join(STORE_PATH, storeId);

// If not synced put in the pool to keep trying
if ((lastRoot as RootHistoryItem).synced) {
missingSyncPool.delete(storeId);
console.log(`Store ${storeId} synchronized successfully.`);
} else {
missingSyncPool.add(storeId);
console.log(`Store ${storeId} could not synchronize, will try again in a few minutes.`);
}
if (!fs.existsSync(storePath)) {
console.log(`Store path not found for store ${storeId}.`);
return false;
}

// Get the count of .dat files in the store directory
const datFiles = fs.readdirSync(storePath).filter(file => file.endsWith(".dat") && !file.includes('manifest'));
const datFileCount = datFiles.length;

console.log(`Root history count: ${rootHistory.length}, .dat files count: ${datFileCount}`);

return rootHistory.length === datFileCount;
};


const syncStoreFromNetwork = async (storeId: string): Promise<void> => {
try {
console.log(`Attempting to sync store ${storeId} from the network...`);
const digNetwork = new DigNetwork(storeId);
await digNetwork.syncStoreFromPeers();
} catch (error: any) {
console.warn(
`Initial sync attempt failed for ${storeId}: ${error.message}`
Expand All @@ -63,92 +77,31 @@ const syncStoreFromNetwork = async (storeId: string): Promise<void> => {
}
};

/**
* Ensures that the server coin exists and is valid for a specific store.
* @param storeId - The ID of the store.
* @param publicIp - The public IP address of the node.
*/
const ensureServerCoin = async (
storeId: string,
publicIp: string
): Promise<void> => {
const finalizeStoreSync = async (storeId: string): Promise<void> => {
try {
const serverCoin = new ServerCoin(storeId);
await serverCoin.ensureServerCoinExists(publicIp);
await serverCoin.meltOutdatedEpochs(publicIp);
console.log(`Finalizing sync for store ${storeId}...`);
const dataStore = await DataStore.from(storeId);
await dataStore.fetchCoinInfo();
console.log(`Finalization complete for store ${storeId}.`);
} catch (error: any) {
console.error(
`Failed to ensure server coin for store ${storeId}: ${error.message}`
);
console.error(`Error in finalizing store ${storeId}: ${error.message}`);
}
};

const processMissingSyncPool = async (): Promise<void> => {
for (const storeId of missingSyncPool) {
const task = new Task("sync-stores", async () => {
if (!mutex.isLocked()) {
const releaseMutex = await mutex.acquire();
let mnemonic: string | undefined;

try {
await syncStoreFromNetwork(storeId);
const wallet = await Wallet.load("default", false);
mnemonic = await wallet.getMnemonic();
} catch (error: any) {
console.error(`Failed to sync store ${storeId}: ${error.message}`);
}
}
};

// -------------------------
// Initialization Function
// -------------------------

/**
* Initializes all stores by registering them with the store monitor and syncing them.
*/
const initializeStoreMonitor = async (): Promise<void> => {
try {
console.log("Initializing stores monitor...");
const storeMonitor = StoreMonitorRegistry.getInstance();

const storeList = getStoresList();

const publicIp: string | null | undefined =
await nconfManager.getConfigValue(PUBLIC_IP_KEY);

// Register each store with the store monitor
storeList.forEach((storeId) => {
const serverCoin = new ServerCoin(storeId);
storeMonitor.registerStore(storeId, async () => {
if (publicIp) {
await serverCoin.ensureServerCoinExists(publicIp);
}

console.log(`Store update detected for ${storeId}. Syncing...`);
await syncStoreFromNetwork(storeId);

if (publicIp) {
await serverCoin.ensureServerCoinExists(publicIp);
}
});
});

// Attempt to sync each store initially
for (const storeId of storeList) {
await syncStoreFromNetwork(storeId);
console.error(`Error in sync-stores task: ${error.message}`);
releaseMutex();
return;
}

console.log("All stores have been initialized and synchronized.");
} catch (error: any) {
console.error(`Initialization failed: ${error.message}`);
}
};

// -------------------------
// Scheduler Task
// -------------------------

/**
* Defines the scheduled task to sync stores and ensure server coins.
*/
const syncStoresTask = new Task("sync-stores", async () => {
if (!mutex.isLocked()) {
const releaseMutex = await mutex.acquire();

try {
console.log("Starting sync-stores task...");

Expand All @@ -168,16 +121,16 @@ const syncStoresTask = new Task("sync-stores", async () => {
console.error(
`Failed to retrieve public IP from configuration: ${error.message}`
);
releaseMutex();
return; // Exit the task if we can't retrieve the public IP
}

await processMissingSyncPool();

for (const storeId of storeList) {
try {
await syncStore(storeId);
if (publicIp) {
await ensureServerCoin(storeId, publicIp);
const serverCoin = new ServerCoin(storeId);
await serverCoin.ensureServerCoinExists(publicIp);
await serverCoin.meltOutdatedEpochs(publicIp);
} else {
console.warn(
`Skipping server coin check for store ${storeId} due to missing public IP.`
Expand All @@ -199,21 +152,13 @@ const syncStoresTask = new Task("sync-stores", async () => {
}
});

// -------------------------
// Scheduler Job Setup
// -------------------------

const job = new SimpleIntervalJob(
{
minutes: 5,
seconds: 60,
runImmediately: true,
},
syncStoresTask,
task,
{ id: "sync-stores", preventOverrun: true }
);

setTimeout(() => {
initializeStoreMonitor();
}, 5000);

export default job;
export default job;

0 comments on commit e89e722

Please sign in to comment.