From 1d228ac0e18e21f9cf627f87bc845b4d1076a860 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Fri, 4 Oct 2024 19:42:54 -0400 Subject: [PATCH 1/4] chore(release): 0.0.1-alpha.101 --- CHANGELOG.md | 2 ++ package-lock.json | 4 ++-- package.json | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e13b47a..5f31ad2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +### [0.0.1-alpha.101](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.100...v0.0.1-alpha.101) (2024-10-04) + ### [0.0.1-alpha.100](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.99...v0.0.1-alpha.100) (2024-10-04) ### [0.0.1-alpha.99](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.98...v0.0.1-alpha.99) (2024-10-04) diff --git a/package-lock.json b/package-lock.json index 977606b..14ef6cd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "dig-propagation-server", - "version": "0.0.1-alpha.100", + "version": "0.0.1-alpha.101", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "dig-propagation-server", - "version": "0.0.1-alpha.100", + "version": "0.0.1-alpha.101", "license": "ISC", "dependencies": { "@dignetwork/datalayer-driver": "^0.1.28", diff --git a/package.json b/package.json index 995ce7d..0f3c4c7 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "dig-propagation-server", - "version": "0.0.1-alpha.100", + "version": "0.0.1-alpha.101", "description": "", "type": "commonjs", "main": "./dist/index.js", From c8e0f53feea7d9c25d2427f3f5229529506946dc Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Sat, 5 Oct 2024 13:09:20 -0400 Subject: [PATCH 2/4] feat: more agressive propagation algorithm --- src/app.ts | 1 - src/tasks/sync_stores.ts | 193 +++++++++++++++++++++++++++++++-------- 2 files changed, 157 insertions(+), 37 deletions(-) diff --git a/src/app.ts b/src/app.ts index ecf5b2c..2048e5f 100644 --- a/src/app.ts +++ b/src/app.ts @@ -35,7 +35,6 @@ const serverKey = fs.readFileSync(serverKeyPath); const app = express(); const PORT = Number(process.env.PORT) || 4159; -app.set('trust proxy', true); app.use(requestIp.mw()); // Apply store routes diff --git a/src/tasks/sync_stores.ts b/src/tasks/sync_stores.ts index 9e583c6..13f9d57 100644 --- a/src/tasks/sync_stores.ts +++ b/src/tasks/sync_stores.ts @@ -3,11 +3,11 @@ import path from 'path'; import { SimpleIntervalJob, Task } from "toad-scheduler"; import { getStoresList, - Wallet, DataStore, DigNetwork, NconfManager, ServerCoin, + DigPeer, } from "@dignetwork/dig-sdk"; import { Mutex } from "async-mutex"; import { getStorageLocation } from "../utils/storage"; @@ -19,53 +19,162 @@ const mutex = new Mutex(); const PUBLIC_IP_KEY = "publicIp"; const nconfManager = new NconfManager("config.json"); -const syncStore = async (storeId: string): Promise => { - console.log(`Starting sync process for store ${storeId}...`); +// Map to track which peerIps have been checked for each rootHash +const checkedPeersMap: Map> = new Map(); +/** + * Process a single peer: check for rootHash and ping update if necessary. + * @param peerIp - The IP address of the peer. + * @param rootHash - The current rootHash to check against. + * @param checkedPeers - The set of peers already checked for the rootHash. + */ +const processPeer = async (peerIp: string, storeId: string, rootHash: string, checkedPeers: Set): Promise => { try { - const isUpToDate = await isStoreUpToDate(storeId); + const digPeer = new DigPeer(peerIp, storeId); + const hasRootHash = await digPeer.contentServer.hasRootHash(rootHash); - if (isUpToDate) { - console.log(`Store ${storeId} is already up to date.`); + if (hasRootHash) { + console.log(`Peer ${peerIp} already has rootHash ${rootHash}. Marking as checked.`); + checkedPeers.add(peerIp); // Mark as checked only if peer has the rootHash + } else { + console.log(`Peer ${peerIp} does not have rootHash ${rootHash}. Pinging update.`); + await digPeer.propagationServer.pingUpdate(rootHash); + // Do NOT mark as checked if peer lacks the rootHash + } + } catch (error: any) { + console.error(`Error interacting with peer ${peerIp}: ${error.message}`); + } +}; + +/** + * Clean the checkedPeersMap to retain only the current rootHash. + * @param currentRootHash - The rootHash to retain in the map. + */ +const cleanCheckedPeersMap = (currentRootHash: string): void => { + for (const [rootHash, _] of checkedPeersMap.entries()) { + if (rootHash !== currentRootHash) { + checkedPeersMap.delete(rootHash); + console.log(`Removed outdated rootHash ${rootHash} from checkedPeersMap.`); + } + } +}; + +/** + * Handle a store that is already synced by checking peers for updates sequentially. + * @param storeId - The ID of the synced store. + * @param serverCoin - The ServerCoin instance associated with the store. + */ +const handleSyncedStore = async (storeId: string, serverCoin: ServerCoin): Promise => { + try { + const dataStore = await DataStore.from(storeId); + const rootHistory = await dataStore.getRootHistory(); + + if (rootHistory.length === 0) { + console.warn(`No root history found for store ${storeId}. Skipping peer checks.`); return; } - console.log(`Store ${storeId} is out of date. Syncing...`); - await syncStoreFromNetwork(storeId); + // Get the current rootHash from the last entry of rootHistory + const currentRootHashObj = rootHistory[rootHistory.length - 1]; + const currentRootHash = currentRootHashObj.root_hash; + console.log(`Current rootHash for store ${storeId}: ${currentRootHash}`); + + // Clean checkedPeersMap to only retain peers checked for the current rootHash + cleanCheckedPeersMap(currentRootHash); + + // Initialize the set for the current rootHash if not present + if (!checkedPeersMap.has(currentRootHash)) { + checkedPeersMap.set(currentRootHash, new Set()); + console.log(`Initialized checkedPeersMap for current rootHash ${currentRootHash}.`); + } + + const checkedPeers = checkedPeersMap.get(currentRootHash)!; + + // Pass the checkedPeers as the blocklist to getActiveEpochPeers + const blocklist = Array.from(checkedPeers); + const peerIps: string[] = await serverCoin.getActiveEpochPeers(blocklist); + console.log(`Active epoch peers for store ${storeId}:`, peerIps); + + if (peerIps.length === 0) { + console.log(`No new peers to process for rootHash ${currentRootHash} in store ${storeId}.`); + return; + } + + // Process peers one at a time sequentially + for (const peerIp of peerIps) { + if (checkedPeers.has(peerIp)) { + console.log(`Peer ${peerIp} has already been checked for rootHash ${currentRootHash}. Skipping.`); + continue; + } + + await processPeer(peerIp, storeId, currentRootHash, checkedPeers); + } + + console.log(`Completed processing peers for rootHash ${currentRootHash} in store ${storeId}.`); + } catch (error: any) { - console.trace(`Error processing store ${storeId}: ${error.message}`); - } finally { - await finalizeStoreSync(storeId); + console.error(`Error handling synced store ${storeId}: ${error.message}`); + } +}; + +/** + * Synchronize a single store based on its sync status. + * @param storeId - The ID of the store to synchronize. + * @param serverCoin - The ServerCoin instance associated with the store. + */ +const synchronizeStore = async (storeId: string, serverCoin: ServerCoin): Promise => { + console.log(`Starting synchronization for store ${storeId}...`); + + const isSynced = await isStoreSynced(storeId); + + if (isSynced) { + console.log(`Store ${storeId} is synced. Proceeding with peer checks.`); + await handleSyncedStore(storeId, serverCoin); + } else { + console.log(`Store ${storeId} is not synced. Initiating synchronization from peers.`); + await syncStoreFromNetwork(storeId); } }; -const isStoreUpToDate = async (storeId: string): Promise => { - console.log(`Checking if store ${storeId} is up to date...`); +/** + * Check if the store is synced by verifying root history entries. + * @param storeId - The ID of the store to check. + * @returns A boolean indicating whether the store is up to date. + */ +const isStoreSynced = async (storeId: string): Promise => { + console.log(`Checking synchronization status for store ${storeId}...`); const dataStore = await DataStore.from(storeId); const rootHistory = await dataStore.getRootHistory(); const storePath = path.join(STORE_PATH, storeId); if (!fs.existsSync(storePath)) { - console.log(`Store path not found for store ${storeId}.`); + console.log(`Store path not found for store ${storeId}. Considering it as not synced.`); return false; } - // Get the count of .dat files in the store directory - const datFiles = fs.readdirSync(storePath).filter(file => file.endsWith(".dat") && !file.includes('manifest')); - const datFileCount = datFiles.length; + // Check if any entry in rootHistory has synced = false + const hasUnsyncedEntries = rootHistory.some(entry => entry.synced === false); - console.log(`Root history count: ${rootHistory.length}, .dat files count: ${datFileCount}`); - - return rootHistory.length === datFileCount; + if (hasUnsyncedEntries) { + console.log(`Store ${storeId} has unsynced entries in root history.`); + return false; + } else { + console.log(`All entries in root history for store ${storeId} are synced.`); + return true; + } }; - +/** + * Synchronize the store from the network. + * @param storeId - The ID of the store to synchronize. + */ const syncStoreFromNetwork = async (storeId: string): Promise => { try { console.log(`Attempting to sync store ${storeId} from the network...`); const digNetwork = new DigNetwork(storeId); await digNetwork.syncStoreFromPeers(); + console.log(`Successfully synced store ${storeId} from peers.`); } catch (error: any) { console.warn( `Initial sync attempt failed for ${storeId}: ${error.message}` @@ -74,9 +183,14 @@ const syncStoreFromNetwork = async (storeId: string): Promise => { console.error(`No DIG Peers found for store ${storeId}. Skipping...`); return; } + // Optionally, implement retry logic or additional error handling here } }; +/** + * Finalize the synchronization process for a store. + * @param storeId - The ID of the store to finalize. + */ const finalizeStoreSync = async (storeId: string): Promise => { try { console.log(`Finalizing sync for store ${storeId}...`); @@ -88,19 +202,12 @@ const finalizeStoreSync = async (storeId: string): Promise => { } }; +/** + * Main task to synchronize all stores. + */ const task = new Task("sync-stores", async () => { if (!mutex.isLocked()) { const releaseMutex = await mutex.acquire(); - let mnemonic: string | undefined; - - try { - const wallet = await Wallet.load("default", false); - mnemonic = await wallet.getMnemonic(); - } catch (error: any) { - console.error(`Error in sync-stores task: ${error.message}`); - releaseMutex(); - return; - } try { console.log("Starting sync-stores task..."); @@ -121,23 +228,34 @@ const task = new Task("sync-stores", async () => { console.error( `Failed to retrieve public IP from configuration: ${error.message}` ); + releaseMutex(); return; // Exit the task if we can't retrieve the public IP } for (const storeId of storeList) { try { - await syncStore(storeId); if (publicIp) { const serverCoin = new ServerCoin(storeId); await serverCoin.ensureServerCoinExists(publicIp); - await serverCoin.meltOutdatedEpochs(publicIp); + + // Synchronize the store based on its sync status + await synchronizeStore(storeId, serverCoin); + + // After synchronization, ensure the server coin is updated + await serverCoin.ensureServerCoinExists(publicIp); } else { console.warn( - `Skipping server coin check for store ${storeId} due to missing public IP.` + `Skipping server coin operations for store ${storeId} due to missing public IP.` ); + + // Even if public IP is missing, you might still want to synchronize the store + await synchronizeStore(storeId, new ServerCoin(storeId)); } + + // Finalize synchronization + await finalizeStoreSync(storeId); } catch (error: any) { - console.error(`Failed to sync store ${storeId}: ${error.message}`); + console.error(`Failed to synchronize store ${storeId}: ${error.message}`); } } @@ -149,9 +267,12 @@ const task = new Task("sync-stores", async () => { } finally { releaseMutex(); } + } else { + console.log("Sync-stores task is already running. Skipping this run."); } }); +// Schedule the task to run every 60 seconds, starting immediately const job = new SimpleIntervalJob( { seconds: 60, @@ -161,4 +282,4 @@ const job = new SimpleIntervalJob( { id: "sync-stores", preventOverrun: true } ); -export default job; \ No newline at end of file +export default job; From 3137b7fbab3a53d49141a0d027a3ed8070973bff Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Sat, 5 Oct 2024 13:24:57 -0400 Subject: [PATCH 3/4] chore: update sdk --- package-lock.json | 8 ++++---- package.json | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/package-lock.json b/package-lock.json index 14ef6cd..c4ec30d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,7 +10,7 @@ "license": "ISC", "dependencies": { "@dignetwork/datalayer-driver": "^0.1.28", - "@dignetwork/dig-sdk": "^0.0.1-alpha.131", + "@dignetwork/dig-sdk": "^0.0.1-alpha.133", "async-mutex": "^0.5.0", "busboy": "^1.6.0", "express": "^4.19.2", @@ -252,9 +252,9 @@ } }, "node_modules/@dignetwork/dig-sdk": { - "version": "0.0.1-alpha.131", - "resolved": "https://registry.npmjs.org/@dignetwork/dig-sdk/-/dig-sdk-0.0.1-alpha.131.tgz", - "integrity": "sha512-FHJLv2piS9Z8cCzVH60aPxEi6dpCkqb/BZv079ZgrzOukuclWRUREdbvilNKBEVEYmArEpM+6m+cBAuGMELS2Q==", + "version": "0.0.1-alpha.133", + "resolved": "https://registry.npmjs.org/@dignetwork/dig-sdk/-/dig-sdk-0.0.1-alpha.133.tgz", + "integrity": "sha512-Cq1O15ZCPX1fdMxfYF0R1gDNnWhx1VfKl2/3admhKVZf9KCUABwFZbWZGa9CKJFk4lYnj57q4nMA4ha52qmVNw==", "dependencies": { "@dignetwork/datalayer-driver": "^0.1.29", "@dignetwork/dig-sdk": "^0.0.1-alpha.124", diff --git a/package.json b/package.json index 0f3c4c7..e689cfd 100644 --- a/package.json +++ b/package.json @@ -26,7 +26,7 @@ ], "dependencies": { "@dignetwork/datalayer-driver": "^0.1.28", - "@dignetwork/dig-sdk": "^0.0.1-alpha.131", + "@dignetwork/dig-sdk": "^0.0.1-alpha.133", "async-mutex": "^0.5.0", "busboy": "^1.6.0", "express": "^4.19.2", From 67bfc8b1849578e86607f0a492133e4523aa5b40 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Sat, 5 Oct 2024 13:25:43 -0400 Subject: [PATCH 4/4] chore(release): 0.0.1-alpha.102 --- CHANGELOG.md | 7 +++++++ package-lock.json | 4 ++-- package.json | 2 +- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f31ad2..9dafeac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +### [0.0.1-alpha.102](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.101...v0.0.1-alpha.102) (2024-10-05) + + +### Features + +* more agressive propagation algorithm ([c8e0f53](https://github.com/DIG-Network/dig-propagation-server/commit/c8e0f53feea7d9c25d2427f3f5229529506946dc)) + ### [0.0.1-alpha.101](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.100...v0.0.1-alpha.101) (2024-10-04) ### [0.0.1-alpha.100](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.99...v0.0.1-alpha.100) (2024-10-04) diff --git a/package-lock.json b/package-lock.json index c4ec30d..dc2be05 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "dig-propagation-server", - "version": "0.0.1-alpha.101", + "version": "0.0.1-alpha.102", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "dig-propagation-server", - "version": "0.0.1-alpha.101", + "version": "0.0.1-alpha.102", "license": "ISC", "dependencies": { "@dignetwork/datalayer-driver": "^0.1.28", diff --git a/package.json b/package.json index e689cfd..062fd58 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "dig-propagation-server", - "version": "0.0.1-alpha.101", + "version": "0.0.1-alpha.102", "description": "", "type": "commonjs", "main": "./dist/index.js",