From c67d4c683c7dc66645941acfa6e1f04470f55693 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Tue, 8 Oct 2024 17:05:14 -0400 Subject: [PATCH 1/2] chore: update sdk --- src/tasks/sync_stores.ts | 95 ++++++++++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 34 deletions(-) diff --git a/src/tasks/sync_stores.ts b/src/tasks/sync_stores.ts index 1cc3048..8492ef2 100644 --- a/src/tasks/sync_stores.ts +++ b/src/tasks/sync_stores.ts @@ -1,5 +1,5 @@ import fs from "fs"; -import path from 'path'; +import path from "path"; import { SimpleIntervalJob, Task } from "toad-scheduler"; import { getStoresList, @@ -9,7 +9,7 @@ import { ServerCoin, DigPeer, withTimeout, - PeerRanker + PeerRanker, } from "@dignetwork/dig-sdk"; import { Mutex } from "async-mutex"; import { getStorageLocation } from "../utils/storage"; @@ -30,26 +30,27 @@ const checkedPeersMap: Map> = new Map(); * @param rootHash - The current rootHash to check against. * @param checkedPeers - The set of peers already checked for the rootHash. */ -const processPeer = async (peerIp: string, storeId: string, rootHash: string, checkedPeers: Set): Promise => { +const processPeer = async ( + peerIp: string, + storeId: string, + rootHash: string, + checkedPeers: Set +): Promise => { try { const digPeer = new DigPeer(peerIp, storeId); - const { storeExists, rootHashExists } = await withTimeout(digPeer.propagationServer.checkStoreExists(rootHash), 15000, `Dig Peer: ${peerIp} took to long to respond to head request`); - if (!storeExists) { - console.log(`Dig Peer ${peerIp} does not have store ${storeId}. Skipping...`); - return; - } - - if (rootHashExists) { - console.log(`Dig Peer ${peerIp} already has rootHash ${rootHash}. Marking as checked.`); - checkedPeers.add(peerIp); // Mark as checked only if peer has the rootHash - } else { - console.log(`Dig Peer ${peerIp} does not have ${storeId}-${rootHash}. Pinging update.`); - await withTimeout(digPeer.propagationServer.pingUpdate(rootHash), 15000, `Dig Peer: ${peerIp} took to long to respond to ping request`); - // Do NOT mark as checked if peer lacks the rootHash - } + // Just ping the peer to update, if its already updated then itll mark it as updated + // for this root hash and wont check again, much better then pinging for each root hash for every peer + // before attempting this call. + await withTimeout( + digPeer.propagationServer.pingUpdate(rootHash), + 15000, + `Dig Peer: ${peerIp} took to long to respond to ping request` + ); } catch (error: any) { - console.error(`Error interacting with Dig Peer: ${peerIp}: ${error.message}`); + console.error( + `Error interacting with Dig Peer: ${peerIp}: ${error.message}` + ); } }; @@ -57,11 +58,16 @@ const processPeer = async (peerIp: string, storeId: string, rootHash: string, ch * Clean the checkedPeersMap to retain only the current rootHash. * @param currentRootHash - The rootHash to retain in the map. */ -const cleanCheckedPeersMap = (storeId: string, currentRootHash: string): void => { +const cleanCheckedPeersMap = ( + storeId: string, + currentRootHash: string +): void => { for (const [rootHash, _] of checkedPeersMap.entries()) { if (rootHash !== currentRootHash) { checkedPeersMap.delete(rootHash); - console.log(`Removed outdated rootHash ${storeId}-${rootHash} from checkedPeersMap.`); + console.log( + `Removed outdated rootHash ${storeId}-${rootHash} from checkedPeersMap.` + ); } } }; @@ -71,13 +77,18 @@ const cleanCheckedPeersMap = (storeId: string, currentRootHash: string): void => * @param storeId - The ID of the synced store. * @param serverCoin - The ServerCoin instance associated with the store. */ -const handleSyncedStore = async (storeId: string, serverCoin: ServerCoin): Promise => { +const handleSyncedStore = async ( + storeId: string, + serverCoin: ServerCoin +): Promise => { try { const dataStore = await DataStore.from(storeId); const rootHistory = await dataStore.getRootHistory(); if (rootHistory.length === 0) { - console.warn(`No root history found for store ${storeId}. Skipping peer checks.`); + console.warn( + `No root history found for store ${storeId}. Skipping peer checks.` + ); return; } @@ -92,7 +103,9 @@ const handleSyncedStore = async (storeId: string, serverCoin: ServerCoin): Promi // Initialize the set for the current rootHash if not present if (!checkedPeersMap.has(currentRootHash)) { checkedPeersMap.set(currentRootHash, new Set()); - console.log(`Initialized checkedPeersMap for current rootHash ${currentRootHash}.`); + console.log( + `Initialized checkedPeersMap for current rootHash ${currentRootHash}.` + ); } const checkedPeers = checkedPeersMap.get(currentRootHash)!; @@ -100,10 +113,11 @@ const handleSyncedStore = async (storeId: string, serverCoin: ServerCoin): Promi // Pass the checkedPeers as the blocklist to getActiveEpochPeers const blocklist = Array.from(checkedPeers); const peerIps: string[] = await serverCoin.getActiveEpochPeers(blocklist); - if (peerIps.length === 0) { - console.log(`No new peers to process for rootHash ${currentRootHash} in store ${storeId}.`); + console.log( + `No new peers to process for rootHash ${currentRootHash} in store ${storeId}.` + ); return; } @@ -115,15 +129,18 @@ const handleSyncedStore = async (storeId: string, serverCoin: ServerCoin): Promi // Process peers one at a time sequentially, starting with the best-ranked peers for (const { ip: peerIp } of rankedPeers) { if (checkedPeers.has(peerIp)) { - console.log(`Peer ${peerIp} has already been checked for rootHash ${currentRootHash}. Skipping.`); + console.log( + `Peer ${peerIp} has already been checked for rootHash ${currentRootHash}. Skipping.` + ); continue; } await processPeer(peerIp, storeId, currentRootHash, checkedPeers); } - console.log(`Completed processing peers for rootHash ${currentRootHash} in store ${storeId}.`); - + console.log( + `Completed processing peers for rootHash ${currentRootHash} in store ${storeId}.` + ); } catch (error: any) { console.error(`Error handling synced store ${storeId}: ${error.message}`); } @@ -134,7 +151,10 @@ const handleSyncedStore = async (storeId: string, serverCoin: ServerCoin): Promi * @param storeId - The ID of the store to synchronize. * @param serverCoin - The ServerCoin instance associated with the store. */ -const synchronizeStore = async (storeId: string, serverCoin: ServerCoin): Promise => { +const synchronizeStore = async ( + storeId: string, + serverCoin: ServerCoin +): Promise => { console.log(`Starting synchronization for store ${storeId}...`); const isSynced = await isStoreSynced(storeId); @@ -143,7 +163,9 @@ const synchronizeStore = async (storeId: string, serverCoin: ServerCoin): Promis console.log(`Store ${storeId} is synced. Proceeding with peer checks.`); await handleSyncedStore(storeId, serverCoin); } else { - console.log(`Store ${storeId} is not synced. Initiating synchronization from peers.`); + console.log( + `Store ${storeId} is not synced. Initiating synchronization from peers.` + ); await syncStoreFromNetwork(storeId); } }; @@ -161,12 +183,16 @@ const isStoreSynced = async (storeId: string): Promise => { const storePath = path.join(STORE_PATH, storeId); if (!fs.existsSync(storePath)) { - console.log(`Store path not found for store ${storeId}. Considering it as not synced.`); + console.log( + `Store path not found for store ${storeId}. Considering it as not synced.` + ); return false; } // Check if any entry in rootHistory has synced = false - const hasUnsyncedEntries = rootHistory.some(entry => entry.synced === false); + const hasUnsyncedEntries = rootHistory.some( + (entry) => entry.synced === false + ); if (hasUnsyncedEntries) { console.log(`Store ${storeId} has unsynced entries in root history.`); @@ -266,7 +292,9 @@ const task = new Task("sync-stores", async () => { // Finalize synchronization await finalizeStoreSync(storeId); } catch (error: any) { - console.error(`Failed to synchronize store ${storeId}: ${error.message}`); + console.error( + `Failed to synchronize store ${storeId}: ${error.message}` + ); } } @@ -283,7 +311,6 @@ const task = new Task("sync-stores", async () => { } }); - // Schedule the task to run every 60 seconds, starting immediately const job = new SimpleIntervalJob( { From 8e84105d77607180b2aa815d3624ffc4e7520ab1 Mon Sep 17 00:00:00 2001 From: Michael Taylor Date: Tue, 8 Oct 2024 17:05:38 -0400 Subject: [PATCH 2/2] chore(release): 0.0.1-alpha.132 --- CHANGELOG.md | 2 ++ package-lock.json | 4 ++-- package.json | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b08699b..51d0fec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +### [0.0.1-alpha.132](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.131...v0.0.1-alpha.132) (2024-10-08) + ### [0.0.1-alpha.131](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.130...v0.0.1-alpha.131) (2024-10-08) diff --git a/package-lock.json b/package-lock.json index 0483313..a6f0217 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "dig-propagation-server", - "version": "0.0.1-alpha.131", + "version": "0.0.1-alpha.132", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "dig-propagation-server", - "version": "0.0.1-alpha.131", + "version": "0.0.1-alpha.132", "license": "ISC", "dependencies": { "@dignetwork/datalayer-driver": "^0.1.28", diff --git a/package.json b/package.json index 4e0bd2c..7c70f1a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "dig-propagation-server", - "version": "0.0.1-alpha.131", + "version": "0.0.1-alpha.132", "description": "", "type": "commonjs", "main": "./dist/index.js",