Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/v0.0.1 alpha.132 #126

Merged
merged 2 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.

### [0.0.1-alpha.132](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.131...v0.0.1-alpha.132) (2024-10-08)

### [0.0.1-alpha.131](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.130...v0.0.1-alpha.131) (2024-10-08)


Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dig-propagation-server",
"version": "0.0.1-alpha.131",
"version": "0.0.1-alpha.132",
"description": "",
"type": "commonjs",
"main": "./dist/index.js",
Expand Down
95 changes: 61 additions & 34 deletions src/tasks/sync_stores.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import fs from "fs";
import path from 'path';
import path from "path";
import { SimpleIntervalJob, Task } from "toad-scheduler";
import {
getStoresList,
Expand All @@ -9,7 +9,7 @@ import {
ServerCoin,
DigPeer,
withTimeout,
PeerRanker
PeerRanker,
} from "@dignetwork/dig-sdk";
import { Mutex } from "async-mutex";
import { getStorageLocation } from "../utils/storage";
Expand All @@ -30,38 +30,44 @@ const checkedPeersMap: Map<string, Set<string>> = new Map();
* @param rootHash - The current rootHash to check against.
* @param checkedPeers - The set of peers already checked for the rootHash.
*/
const processPeer = async (peerIp: string, storeId: string, rootHash: string, checkedPeers: Set<string>): Promise<void> => {
const processPeer = async (
peerIp: string,
storeId: string,
rootHash: string,
checkedPeers: Set<string>
): Promise<void> => {
try {
const digPeer = new DigPeer(peerIp, storeId);
const { storeExists, rootHashExists } = await withTimeout(digPeer.propagationServer.checkStoreExists(rootHash), 15000, `Dig Peer: ${peerIp} took to long to respond to head request`);

if (!storeExists) {
console.log(`Dig Peer ${peerIp} does not have store ${storeId}. Skipping...`);
return;
}

if (rootHashExists) {
console.log(`Dig Peer ${peerIp} already has rootHash ${rootHash}. Marking as checked.`);
checkedPeers.add(peerIp); // Mark as checked only if peer has the rootHash
} else {
console.log(`Dig Peer ${peerIp} does not have ${storeId}-${rootHash}. Pinging update.`);
await withTimeout(digPeer.propagationServer.pingUpdate(rootHash), 15000, `Dig Peer: ${peerIp} took to long to respond to ping request`);
// Do NOT mark as checked if peer lacks the rootHash
}
// Just ping the peer to update, if its already updated then itll mark it as updated
// for this root hash and wont check again, much better then pinging for each root hash for every peer
// before attempting this call.
await withTimeout(
digPeer.propagationServer.pingUpdate(rootHash),
15000,
`Dig Peer: ${peerIp} took to long to respond to ping request`
);
} catch (error: any) {
console.error(`Error interacting with Dig Peer: ${peerIp}: ${error.message}`);
console.error(
`Error interacting with Dig Peer: ${peerIp}: ${error.message}`
);
}
};

/**
* Clean the checkedPeersMap to retain only the current rootHash.
* @param currentRootHash - The rootHash to retain in the map.
*/
const cleanCheckedPeersMap = (storeId: string, currentRootHash: string): void => {
const cleanCheckedPeersMap = (
storeId: string,
currentRootHash: string
): void => {
for (const [rootHash, _] of checkedPeersMap.entries()) {
if (rootHash !== currentRootHash) {
checkedPeersMap.delete(rootHash);
console.log(`Removed outdated rootHash ${storeId}-${rootHash} from checkedPeersMap.`);
console.log(
`Removed outdated rootHash ${storeId}-${rootHash} from checkedPeersMap.`
);
}
}
};
Expand All @@ -71,13 +77,18 @@ const cleanCheckedPeersMap = (storeId: string, currentRootHash: string): void =>
* @param storeId - The ID of the synced store.
* @param serverCoin - The ServerCoin instance associated with the store.
*/
const handleSyncedStore = async (storeId: string, serverCoin: ServerCoin): Promise<void> => {
const handleSyncedStore = async (
storeId: string,
serverCoin: ServerCoin
): Promise<void> => {
try {
const dataStore = await DataStore.from(storeId);
const rootHistory = await dataStore.getRootHistory();

if (rootHistory.length === 0) {
console.warn(`No root history found for store ${storeId}. Skipping peer checks.`);
console.warn(
`No root history found for store ${storeId}. Skipping peer checks.`
);
return;
}

Expand All @@ -92,18 +103,21 @@ const handleSyncedStore = async (storeId: string, serverCoin: ServerCoin): Promi
// Initialize the set for the current rootHash if not present
if (!checkedPeersMap.has(currentRootHash)) {
checkedPeersMap.set(currentRootHash, new Set<string>());
console.log(`Initialized checkedPeersMap for current rootHash ${currentRootHash}.`);
console.log(
`Initialized checkedPeersMap for current rootHash ${currentRootHash}.`
);
}

const checkedPeers = checkedPeersMap.get(currentRootHash)!;

// Pass the checkedPeers as the blocklist to getActiveEpochPeers
const blocklist = Array.from(checkedPeers);
const peerIps: string[] = await serverCoin.getActiveEpochPeers(blocklist);


if (peerIps.length === 0) {
console.log(`No new peers to process for rootHash ${currentRootHash} in store ${storeId}.`);
console.log(
`No new peers to process for rootHash ${currentRootHash} in store ${storeId}.`
);
return;
}

Expand All @@ -115,15 +129,18 @@ const handleSyncedStore = async (storeId: string, serverCoin: ServerCoin): Promi
// Process peers one at a time sequentially, starting with the best-ranked peers
for (const { ip: peerIp } of rankedPeers) {
if (checkedPeers.has(peerIp)) {
console.log(`Peer ${peerIp} has already been checked for rootHash ${currentRootHash}. Skipping.`);
console.log(
`Peer ${peerIp} has already been checked for rootHash ${currentRootHash}. Skipping.`
);
continue;
}

await processPeer(peerIp, storeId, currentRootHash, checkedPeers);
}

console.log(`Completed processing peers for rootHash ${currentRootHash} in store ${storeId}.`);

console.log(
`Completed processing peers for rootHash ${currentRootHash} in store ${storeId}.`
);
} catch (error: any) {
console.error(`Error handling synced store ${storeId}: ${error.message}`);
}
Expand All @@ -134,7 +151,10 @@ const handleSyncedStore = async (storeId: string, serverCoin: ServerCoin): Promi
* @param storeId - The ID of the store to synchronize.
* @param serverCoin - The ServerCoin instance associated with the store.
*/
const synchronizeStore = async (storeId: string, serverCoin: ServerCoin): Promise<void> => {
const synchronizeStore = async (
storeId: string,
serverCoin: ServerCoin
): Promise<void> => {
console.log(`Starting synchronization for store ${storeId}...`);

const isSynced = await isStoreSynced(storeId);
Expand All @@ -143,7 +163,9 @@ const synchronizeStore = async (storeId: string, serverCoin: ServerCoin): Promis
console.log(`Store ${storeId} is synced. Proceeding with peer checks.`);
await handleSyncedStore(storeId, serverCoin);
} else {
console.log(`Store ${storeId} is not synced. Initiating synchronization from peers.`);
console.log(
`Store ${storeId} is not synced. Initiating synchronization from peers.`
);
await syncStoreFromNetwork(storeId);
}
};
Expand All @@ -161,12 +183,16 @@ const isStoreSynced = async (storeId: string): Promise<boolean> => {
const storePath = path.join(STORE_PATH, storeId);

if (!fs.existsSync(storePath)) {
console.log(`Store path not found for store ${storeId}. Considering it as not synced.`);
console.log(
`Store path not found for store ${storeId}. Considering it as not synced.`
);
return false;
}

// Check if any entry in rootHistory has synced = false
const hasUnsyncedEntries = rootHistory.some(entry => entry.synced === false);
const hasUnsyncedEntries = rootHistory.some(
(entry) => entry.synced === false
);

if (hasUnsyncedEntries) {
console.log(`Store ${storeId} has unsynced entries in root history.`);
Expand Down Expand Up @@ -266,7 +292,9 @@ const task = new Task("sync-stores", async () => {
// Finalize synchronization
await finalizeStoreSync(storeId);
} catch (error: any) {
console.error(`Failed to synchronize store ${storeId}: ${error.message}`);
console.error(
`Failed to synchronize store ${storeId}: ${error.message}`
);
}
}

Expand All @@ -283,7 +311,6 @@ const task = new Task("sync-stores", async () => {
}
});


// Schedule the task to run every 60 seconds, starting immediately
const job = new SimpleIntervalJob(
{
Expand Down
Loading