Skip to content

Commit

Permalink
Merge pull request #96 from DIG-Network/release/v0.0.1-alpha.102
Browse files Browse the repository at this point in the history
Release/v0.0.1 alpha.102
  • Loading branch information
MichaelTaylor3D authored Oct 5, 2024
2 parents 9f929bd + 67bfc8b commit 9fd5ace
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 45 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.

### [0.0.1-alpha.102](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.101...v0.0.1-alpha.102) (2024-10-05)


### Features

* more agressive propagation algorithm ([c8e0f53](https://github.com/DIG-Network/dig-propagation-server/commit/c8e0f53feea7d9c25d2427f3f5229529506946dc))

### [0.0.1-alpha.101](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.100...v0.0.1-alpha.101) (2024-10-04)

### [0.0.1-alpha.100](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.99...v0.0.1-alpha.100) (2024-10-04)

### [0.0.1-alpha.99](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.98...v0.0.1-alpha.99) (2024-10-04)
Expand Down
12 changes: 6 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dig-propagation-server",
"version": "0.0.1-alpha.100",
"version": "0.0.1-alpha.102",
"description": "",
"type": "commonjs",
"main": "./dist/index.js",
Expand All @@ -26,7 +26,7 @@
],
"dependencies": {
"@dignetwork/datalayer-driver": "^0.1.28",
"@dignetwork/dig-sdk": "^0.0.1-alpha.131",
"@dignetwork/dig-sdk": "^0.0.1-alpha.133",
"async-mutex": "^0.5.0",
"busboy": "^1.6.0",
"express": "^4.19.2",
Expand Down
1 change: 0 additions & 1 deletion src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ const serverKey = fs.readFileSync(serverKeyPath);
const app = express();
const PORT = Number(process.env.PORT) || 4159;

app.set('trust proxy', true);
app.use(requestIp.mw());

// Apply store routes
Expand Down
193 changes: 157 additions & 36 deletions src/tasks/sync_stores.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import path from 'path';
import { SimpleIntervalJob, Task } from "toad-scheduler";
import {
getStoresList,
Wallet,
DataStore,
DigNetwork,
NconfManager,
ServerCoin,
DigPeer,
} from "@dignetwork/dig-sdk";
import { Mutex } from "async-mutex";
import { getStorageLocation } from "../utils/storage";
Expand All @@ -19,53 +19,162 @@ const mutex = new Mutex();
const PUBLIC_IP_KEY = "publicIp";
const nconfManager = new NconfManager("config.json");

const syncStore = async (storeId: string): Promise<void> => {
console.log(`Starting sync process for store ${storeId}...`);
// Map to track which peerIps have been checked for each rootHash
const checkedPeersMap: Map<string, Set<string>> = new Map();

/**
* Process a single peer: check for rootHash and ping update if necessary.
* @param peerIp - The IP address of the peer.
* @param rootHash - The current rootHash to check against.
* @param checkedPeers - The set of peers already checked for the rootHash.
*/
const processPeer = async (peerIp: string, storeId: string, rootHash: string, checkedPeers: Set<string>): Promise<void> => {
try {
const isUpToDate = await isStoreUpToDate(storeId);
const digPeer = new DigPeer(peerIp, storeId);
const hasRootHash = await digPeer.contentServer.hasRootHash(rootHash);

if (isUpToDate) {
console.log(`Store ${storeId} is already up to date.`);
if (hasRootHash) {
console.log(`Peer ${peerIp} already has rootHash ${rootHash}. Marking as checked.`);
checkedPeers.add(peerIp); // Mark as checked only if peer has the rootHash
} else {
console.log(`Peer ${peerIp} does not have rootHash ${rootHash}. Pinging update.`);
await digPeer.propagationServer.pingUpdate(rootHash);
// Do NOT mark as checked if peer lacks the rootHash
}
} catch (error: any) {
console.error(`Error interacting with peer ${peerIp}: ${error.message}`);
}
};

/**
* Clean the checkedPeersMap to retain only the current rootHash.
* @param currentRootHash - The rootHash to retain in the map.
*/
const cleanCheckedPeersMap = (currentRootHash: string): void => {
for (const [rootHash, _] of checkedPeersMap.entries()) {
if (rootHash !== currentRootHash) {
checkedPeersMap.delete(rootHash);
console.log(`Removed outdated rootHash ${rootHash} from checkedPeersMap.`);
}
}
};

/**
* Handle a store that is already synced by checking peers for updates sequentially.
* @param storeId - The ID of the synced store.
* @param serverCoin - The ServerCoin instance associated with the store.
*/
const handleSyncedStore = async (storeId: string, serverCoin: ServerCoin): Promise<void> => {
try {
const dataStore = await DataStore.from(storeId);
const rootHistory = await dataStore.getRootHistory();

if (rootHistory.length === 0) {
console.warn(`No root history found for store ${storeId}. Skipping peer checks.`);
return;
}

console.log(`Store ${storeId} is out of date. Syncing...`);
await syncStoreFromNetwork(storeId);
// Get the current rootHash from the last entry of rootHistory
const currentRootHashObj = rootHistory[rootHistory.length - 1];
const currentRootHash = currentRootHashObj.root_hash;
console.log(`Current rootHash for store ${storeId}: ${currentRootHash}`);

// Clean checkedPeersMap to only retain peers checked for the current rootHash
cleanCheckedPeersMap(currentRootHash);

// Initialize the set for the current rootHash if not present
if (!checkedPeersMap.has(currentRootHash)) {
checkedPeersMap.set(currentRootHash, new Set<string>());
console.log(`Initialized checkedPeersMap for current rootHash ${currentRootHash}.`);
}

const checkedPeers = checkedPeersMap.get(currentRootHash)!;

// Pass the checkedPeers as the blocklist to getActiveEpochPeers
const blocklist = Array.from(checkedPeers);
const peerIps: string[] = await serverCoin.getActiveEpochPeers(blocklist);
console.log(`Active epoch peers for store ${storeId}:`, peerIps);

if (peerIps.length === 0) {
console.log(`No new peers to process for rootHash ${currentRootHash} in store ${storeId}.`);
return;
}

// Process peers one at a time sequentially
for (const peerIp of peerIps) {
if (checkedPeers.has(peerIp)) {
console.log(`Peer ${peerIp} has already been checked for rootHash ${currentRootHash}. Skipping.`);
continue;
}

await processPeer(peerIp, storeId, currentRootHash, checkedPeers);
}

console.log(`Completed processing peers for rootHash ${currentRootHash} in store ${storeId}.`);

} catch (error: any) {
console.trace(`Error processing store ${storeId}: ${error.message}`);
} finally {
await finalizeStoreSync(storeId);
console.error(`Error handling synced store ${storeId}: ${error.message}`);
}
};

/**
* Synchronize a single store based on its sync status.
* @param storeId - The ID of the store to synchronize.
* @param serverCoin - The ServerCoin instance associated with the store.
*/
const synchronizeStore = async (storeId: string, serverCoin: ServerCoin): Promise<void> => {
console.log(`Starting synchronization for store ${storeId}...`);

const isSynced = await isStoreSynced(storeId);

if (isSynced) {
console.log(`Store ${storeId} is synced. Proceeding with peer checks.`);
await handleSyncedStore(storeId, serverCoin);
} else {
console.log(`Store ${storeId} is not synced. Initiating synchronization from peers.`);
await syncStoreFromNetwork(storeId);
}
};

const isStoreUpToDate = async (storeId: string): Promise<boolean> => {
console.log(`Checking if store ${storeId} is up to date...`);
/**
* Check if the store is synced by verifying root history entries.
* @param storeId - The ID of the store to check.
* @returns A boolean indicating whether the store is up to date.
*/
const isStoreSynced = async (storeId: string): Promise<boolean> => {
console.log(`Checking synchronization status for store ${storeId}...`);
const dataStore = await DataStore.from(storeId);

const rootHistory = await dataStore.getRootHistory();
const storePath = path.join(STORE_PATH, storeId);

if (!fs.existsSync(storePath)) {
console.log(`Store path not found for store ${storeId}.`);
console.log(`Store path not found for store ${storeId}. Considering it as not synced.`);
return false;
}

// Get the count of .dat files in the store directory
const datFiles = fs.readdirSync(storePath).filter(file => file.endsWith(".dat") && !file.includes('manifest'));
const datFileCount = datFiles.length;
// Check if any entry in rootHistory has synced = false
const hasUnsyncedEntries = rootHistory.some(entry => entry.synced === false);

console.log(`Root history count: ${rootHistory.length}, .dat files count: ${datFileCount}`);

return rootHistory.length === datFileCount;
if (hasUnsyncedEntries) {
console.log(`Store ${storeId} has unsynced entries in root history.`);
return false;
} else {
console.log(`All entries in root history for store ${storeId} are synced.`);
return true;
}
};


/**
* Synchronize the store from the network.
* @param storeId - The ID of the store to synchronize.
*/
const syncStoreFromNetwork = async (storeId: string): Promise<void> => {
try {
console.log(`Attempting to sync store ${storeId} from the network...`);
const digNetwork = new DigNetwork(storeId);
await digNetwork.syncStoreFromPeers();
console.log(`Successfully synced store ${storeId} from peers.`);
} catch (error: any) {
console.warn(
`Initial sync attempt failed for ${storeId}: ${error.message}`
Expand All @@ -74,9 +183,14 @@ const syncStoreFromNetwork = async (storeId: string): Promise<void> => {
console.error(`No DIG Peers found for store ${storeId}. Skipping...`);
return;
}
// Optionally, implement retry logic or additional error handling here
}
};

/**
* Finalize the synchronization process for a store.
* @param storeId - The ID of the store to finalize.
*/
const finalizeStoreSync = async (storeId: string): Promise<void> => {
try {
console.log(`Finalizing sync for store ${storeId}...`);
Expand All @@ -88,19 +202,12 @@ const finalizeStoreSync = async (storeId: string): Promise<void> => {
}
};

/**
* Main task to synchronize all stores.
*/
const task = new Task("sync-stores", async () => {
if (!mutex.isLocked()) {
const releaseMutex = await mutex.acquire();
let mnemonic: string | undefined;

try {
const wallet = await Wallet.load("default", false);
mnemonic = await wallet.getMnemonic();
} catch (error: any) {
console.error(`Error in sync-stores task: ${error.message}`);
releaseMutex();
return;
}

try {
console.log("Starting sync-stores task...");
Expand All @@ -121,23 +228,34 @@ const task = new Task("sync-stores", async () => {
console.error(
`Failed to retrieve public IP from configuration: ${error.message}`
);
releaseMutex();
return; // Exit the task if we can't retrieve the public IP
}

for (const storeId of storeList) {
try {
await syncStore(storeId);
if (publicIp) {
const serverCoin = new ServerCoin(storeId);
await serverCoin.ensureServerCoinExists(publicIp);
await serverCoin.meltOutdatedEpochs(publicIp);

// Synchronize the store based on its sync status
await synchronizeStore(storeId, serverCoin);

// After synchronization, ensure the server coin is updated
await serverCoin.ensureServerCoinExists(publicIp);
} else {
console.warn(
`Skipping server coin check for store ${storeId} due to missing public IP.`
`Skipping server coin operations for store ${storeId} due to missing public IP.`
);

// Even if public IP is missing, you might still want to synchronize the store
await synchronizeStore(storeId, new ServerCoin(storeId));
}

// Finalize synchronization
await finalizeStoreSync(storeId);
} catch (error: any) {
console.error(`Failed to sync store ${storeId}: ${error.message}`);
console.error(`Failed to synchronize store ${storeId}: ${error.message}`);
}
}

Expand All @@ -149,9 +267,12 @@ const task = new Task("sync-stores", async () => {
} finally {
releaseMutex();
}
} else {
console.log("Sync-stores task is already running. Skipping this run.");
}
});

// Schedule the task to run every 60 seconds, starting immediately
const job = new SimpleIntervalJob(
{
seconds: 60,
Expand All @@ -161,4 +282,4 @@ const job = new SimpleIntervalJob(
{ id: "sync-stores", preventOverrun: true }
);

export default job;
export default job;

0 comments on commit 9fd5ace

Please sign in to comment.