Release/v0.0.1 alpha.102 #96

Merged (4 commits, Oct 5, 2024)
Changes from all commits

9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,15 @@

All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.

### [0.0.1-alpha.102](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.101...v0.0.1-alpha.102) (2024-10-05)


### Features

* more aggressive propagation algorithm ([c8e0f53](https://github.com/DIG-Network/dig-propagation-server/commit/c8e0f53feea7d9c25d2427f3f5229529506946dc))

### [0.0.1-alpha.101](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.100...v0.0.1-alpha.101) (2024-10-04)

### [0.0.1-alpha.100](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.99...v0.0.1-alpha.100) (2024-10-04)

### [0.0.1-alpha.99](https://github.com/DIG-Network/dig-propagation-server/compare/v0.0.1-alpha.98...v0.0.1-alpha.99) (2024-10-04)
12 changes: 6 additions & 6 deletions package-lock.json

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions package.json
@@ -1,6 +1,6 @@
{
"name": "dig-propagation-server",
"version": "0.0.1-alpha.100",
"version": "0.0.1-alpha.102",
"description": "",
"type": "commonjs",
"main": "./dist/index.js",
@@ -26,7 +26,7 @@
],
"dependencies": {
"@dignetwork/datalayer-driver": "^0.1.28",
"@dignetwork/dig-sdk": "^0.0.1-alpha.131",
"@dignetwork/dig-sdk": "^0.0.1-alpha.133",
"async-mutex": "^0.5.0",
"busboy": "^1.6.0",
"express": "^4.19.2",
1 change: 0 additions & 1 deletion src/app.ts
@@ -35,7 +35,6 @@ const serverKey = fs.readFileSync(serverKeyPath);
const app = express();
const PORT = Number(process.env.PORT) || 4159;

app.set('trust proxy', true);
app.use(requestIp.mw());

// Apply store routes
193 changes: 157 additions & 36 deletions src/tasks/sync_stores.ts
@@ -3,11 +3,11 @@ import path from 'path';
import { SimpleIntervalJob, Task } from "toad-scheduler";
import {
getStoresList,
Wallet,
DataStore,
DigNetwork,
NconfManager,
ServerCoin,
DigPeer,
} from "@dignetwork/dig-sdk";
import { Mutex } from "async-mutex";
import { getStorageLocation } from "../utils/storage";
@@ -19,53 +19,162 @@ const mutex = new Mutex();
const PUBLIC_IP_KEY = "publicIp";
const nconfManager = new NconfManager("config.json");

const syncStore = async (storeId: string): Promise<void> => {
console.log(`Starting sync process for store ${storeId}...`);
// Map to track which peerIps have been checked for each rootHash
const checkedPeersMap: Map<string, Set<string>> = new Map();

/**
* Process a single peer: check for rootHash and ping update if necessary.
* @param peerIp - The IP address of the peer.
* @param rootHash - The current rootHash to check against.
* @param checkedPeers - The set of peers already checked for the rootHash.
*/
const processPeer = async (peerIp: string, storeId: string, rootHash: string, checkedPeers: Set<string>): Promise<void> => {
try {
const isUpToDate = await isStoreUpToDate(storeId);
const digPeer = new DigPeer(peerIp, storeId);
const hasRootHash = await digPeer.contentServer.hasRootHash(rootHash);

if (isUpToDate) {
console.log(`Store ${storeId} is already up to date.`);
if (hasRootHash) {
console.log(`Peer ${peerIp} already has rootHash ${rootHash}. Marking as checked.`);
checkedPeers.add(peerIp); // Mark as checked only if peer has the rootHash
} else {
console.log(`Peer ${peerIp} does not have rootHash ${rootHash}. Pinging update.`);
await digPeer.propagationServer.pingUpdate(rootHash);
// Do NOT mark as checked if peer lacks the rootHash
}
} catch (error: any) {
console.error(`Error interacting with peer ${peerIp}: ${error.message}`);
}
};

/**
* Clean the checkedPeersMap to retain only the current rootHash.
* @param currentRootHash - The rootHash to retain in the map.
*/
const cleanCheckedPeersMap = (currentRootHash: string): void => {
for (const [rootHash, _] of checkedPeersMap.entries()) {
if (rootHash !== currentRootHash) {
checkedPeersMap.delete(rootHash);
console.log(`Removed outdated rootHash ${rootHash} from checkedPeersMap.`);
}
}
};

/**
* Handle a store that is already synced by checking peers for updates sequentially.
* @param storeId - The ID of the synced store.
* @param serverCoin - The ServerCoin instance associated with the store.
*/
const handleSyncedStore = async (storeId: string, serverCoin: ServerCoin): Promise<void> => {
try {
const dataStore = await DataStore.from(storeId);
const rootHistory = await dataStore.getRootHistory();

if (rootHistory.length === 0) {
console.warn(`No root history found for store ${storeId}. Skipping peer checks.`);
return;
}

console.log(`Store ${storeId} is out of date. Syncing...`);
await syncStoreFromNetwork(storeId);
// Get the current rootHash from the last entry of rootHistory
const currentRootHashObj = rootHistory[rootHistory.length - 1];
const currentRootHash = currentRootHashObj.root_hash;
console.log(`Current rootHash for store ${storeId}: ${currentRootHash}`);

// Clean checkedPeersMap to only retain peers checked for the current rootHash
cleanCheckedPeersMap(currentRootHash);

// Initialize the set for the current rootHash if not present
if (!checkedPeersMap.has(currentRootHash)) {
checkedPeersMap.set(currentRootHash, new Set<string>());
console.log(`Initialized checkedPeersMap for current rootHash ${currentRootHash}.`);
}

const checkedPeers = checkedPeersMap.get(currentRootHash)!;

// Pass the checkedPeers as the blocklist to getActiveEpochPeers
const blocklist = Array.from(checkedPeers);
const peerIps: string[] = await serverCoin.getActiveEpochPeers(blocklist);
console.log(`Active epoch peers for store ${storeId}:`, peerIps);

if (peerIps.length === 0) {
console.log(`No new peers to process for rootHash ${currentRootHash} in store ${storeId}.`);
return;
}

// Process peers one at a time sequentially
for (const peerIp of peerIps) {
if (checkedPeers.has(peerIp)) {
console.log(`Peer ${peerIp} has already been checked for rootHash ${currentRootHash}. Skipping.`);
continue;
}

await processPeer(peerIp, storeId, currentRootHash, checkedPeers);
}

console.log(`Completed processing peers for rootHash ${currentRootHash} in store ${storeId}.`);

} catch (error: any) {
console.trace(`Error processing store ${storeId}: ${error.message}`);
} finally {
await finalizeStoreSync(storeId);
console.error(`Error handling synced store ${storeId}: ${error.message}`);
}
};

/**
* Synchronize a single store based on its sync status.
* @param storeId - The ID of the store to synchronize.
* @param serverCoin - The ServerCoin instance associated with the store.
*/
const synchronizeStore = async (storeId: string, serverCoin: ServerCoin): Promise<void> => {
console.log(`Starting synchronization for store ${storeId}...`);

const isSynced = await isStoreSynced(storeId);

if (isSynced) {
console.log(`Store ${storeId} is synced. Proceeding with peer checks.`);
await handleSyncedStore(storeId, serverCoin);
} else {
console.log(`Store ${storeId} is not synced. Initiating synchronization from peers.`);
await syncStoreFromNetwork(storeId);
}
};

const isStoreUpToDate = async (storeId: string): Promise<boolean> => {
console.log(`Checking if store ${storeId} is up to date...`);
/**
* Check if the store is synced by verifying root history entries.
* @param storeId - The ID of the store to check.
* @returns A boolean indicating whether the store is up to date.
*/
const isStoreSynced = async (storeId: string): Promise<boolean> => {
console.log(`Checking synchronization status for store ${storeId}...`);
const dataStore = await DataStore.from(storeId);

const rootHistory = await dataStore.getRootHistory();
const storePath = path.join(STORE_PATH, storeId);

if (!fs.existsSync(storePath)) {
console.log(`Store path not found for store ${storeId}.`);
console.log(`Store path not found for store ${storeId}. Considering it as not synced.`);
return false;
}

// Get the count of .dat files in the store directory
const datFiles = fs.readdirSync(storePath).filter(file => file.endsWith(".dat") && !file.includes('manifest'));
const datFileCount = datFiles.length;
// Check if any entry in rootHistory has synced = false
const hasUnsyncedEntries = rootHistory.some(entry => entry.synced === false);

console.log(`Root history count: ${rootHistory.length}, .dat files count: ${datFileCount}`);

return rootHistory.length === datFileCount;
if (hasUnsyncedEntries) {
console.log(`Store ${storeId} has unsynced entries in root history.`);
return false;
} else {
console.log(`All entries in root history for store ${storeId} are synced.`);
return true;
}
};


/**
* Synchronize the store from the network.
* @param storeId - The ID of the store to synchronize.
*/
const syncStoreFromNetwork = async (storeId: string): Promise<void> => {
try {
console.log(`Attempting to sync store ${storeId} from the network...`);
const digNetwork = new DigNetwork(storeId);
await digNetwork.syncStoreFromPeers();
console.log(`Successfully synced store ${storeId} from peers.`);
} catch (error: any) {
console.warn(
`Initial sync attempt failed for ${storeId}: ${error.message}`
@@ -74,9 +74,14 @@ const syncStoreFromNetwork = async (storeId: string): Promise<void> => {
console.error(`No DIG Peers found for store ${storeId}. Skipping...`);
return;
}
// Optionally, implement retry logic or additional error handling here
}
};

/**
* Finalize the synchronization process for a store.
* @param storeId - The ID of the store to finalize.
*/
const finalizeStoreSync = async (storeId: string): Promise<void> => {
try {
console.log(`Finalizing sync for store ${storeId}...`);
@@ -88,19 +88,12 @@ const finalizeStoreSync = async (storeId: string): Promise<void> => {
}
};

/**
* Main task to synchronize all stores.
*/
const task = new Task("sync-stores", async () => {
if (!mutex.isLocked()) {
const releaseMutex = await mutex.acquire();
let mnemonic: string | undefined;

try {
const wallet = await Wallet.load("default", false);
mnemonic = await wallet.getMnemonic();
} catch (error: any) {
console.error(`Error in sync-stores task: ${error.message}`);
releaseMutex();
return;
}

try {
console.log("Starting sync-stores task...");
@@ -121,23 +121,34 @@ const task = new Task("sync-stores", async () => {
console.error(
`Failed to retrieve public IP from configuration: ${error.message}`
);
releaseMutex();
return; // Exit the task if we can't retrieve the public IP
}

for (const storeId of storeList) {
try {
await syncStore(storeId);
if (publicIp) {
const serverCoin = new ServerCoin(storeId);
await serverCoin.ensureServerCoinExists(publicIp);
await serverCoin.meltOutdatedEpochs(publicIp);

// Synchronize the store based on its sync status
await synchronizeStore(storeId, serverCoin);

// After synchronization, ensure the server coin is updated
await serverCoin.ensureServerCoinExists(publicIp);
} else {
console.warn(
`Skipping server coin check for store ${storeId} due to missing public IP.`
`Skipping server coin operations for store ${storeId} due to missing public IP.`
);

// Even if public IP is missing, you might still want to synchronize the store
await synchronizeStore(storeId, new ServerCoin(storeId));
}

// Finalize synchronization
await finalizeStoreSync(storeId);
} catch (error: any) {
console.error(`Failed to sync store ${storeId}: ${error.message}`);
console.error(`Failed to synchronize store ${storeId}: ${error.message}`);
}
}

@@ -149,9 +149,12 @@ const task = new Task("sync-stores", async () => {
} finally {
releaseMutex();
}
} else {
console.log("Sync-stores task is already running. Skipping this run.");
}
});

// Schedule the task to run every 60 seconds, starting immediately
const job = new SimpleIntervalJob(
{
seconds: 60,
@@ -161,4 +161,4 @@ const job = new SimpleIntervalJob(
{ id: "sync-stores", preventOverrun: true }
);

export default job;
export default job;
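
For reference, a condensed sketch of the propagation loop this diff introduces, written against the `@dignetwork/dig-sdk` calls as they appear in the PR (`DataStore.getRootHistory`, `ServerCoin.getActiveEpochPeers`, `DigPeer.contentServer.hasRootHash`, `DigPeer.propagationServer.pingUpdate`). The wrapper function name `propagateLatestRoot` is illustrative only; treat this as a summary of the flow, not the exact merged code:

```typescript
import { DataStore, DigPeer, ServerCoin } from "@dignetwork/dig-sdk";

// Peers already confirmed to hold a given rootHash, keyed by rootHash.
const checkedPeersMap: Map<string, Set<string>> = new Map();

// Illustrative wrapper (hypothetical name) over the per-store peer check.
async function propagateLatestRoot(storeId: string): Promise<void> {
  const dataStore = await DataStore.from(storeId);
  const rootHistory = await dataStore.getRootHistory();
  if (rootHistory.length === 0) return;

  // Only the newest rootHash matters; drop state kept for older ones.
  const currentRootHash = rootHistory[rootHistory.length - 1].root_hash;
  for (const rootHash of checkedPeersMap.keys()) {
    if (rootHash !== currentRootHash) checkedPeersMap.delete(rootHash);
  }

  const checkedPeers =
    checkedPeersMap.get(currentRootHash) ?? new Set<string>();
  checkedPeersMap.set(currentRootHash, checkedPeers);

  // Peers already confirmed for this rootHash are passed as a blocklist,
  // so each pass only contacts peers that may still need the update.
  const serverCoin = new ServerCoin(storeId);
  const peerIps = await serverCoin.getActiveEpochPeers(Array.from(checkedPeers));

  for (const peerIp of peerIps) {
    const digPeer = new DigPeer(peerIp, storeId);
    if (await digPeer.contentServer.hasRootHash(currentRootHash)) {
      checkedPeers.add(peerIp); // confirmed; skipped on future passes
    } else {
      // Nudge the peer to pull the new root; it stays unchecked until it has it.
      await digPeer.propagationServer.pingUpdate(currentRootHash);
    }
  }
}
```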