From c154628534bc1c61eb7f1350ac3fbc546e035c2c Mon Sep 17 00:00:00 2001 From: Owen Date: Sat, 4 Aug 2018 00:57:46 +0900 Subject: [PATCH] Gregarious Goose --- src/consensus/consensus.ts | 7 +- src/consensus/database/database.ts | 14 - src/consensus/database/worldState.ts | 3 + src/consensus/iconsensus.ts | 2 +- src/consensus/sync.ts | 425 +++++-------------------- src/miner/minerServer.ts | 2 +- src/network/ipeer.ts | 9 +- src/network/rabbit/networkConstants.ts | 26 ++ src/network/rabbit/rabbitPeer.ts | 210 ++++++++++-- src/network/rabbit/socketParser.ts | 19 +- src/server.ts | 32 +- 11 files changed, 324 insertions(+), 425 deletions(-) create mode 100644 src/network/rabbit/networkConstants.ts diff --git a/src/consensus/consensus.ts b/src/consensus/consensus.ts index 6543b649..ca65cacb 100644 --- a/src/consensus/consensus.ts +++ b/src/consensus/consensus.ts @@ -9,6 +9,7 @@ import { DelayQueue } from "../common/delayQueue" import { ITxPool } from "../common/itxPool" import { SignedTx } from "../common/txSigned" import { globalOptions } from "../main" +import { MAX_HEADER_SIZE } from "../network/rabbit/networkConstants" import { Hash } from "../util/hash" import { Account } from "./database/account" import { Database } from "./database/database" @@ -21,7 +22,7 @@ import { TxDatabase } from "./database/txDatabase" import { TxValidity, WorldState } from "./database/worldState" import { DifficultyAdjuster } from "./difficultyAdjuster" import { IConsensus, IStatusChange } from "./iconsensus" -import { BlockStatus, MAX_HEADER_SIZE } from "./sync" +import { BlockStatus } from "./sync" import { Verify } from "./verify" const logger = getLogger("Consensus") @@ -318,7 +319,7 @@ export class Consensus extends EventEmitter implements IConsensus { + ` ${hash}(${dbBlock.height}, ${dbBlock.totalWork.toExponential()}),` + ` BTip(${this.blockTip.height}, ${this.blockTip.totalWork.toExponential()}),` + ` HTip(${this.headerTip.height}, ${this.headerTip.totalWork.toExponential()})`) - return { oldStatus, status } + return { oldStatus, status, height: dbBlock.height } } if (block !== undefined && (this.blockTip === undefined || this.forkChoice(dbBlock, this.blockTip))) { @@ -332,7 +333,7 @@ export class Consensus extends EventEmitter implements IConsensus { + ` BTip(${this.blockTip.height}, ${this.blockTip.totalWork.toExponential()}),` + ` HTip(${this.headerTip.height}, ${this.headerTip.totalWork.toExponential()})`) - return { oldStatus, status } + return { oldStatus, status, height: dbBlock.height } }) } private async process(hash: Hash, header: BlockHeader, block?: Block): Promise { diff --git a/src/consensus/database/database.ts b/src/consensus/database/database.ts index 863e0f78..c3845573 100644 --- a/src/consensus/database/database.ts +++ b/src/consensus/database/database.ts @@ -1,7 +1,6 @@ import levelup = require("levelup") import { getLogger } from "log4js" import rocksdb = require("rocksdb") -import { AsyncLock } from "../../common/asyncLock" import { AnyBlock, Block } from "../../common/block" import { GenesisBlock } from "../../common/blockGenesis" import { Hash } from "../../util/hash" @@ -11,15 +10,6 @@ import { DBBlock } from "./dbblock" const logger = getLogger("Database") -function uint8ArrayEqual(first: Uint8Array, second: Uint8Array): boolean { - if (first.length !== second.length) { return false } - for (let i = 0; i < second.length; i++) { - if (first[i] !== second[i]) { - return false - } - } - return true -} export class DecodeError extends Error { public hash: Hash } @@ -29,16 +19,12 @@ export class DecodeError extends Error { export class Database { private database: levelup.LevelUp private blockFile: BlockFile - private headerLock: AsyncLock - private blockLock: AsyncLock private fileNumber: number constructor(dbPath: string, filePath: string) { const rocks: any = rocksdb(dbPath) this.database = levelup(rocks) - this.headerLock = new AsyncLock() - this.blockLock = new AsyncLock() this.blockFile = new BlockFile(filePath) } diff --git a/src/consensus/database/worldState.ts b/src/consensus/database/worldState.ts index af311c00..4ca639af 100644 --- a/src/consensus/database/worldState.ts +++ b/src/consensus/database/worldState.ts @@ -88,6 +88,9 @@ export class WorldState { } } public async validateTx(stateRoot: Hash, tx: SignedTx): Promise { + if (tx.from.equals(tx.to)) { + return TxValidity.Invalid + } const fromAccount = await this.getAccount(stateRoot, tx.from) if (fromAccount === undefined || fromAccount.nonce >= tx.nonce) { return TxValidity.Invalid diff --git a/src/consensus/iconsensus.ts b/src/consensus/iconsensus.ts index 7816db6e..9a0d6898 100644 --- a/src/consensus/iconsensus.ts +++ b/src/consensus/iconsensus.ts @@ -13,7 +13,7 @@ import { DBMined } from "./database/dbMined" import { DBTx } from "./database/dbtx" import { TxValidity } from "./database/worldState" -export interface IStatusChange { oldStatus?: BlockStatus, status?: BlockStatus } +export interface IStatusChange { oldStatus?: BlockStatus, status?: BlockStatus, height?: number } export type AnySignedTx = (GenesisSignedTx | SignedTx) diff --git a/src/consensus/sync.ts b/src/consensus/sync.ts index c2b4c3cd..9ab468f4 100644 --- a/src/consensus/sync.ts +++ b/src/consensus/sync.ts @@ -1,36 +1,10 @@ import { getLogger } from "log4js" -import { AnyBlock, Block } from "../common/block" -import { BlockHeader } from "../common/blockHeader" -import { BaseBlockHeader } from "../common/genesisHeader" +import { INetwork } from "../network/inetwork" import { IPeer } from "../network/ipeer" -import { MAX_PACKET_SIZE } from "../network/rabbit/socketParser" import { Hash } from "../util/hash" -import { IConsensus, IStatusChange } from "./iconsensus" +import { IConsensus } from "./iconsensus" const logger = getLogger("Sync") -export const REPEATED_OVERHEAD = 6 -export const BYTES_OVERHEAD = 6 - -export const HASH_SIZE = 32 - -// Header sizes -export const DIFFICULTY_SIZE = 4 -export const TIMESTAMP_SIZE = 8 -export const NONCE_SIZE = 8 -export const MINER_SIZE = 20 - -// Tx sizes -export const AMOUNT_SIZE = 8 -export const FEE_SIZE = 8 -export const TX_NONCE_SIZE = 4 -export const SIGNATURE_SIZE = 64 -export const RECOVERY_SIZE = 4 - -export const MAX_HEADER_SIZE = 3 * (HASH_SIZE + BYTES_OVERHEAD) + REPEATED_OVERHEAD + DIFFICULTY_SIZE + TIMESTAMP_SIZE + NONCE_SIZE + MINER_SIZE + BYTES_OVERHEAD -export const MAX_TX_SIZE = 2 * (HASH_SIZE + BYTES_OVERHEAD) + AMOUNT_SIZE + FEE_SIZE + TX_NONCE_SIZE + SIGNATURE_SIZE + BYTES_OVERHEAD + RECOVERY_SIZE -export const MAX_TXS_PER_BLOCK = 4096 -export const MAX_BLOCK_SIZE = MAX_HEADER_SIZE + MAX_TXS_PER_BLOCK * MAX_TX_SIZE - export enum BlockStatus { Rejected = -1, Nothing = 0, @@ -39,343 +13,116 @@ export enum BlockStatus { MainChain = 3, } -const headerCount = Math.floor(MAX_PACKET_SIZE / MAX_HEADER_SIZE) -const blockCount = Math.floor(MAX_PACKET_SIZE / MAX_BLOCK_SIZE) - -export interface ITip { - hash: Hash, - height: number, - totalwork: number -} - -interface ISyncInfo { - hash: Hash, - height: number - status: BlockStatus -} - -export class Sync { - public peerInfo: { peer: IPeer, tip: ITip } - private version: number - private consensus: IConsensus - private differentHeight: number - private commonMainChain: ISyncInfo - private commonBlock: ISyncInfo - private commonHeader: ISyncInfo - - constructor(peerInfo: { peer: IPeer, tip: ITip }, consensus: IConsensus, version: number) { - this.peerInfo = peerInfo - this.consensus = consensus - this.version = version - } - - public async sync() { - logger.debug(`Start Syncing`) - let remoteVersion: number - try { - const localBlockTip = this.consensus.getBlocksTip() - const localHeaderTip = this.consensus.getHeadersTip() - - logger.debug(`Get remote tip`) - - const remoteHeaderTip = this.peerInfo.tip - const remoteBlockTip = await this.peerInfo.peer.getTip() - remoteVersion = (await this.peerInfo.peer.status()).version - - if (remoteVersion < 4) { - logger.debug("Peer is an old version, using fallback sync") - remoteBlockTip.totalwork = 0 - remoteHeaderTip.totalwork = 0 - } - - if (!localHeaderTip.hash.equals(remoteHeaderTip.hash)) { - logger.debug(`Local=${localHeaderTip.height}:${localHeaderTip.hash} Remote=${remoteHeaderTip.height}:${remoteHeaderTip.hash}`) - } - - await this.findCommons(localBlockTip, remoteBlockTip) - - const syncHeaderResult = await this.syncHeaders(remoteHeaderTip, localHeaderTip, remoteVersion) - if (!syncHeaderResult.ahead) { - return - } - - if (remoteVersion > 5 && this.version > 5) { - await this.syncTxs(remoteBlockTip, localBlockTip, localHeaderTip, remoteVersion) - } else { - await this.syncBlocks(syncHeaderResult.startHeaderHeight, remoteBlockTip, localBlockTip, remoteVersion) - } - - this.peerInfo = undefined - } catch (e) { - logger.warn(`Syncing ${this.peerInfo.peer.getInfo()} failed: ${e}`) - } - return - } - - private async syncHeaders(remoteHeaderTip: ITip, localHeaderTip: ITip, remoteVersion: number) { - let remoteHeaderWork: number - let localHeaderWork: number - if (remoteVersion > 7) { - remoteHeaderWork = remoteHeaderTip.totalwork - localHeaderWork = localHeaderTip.totalwork - } else { - remoteHeaderWork = remoteHeaderTip.height - remoteHeaderTip.totalwork - localHeaderWork = localHeaderTip.height - (1 / localHeaderTip.totalwork) - } - const startHeaderHeight = await this.findStartHeader() - logger.debug(`Find Start Header=${startHeaderHeight}`) - if (remoteHeaderWork > localHeaderWork) { - logger.debug(`Receiving Headers from ${this.peerInfo.peer.getInfo()}`) - await this.getHeaders(startHeaderHeight) - } - return { startHeaderHeight, ahead: remoteHeaderWork > localHeaderWork } - } - - private async syncBlocks(startHeaderHeight: number, remoteBlockTip: ITip, localBlockTip: ITip, remoteVersion: number) { - let remoteBlockWork: number - let localBlockWork: number - if (remoteVersion > 7) { - remoteBlockWork = remoteBlockTip.totalwork - localBlockWork = localBlockTip.totalwork - } else { - remoteBlockWork = remoteBlockTip.height - remoteBlockTip.totalwork - localBlockWork = localBlockTip.height - (1 / localBlockTip.totalwork) - } - const startBlockHeight = await this.findStartBlock(startHeaderHeight) - logger.debug(`Find Start Block=${startBlockHeight}`) - if (remoteBlockWork > localBlockWork) { - logger.debug(`Receiving Blocks from ${this.peerInfo.peer.getInfo()}`) - await this.getBlocks(startBlockHeight) - } +class EliminationRace { + private promises: Array> + constructor() { + this.promises = [] } - private async syncTxs(remoteBlockTip: ITip, localBlockTip: ITip, localHeaderTip: ITip, remoteVersion: number) { - let remoteBlockWork: number - let localBlockWork: number - if (remoteVersion > 7) { - remoteBlockWork = remoteBlockTip.totalwork - localBlockWork = localBlockTip.totalwork - } else { - remoteBlockWork = remoteBlockTip.height - remoteBlockTip.totalwork - localBlockWork = localBlockTip.height - (1 / localBlockTip.totalwork) - } - if (remoteBlockWork > localBlockWork) { - const blockHashes = await this.scanHeaders(localHeaderTip) - if (blockHashes.length > 0) { - logger.debug(`Receiving transactions from ${this.peerInfo.peer.getInfo()}`) - return await this.getTxBlocks(blockHashes) - } - } + public add(promise: Promise) { + this.promises.push(promise) + promise.then((v) => this.remove(promise)) + promise.catch((e) => this.remove(promise)) } - private async scanHeaders(headerTip: ITip) { - const blockHashes: Hash[] = [] - let header = await this.consensus.getHeaderByHash(headerTip.hash) - let status: BlockStatus - do { - status = await this.consensus.getBlockStatus(new Hash(header)) - if (status >= BlockStatus.Block) { break } - if (!(header instanceof BlockHeader)) { continue } - if (!header.merkleRoot.equals(Hash.emptyHash)) { - blockHashes.push(new Hash(header)) - } - if (blockHashes.length > headerCount) { - blockHashes.shift() + public race(test: (t: T) => boolean = () => true) { + return new Promise(async (resolve, reject) => { + while (this.promises.length > 0) { + try { + const p = Promise.race(this.promises) + const value = await p + if (test(value)) { + resolve(value) + } + } catch (e) { + logger.debug(e) + } } - header = await this.consensus.getHeaderByHash(header.previousHash[0]) - } while (status < BlockStatus.Block) - blockHashes.reverse() - return blockHashes + reject(`No promise resolved`) + }) } - private async findCommons(localTip: ITip, remoteTip: ITip) { - let startHeight: number - if (remoteTip.height <= localTip.height) { - this.differentHeight = remoteTip.height - if (await this.updateCommons(remoteTip.height, remoteTip.hash)) { - return + private remove(promise: Promise) { + let i = 0 + while (i < this.promises.length) { + if (promise === this.promises[i]) { + this.promises.splice(i, 1) + } else { + i++ } - startHeight = remoteTip.height - 1 - } else if (remoteTip.height > localTip.height) { - this.differentHeight = remoteTip.height - startHeight = localTip.height } + } +} - let i = 1 - let height = startHeight - while (height > 0) { - if (await this.updateCommons(height)) { - return - } - height = startHeight - i - i *= 2 - } +export interface ITip { + hash: Hash, + height: number, + totalwork: number +} - if (!(await this.updateCommons(0))) { - logger.fatal("Peer is different genesis block") - throw new Error("Peer using different genesis block tried to sync") - } +export interface ICandidatePeer { + peer: IPeer + tip: { + hash: Hash; + height: number; + totalwork: number; } +} - // hash: peer hash of this height - private async updateCommons(height: number, hash?: Hash) { - if (hash === undefined) { - hash = await this.peerInfo.peer.getHash(height) - } - const status = await this.consensus.getBlockStatus(hash) - const syncInfo = { status, height, hash } - switch (status) { - case BlockStatus.MainChain: - if (this.commonMainChain === undefined) { - this.commonMainChain = syncInfo - } - // MainChain implies Block - case BlockStatus.Block: - if (this.commonBlock === undefined) { - this.commonBlock = syncInfo - } - // Block implies Header - case BlockStatus.Header: - if (this.commonHeader === undefined) { - this.commonHeader = syncInfo - } - return (this.commonMainChain !== undefined) && (this.commonBlock !== undefined) - case BlockStatus.Nothing: - this.differentHeight = syncInfo.height - return (this.commonMainChain !== undefined) && (this.commonBlock !== undefined) && (this.commonHeader !== undefined) - case BlockStatus.Rejected: - logger.fatal("Peer is using rejected block") - throw new Error("Rejected block found during sync") - } +// tslint:disable-next-line:max-classes-per-file +export class Sync { + private go: boolean + private consensus: IConsensus + private network: INetwork + constructor(consensus: IConsensus, network: INetwork) { + this.consensus = consensus + this.network = network + this.go = false } - private async findStartHeader(): Promise { - let height = this.differentHeight - let min = this.commonHeader.height - while (min + 1 < height) { - // tslint:disable-next-line:no-bitwise - const mid = (min + height) >> 1 - const hash = await this.peerInfo.peer.getHash(mid) - switch (await this.consensus.getBlockStatus(hash)) { - case BlockStatus.MainChain: - case BlockStatus.Block: - case BlockStatus.Header: - min = mid - break - case BlockStatus.Nothing: - height = mid - break - case BlockStatus.Rejected: - logger.fatal("Peer is using rejected block") - throw new Error("Rejected block found during sync") - } - } - if (height < 1) { - height = 1 - } - return height + public start(delay: number = 1000) { + this.go = true + setImmediate(() => this.headerSync(delay)) + setImmediate(() => this.blockSync(delay)) } - private async getHeaders(height: number) { - let headers: BaseBlockHeader[] - try { - do { - headers = await this.peerInfo.peer.getHeadersByRange(height, headerCount) - for (const header of headers) { - if (!(header instanceof BlockHeader)) { - continue - } - const result = await this.consensus.putHeader(header) - - if (result.status === undefined || result.status < BlockStatus.Header) { - throw new Error(`Header Rejected ${result.status}`) - } - - if (result.oldStatus >= result.status) { - logger.debug(`Received header has already been received`) - break - } - } - height += headers.length - } while (headers.length > 0) - } catch (e) { - throw new Error(`Could not completely sync headers: ${e}`) - } + public stop(delay: number = 1000) { + this.go = false } - - private async getTxBlocks(blockHashes: Hash[]) { - const requestSize = blockHashes.length * (HASH_SIZE + BYTES_OVERHEAD) + REPEATED_OVERHEAD - const numRequests = Math.ceil(requestSize / MAX_PACKET_SIZE) - const statusChanges: IStatusChange[] = [] + private async headerSync(delay: number = 1000) { try { - const txBlocksPerRequest = Math.floor(blockHashes.length / numRequests) - for (let i = 0; i < numRequests; i++) { - const requestHashes = blockHashes.slice(i * txBlocksPerRequest, (i + 1) * txBlocksPerRequest) - while (requestHashes.length > 0) { - const receivedTxBlock = await this.peerInfo.peer.getBlockTxs(requestHashes) - requestHashes.splice(0, receivedTxBlock.length) - const changes = await this.consensus.putTxBlocks(receivedTxBlock) - statusChanges.concat(changes) - if (!changes.every((change) => { - return change.oldStatus < change.status - })) { - logger.debug(`Received txBlock has already been received`) - return statusChanges - } - } + const promise = new EliminationRace() + for (const peer of await this.network.getPeers()) { + promise.add(peer.getBTip().then((tip) => ({ peer, tip })).catch((e) => logger.debug(e))) } - return statusChanges + const filter = (peer: ICandidatePeer) => peer !== undefined && peer.tip !== undefined && peer.tip.totalwork > this.consensus.getHtip().totalWork + const { peer, tip } = await promise.race(filter) + await peer.headerSync(tip) } catch (e) { - throw new Error(`Could not completely sync txs: ${e}`) - } - } - private async findStartBlock(height: number): Promise { - let min = this.commonBlock.height - while (min + 1 < height) { - // tslint:disable-next-line:no-bitwise - const mid = (min + height) >> 1 - const hash = await this.peerInfo.peer.getHash(mid) - switch (await this.consensus.getBlockStatus(hash)) { - case BlockStatus.MainChain: - case BlockStatus.Block: - min = mid - break - case BlockStatus.Header: - case BlockStatus.Nothing: - height = mid - break - case BlockStatus.Rejected: - logger.fatal("Peer is using rejected block") - throw new Error("Rejected block found during sync") - } + logger.debug(e) } - if (height < 1) { - height = 1 + if (this.go) { + setTimeout(() => this.headerSync().catch((e) => logger.fatal(`??? ${e}`)), delay) } - return height } - - private async getBlocks(height: number) { - let blocks: AnyBlock[] + private async blockSync(delay: number = 1000) { try { - do { - blocks = await this.peerInfo.peer.getBlocksByRange(height, blockCount) - for (const block of blocks) { - if (block instanceof Block) { - const result = await this.consensus.putBlock(block) - if (result.status < BlockStatus.Block) { - throw new Error(`Block rejected ${status}`) - } - if (result.oldStatus >= result.status) { - logger.debug(`Received block has already been received`) - return - } - } - } - height += blocks.length - } while (blocks.length > 0) + const promise = new EliminationRace() + for (const peer of await this.network.getPeers()) { + promise.add(peer.getBTip().then((tip) => ({ peer, tip })).catch((e) => logger.debug(e))) + } + const filter = (peer: ICandidatePeer) => peer !== undefined && peer.tip !== undefined && peer.tip.totalwork > this.consensus.getBtip().totalWork + const { peer, tip } = await promise.race(filter) + if (peer.getVersion() > 5) { + await peer.txSync(tip) + } else { + await peer.blockSync(tip) + } } catch (e) { - throw new Error(`Could not completely sync blocks: ${e}`) + logger.debug(e) + } + if (this.go) { + setTimeout(() => this.blockSync().catch((e) => logger.fatal(`??? ${e}`)), delay) } } } diff --git a/src/miner/minerServer.ts b/src/miner/minerServer.ts index 083cf3c1..5f910019 100644 --- a/src/miner/minerServer.ts +++ b/src/miner/minerServer.ts @@ -72,7 +72,7 @@ export class MinerServer { } if (!globalOptions.bootstrap && ((Date.now() - previousDBBlock.header.timeStamp) > 86400000)) { - logger.info("Last block is more than a day old, waiting for synchronization prior to mining.") + logger.debug("Last block is more than a day old, waiting for synchronization prior to mining.") return } diff --git a/src/network/ipeer.ts b/src/network/ipeer.ts index 89c4baa6..67f71f52 100644 --- a/src/network/ipeer.ts +++ b/src/network/ipeer.ts @@ -2,6 +2,7 @@ import { Block } from "../common/block" import { AnyBlockHeader } from "../common/blockHeader" import { SignedTx } from "../common/txSigned" +import { ITip } from "../consensus/sync" import * as proto from "../serialization/proto" import { IStatus } from "../serialization/proto" import { IPeer } from "../serialization/proto" @@ -10,6 +11,7 @@ import { IBlockTxs } from "./rabbit/rabbitPeer" export interface IPeer { status(): Promise + getVersion(): number ping(): Promise putTxs(txs: SignedTx[]): Promise getTxs(minFee?: number): Promise @@ -19,7 +21,10 @@ export interface IPeer { getHeadersByHashes(hashes: Hash[]): Promise getBlocksByRange(fromHeight: number, count: number): Promise getHeadersByRange(fromHeight: number, count: number): Promise - getTip(header?: boolean): Promise<{ hash: Hash, height: number, totalwork: number }> + getHTip(header?: boolean): Promise<{ hash: Hash, height: number, totalwork: number }> + getBTip(header?: boolean): Promise<{ hash: Hash, height: number, totalwork: number }> getPeers(count: number): Promise - getInfo(): string + headerSync(remoteTip: ITip): Promise + txSync(remoteTip: ITip): Promise + blockSync(remoteBlockTip: ITip): Promise } diff --git a/src/network/rabbit/networkConstants.ts b/src/network/rabbit/networkConstants.ts new file mode 100644 index 00000000..9bbda003 --- /dev/null +++ b/src/network/rabbit/networkConstants.ts @@ -0,0 +1,26 @@ +export const ZERO = 0 + +export const MAX_PACKET_SIZE = 10 * 1024 * 1024 +export const REPEATED_OVERHEAD = 6 +export const BYTES_OVERHEAD = 6 +export const HASH_SIZE = 32 + +// Header sizes +export const DIFFICULTY_SIZE = 4 +export const TIMESTAMP_SIZE = 8 +export const NONCE_SIZE = 8 +export const MINER_SIZE = 20 + +// Tx sizes +export const AMOUNT_SIZE = 8 +export const FEE_SIZE = 8 +export const TX_NONCE_SIZE = 4 +export const SIGNATURE_SIZE = 64 +export const RECOVERY_SIZE = 4 + +export const MAX_HEADER_SIZE = 3 * (HASH_SIZE + BYTES_OVERHEAD) + REPEATED_OVERHEAD + DIFFICULTY_SIZE + TIMESTAMP_SIZE + NONCE_SIZE + MINER_SIZE + BYTES_OVERHEAD +export const MAX_TX_SIZE = 2 * (HASH_SIZE + BYTES_OVERHEAD) + AMOUNT_SIZE + FEE_SIZE + TX_NONCE_SIZE + SIGNATURE_SIZE + BYTES_OVERHEAD + RECOVERY_SIZE +export const MAX_TXS_PER_BLOCK = 4096 +export const MAX_BLOCK_SIZE = MAX_HEADER_SIZE + MAX_TXS_PER_BLOCK * MAX_TX_SIZE +export const MAX_HEADERS_PER_PACKET = Math.floor(MAX_PACKET_SIZE / MAX_HEADER_SIZE) +export const MAX_BLOCKS_PER_PACKET = Math.floor(MAX_PACKET_SIZE / MAX_BLOCK_SIZE) diff --git a/src/network/rabbit/rabbitPeer.ts b/src/network/rabbit/rabbitPeer.ts index 599fa737..ad35d5a0 100644 --- a/src/network/rabbit/rabbitPeer.ts +++ b/src/network/rabbit/rabbitPeer.ts @@ -3,11 +3,12 @@ import * as Long from "long" import { Socket } from "net" import { AnyBlock, Block } from "../../common/block" import { AnyBlockHeader, BlockHeader } from "../../common/blockHeader" +import { BaseBlockHeader } from "../../common/genesisHeader" import { ITxPool } from "../../common/itxPool" import { SignedTx } from "../../common/txSigned" import { TIMESTAMP_TOLERANCE } from "../../consensus/consensus" -import { IConsensus } from "../../consensus/iconsensus" -import { BlockStatus, BYTES_OVERHEAD, HASH_SIZE, MAX_TX_SIZE, MAX_TXS_PER_BLOCK, REPEATED_OVERHEAD } from "../../consensus/sync" +import { IConsensus, IStatusChange } from "../../consensus/iconsensus" +import { BlockStatus, ITip } from "../../consensus/sync" import { globalOptions } from "../../main" import { MinerServer } from "../../miner/minerServer" import * as proto from "../../serialization/proto" @@ -15,8 +16,8 @@ import { Hash } from "../../util/hash" import { IPeer } from "../ipeer" import { IPeerDatabase } from "../ipeerDatabase" import { BasePeer } from "./basePeer" +import { BYTES_OVERHEAD, HASH_SIZE, MAX_BLOCKS_PER_PACKET, MAX_HEADERS_PER_PACKET, MAX_PACKET_SIZE, MAX_TX_SIZE, MAX_TXS_PER_BLOCK, REPEATED_OVERHEAD } from "./networkConstants" import { RabbitNetwork } from "./rabbitNetwork" -import { MAX_PACKET_SIZE } from "./socketParser" const logger = getLogger("NetPeer") const DIFFICULTY_TOLERANCE = 0.05 @@ -33,6 +34,7 @@ export class RabbitPeer extends BasePeer implements IPeer { private network: RabbitNetwork private receivedBroadcasts: number private lastReceivedTime: number + private version: number constructor(socket: Socket, network: RabbitNetwork, consensus: IConsensus, txPool: ITxPool, peerDB: IPeerDatabase) { super(socket) @@ -65,6 +67,7 @@ export class RabbitPeer extends BasePeer implements IPeer { this.listenPort = status.port this.guid = status.guid + this.version = status.version if (status.version > this.network.version) { logger.warn(`Peer is using a higher version number(${status.version}) than current version(${this.network.version})`) } @@ -74,18 +77,14 @@ export class RabbitPeer extends BasePeer implements IPeer { this.disconnect() throw e } - } - public async getTip(header = false): Promise<{ hash: Hash, height: number, totalwork: number }> { - // Deprecated in favor of getHeaderTip and getBlockTip - const { reply, packet } = await this.sendRequest({ getTip: { dummy: 0, header } }) - if (reply.getTipReturn === undefined) { - this.protocolError(new Error(`Reply has no 'getTipReturn': ${JSON.stringify(reply)}`)) - throw new Error("Invalid response") - } + public async getHTip() { + return this.getTip(true) + } - return { hash: new Hash(reply.getTipReturn.hash), height: Number(reply.getTipReturn.height), totalwork: reply.getTipReturn.totalwork } + public async getBTip() { + return this.getTip(false) } public async getHash(height: number): Promise { @@ -130,6 +129,10 @@ export class RabbitPeer extends BasePeer implements IPeer { } } + public getVersion(): number { + return this.version + } + public async status(): Promise { const { reply, packet } = await this.sendRequest({ status: { @@ -245,13 +248,49 @@ export class RabbitPeer extends BasePeer implements IPeer { return headers } + public async headerSync(remoteTip: ITip) { + const startHeight = Math.min(this.consensus.getHtip().height, remoteTip.height) + const { height: commonHeight, hash: commonHash } = await this.commonSearch(startHeight, remoteTip.height - 1, BlockStatus.Header) + logger.debug(`Found Start Header=${commonHeight}`) + return this.getHeaders(commonHeight, commonHash, remoteTip.height) + } + public async blockSync(remoteBlockTip: ITip) { + const startHeight = Math.min(this.consensus.getBtip().height, remoteBlockTip.height) + const { height: commonHeight, hash: commonHash } = await this.commonSearch(startHeight, remoteBlockTip.height - 1, BlockStatus.Block) + logger.debug(`Found Start Block=${commonHeight}`) + return this.getBlocks(commonHeight, commonHash, remoteBlockTip.height) + } + + public async txSync(remoteTip: ITip) { + const blockHashes: Hash[] = [] + let header = this.consensus.getHtip().header + let status: BlockStatus + do { + status = await this.consensus.getBlockStatus(new Hash(header)) + if (status >= BlockStatus.Block) { break } + if (!(header instanceof BlockHeader)) { continue } + if (!header.merkleRoot.equals(Hash.emptyHash)) { + blockHashes.unshift(new Hash(header)) + } + if (blockHashes.length > MAX_HEADERS_PER_PACKET) { + blockHashes.pop() + } + header = await this.consensus.getHeaderByHash(header.previousHash[0]) + } while (status < BlockStatus.Block) + + if (blockHashes.length > 0) { + logger.debug(`Receiving transactions from ${this.socketBuffer.getIp()}:${this.socketBuffer.getPort()}`) + return this.getTxBlocks(blockHashes) + } + } + // this is called in BasePeer's onPacket protected async respond(id: number, request: proto.Network, packet: Buffer): Promise { let response: proto.INetwork const reply = id !== 0 const rebroadcast = () => { if (id === 0) { - setTimeout(() => this.network.broadcast, packet, this) + setImmediate(() => this.network.broadcast(packet)) } } switch (request.request) { @@ -311,6 +350,16 @@ export class RabbitPeer extends BasePeer implements IPeer { } } + private async getTip(header = false): Promise<{ hash: Hash, height: number, totalwork: number }> { + const { reply } = await this.sendRequest({ getTip: { header } }) + if (reply.getTipReturn === undefined) { + this.protocolError(new Error(`Reply has no 'getTipReturn': ${JSON.stringify(reply)}`)) + throw new Error("Invalid response") + } + + return { hash: new Hash(reply.getTipReturn.hash), height: Number(reply.getTipReturn.height), totalwork: reply.getTipReturn.totalwork } + } + private async respondStatus(reply: boolean, request: proto.IStatus): Promise { const receviedStatus = new proto.Status(request) const message: proto.INetwork = { @@ -512,16 +561,6 @@ export class RabbitPeer extends BasePeer implements IPeer { } private async respondPutHeaders(reply: boolean, request: proto.IPutHeaders): Promise { - setImmediate(async () => { - try { - request.headers = request.headers.slice(0, 1) - const header = new BlockHeader(request.headers[0]) - await this.consensus.putHeader(header) - } catch (e) { - logger.debug(e) - } - }) - return { putHeadersReturn: { statusChanges: [] } } } @@ -559,4 +598,129 @@ export class RabbitPeer extends BasePeer implements IPeer { } return message } + + private async commonSearch(startHeight: number, max: number, searchStatus: BlockStatus) { + let min: number + let i = 0 + await this.search(startHeight, (height, status) => { + if (status === undefined || status === BlockStatus.Rejected) { + this.disconnect() + return undefined + } + + if (height <= 0 || status >= searchStatus) { + min = height + return undefined + } else { + max = height + } + i++ + return Math.max(startHeight - Math.pow(2, i), 0) + }) + logger.debug(`Found minimum height(${min})`) + + return this.search(Math.floor((min + max) / 2), (height, status) => { + if (height === min) { + return undefined + } + + if (status === undefined || status === BlockStatus.Rejected) { + logger.debug(`Peer supplied rejected information`) + this.disconnect() + return undefined + } + + if (status >= searchStatus) { + min = height + } else { + max = height + } + return Math.floor((min + max) / 2) + }) + } + + private async search(height: number, update: (height: number, status?: BlockStatus) => number) { + while (height !== undefined) { + const hash = await this.getHash(height) + const status = hash === undefined ? undefined : await this.consensus.getBlockStatus(hash) + logger.debug(`Peer's block at height ${height} has status ${status}`) + const newHeight = update(height, status) + if (newHeight === undefined) { + return { height, hash } + } + height = newHeight + } + } + + private async getHeaders(commonHeight: number, commonHash: Hash, maxHeight: number) { + let headers: BaseBlockHeader[] + let previousHash = commonHash + logger.debug(`commonHeight: ${commonHeight}, previousHash: ${previousHash}`) + let height = commonHeight + 1 + do { + headers = await this.getHeadersByRange(height, MAX_HEADERS_PER_PACKET) + for (const header of headers) { + if (!(header instanceof BlockHeader)) { + throw new Error(`Received Genesis Block Header during sync`) + } + const result = await this.consensus.putHeader(header) + this.validatePut(height, previousHash, result, header, BlockStatus.Header) + height++ + previousHash = new Hash(header) + } + } while (height < maxHeight && headers.length > 0) + } + + private async getBlocks(commonHeight: number, commonHash: Hash, maxHeight: number) { + let blocks: AnyBlock[] + let previousHash = commonHash + let height = commonHeight + 1 + do { + blocks = await this.getBlocksByRange(height, MAX_BLOCKS_PER_PACKET) + for (const block of blocks) { + if (!(block instanceof Block)) { + throw new Error(`Received Genesis Block during sync`) + } + const result = await this.consensus.putBlock(block) + this.validatePut(height, previousHash, result, block.header, BlockStatus.Block) + height++ + previousHash = new Hash(block.header) + } + + } while (height < maxHeight && blocks.length > 0) + } + + private validatePut(height: number, previousHash: Hash, result: IStatusChange, header: BlockHeader, expectedStatus: BlockStatus) { + if (result.status === undefined || result.status < expectedStatus) { + throw new Error(`Block rejected: ${JSON.stringify(result)}`) + } + if (result.height !== undefined && result.height !== height) { + throw new Error(`Expected header at height(${height}) but got header at height(${result.height})`) + } + if (!previousHash.equals(header.previousHash[0])) { + throw new Error(`Expected header's previousHash(${header.previousHash[0]}) to be ${previousHash}`) + } + if (result.oldStatus >= result.status) { + logger.debug(`Received block has already been received`) + } + } + + private async getTxBlocks(blockHashes: Hash[]) { + const requestSize = blockHashes.length * (HASH_SIZE + BYTES_OVERHEAD) + REPEATED_OVERHEAD + const numRequests = Math.ceil(requestSize / MAX_PACKET_SIZE) + const txBlocksPerRequest = Math.floor(blockHashes.length / numRequests) + + for (let i = 0; i < numRequests; i++) { + const requestHashes = blockHashes.splice(0, txBlocksPerRequest) + while (requestHashes.length > 0) { + const receivedTxBlock = await this.getBlockTxs(requestHashes) + requestHashes.splice(0, receivedTxBlock.length) + const changes = await this.consensus.putTxBlocks(receivedTxBlock) + if (!changes.every((change) => change.oldStatus < change.status)) { + logger.debug(`Received txBlock has already been received`) + return + } + } + } + } } diff --git a/src/network/rabbit/socketParser.ts b/src/network/rabbit/socketParser.ts index 753ec537..debc78f5 100644 --- a/src/network/rabbit/socketParser.ts +++ b/src/network/rabbit/socketParser.ts @@ -3,9 +3,9 @@ import { Socket } from "net" import { AsyncLock } from "../../common/asyncLock" import { globalOptions } from "../../main" import { Hash } from "../../util/hash" +import { MAX_PACKET_SIZE } from "./networkConstants" const logger = getLogger("SocketBuffer") -export const MAX_PACKET_SIZE = 10 * 1024 * 1024 enum ParseState { HeaderPrefix, HeaderRoute, @@ -39,7 +39,7 @@ export class SocketParser { this.parseReset() this.socket.on("data", (data) => this.receive(data)) this.socket.on("drain", () => { - logger.debug(`Resuming socket ${this.socket.remoteAddress}:${this.socket.remotePort}`) + logger.debug(`Resuming socket(${this.socket.bufferSize}) ${this.socket.remoteAddress}:${this.socket.remotePort}`) this.socket.resume() this.sendLock.releaseLock() }) @@ -50,29 +50,22 @@ export class SocketParser { throw new Error("Buffer too large") } - // true: all queued to kernel buffer - // false: user memory is used - - let kernel = true await this.sendLock.getLock() this.writeBuffer.writeUInt32LE(route, 0) this.writeBuffer.writeUInt32LE(buffer.length, 4) - // using array buffer directly doesn't work - kernel = kernel && this.socket.write(Buffer.from(headerPrefix)) - kernel = kernel && this.socket.write(this.writeBuffer) - kernel = kernel && this.socket.write(Buffer.from(buffer)) - if (kernel) { + this.socket.write(Buffer.from(headerPrefix)) + this.socket.write(this.writeBuffer) + this.socket.write(Buffer.from(buffer)) + if (this.socket.bufferSize === undefined || this.socket.bufferSize < 1024 * 1024) { this.sendLock.releaseLock() } else { // for this case, user memory is used // it will be released in "drain" event - logger.debug(`Pausing socket ${this.socket.remoteAddress}:${this.socket.remotePort}`) this.socket.pause() } } public destroy(e?: Error): void { - this.sendLock.rejectAll() if (this.socket) { logger.debug(`Disconnecting from ${this.socket.remoteAddress}:${this.socket.remotePort} due to protocol error: ${e}, ${e ? e.stack : ""}`) diff --git a/src/server.ts b/src/server.ts index 5e3d7308..23875be9 100644 --- a/src/server.ts +++ b/src/server.ts @@ -5,14 +5,13 @@ import { TxPool } from "./common/txPool" import { Consensus } from "./consensus/consensus" import { WorldState } from "./consensus/database/worldState" import { IConsensus } from "./consensus/iconsensus" -import { Sync, ITip } from "./consensus/sync" +import { Sync } from "./consensus/sync" import { globalOptions } from "./main" import { MinerServer } from "./miner/minerServer" import { INetwork } from "./network/inetwork" import { RabbitNetwork } from "./network/rabbit/rabbitNetwork" import { RestManager } from "./rest/restManager" import { Wallet } from "./wallet/wallet" -import { IPeer } from "./network/ipeer"; const logger = getLogger("Server") @@ -39,6 +38,7 @@ export class Server { this.network = new RabbitNetwork(this.txPool, this.consensus, globalOptions.port, prefix + "peerdb" + postfix, globalOptions.networkid) this.miner = new MinerServer(this.txPool, this.worldState, this.consensus, this.network, globalOptions.cpuMiners, globalOptions.str_port) this.rest = new RestManager(this) + this.sync = new Sync(this.consensus, this.network) } public async run() { await this.consensus.init() @@ -59,32 +59,6 @@ export class Server { this.network.addPeer(ip, port).catch((e) => logger.error(`Failed to connect to client: ${e}`)) } } - await this.runSync() - } - - public async runSync() { - logger.debug(`begin sync`) - const peerPromises = this.network.getPeers().map((peer) => peer.getTip().then((tip) => ({ peer, tip })).catch((e) => logger.debug(e))) - const peers = [] as { peer: IPeer; tip: ITip; }[] - for (const peerPromise of peerPromises) { - try { - const result = await peerPromise - if (result !== undefined) { - peers.push(await result) - } - } catch (e) { - logger.debug(e) - } - } - const syncCandidates = peers.filter((peer) => peer.tip.totalwork > this.consensus.getBtip().totalWork) - - if (syncCandidates.length > 0) { - const syncPeer = syncCandidates[Math.floor(Math.random() * syncCandidates.length)] - const sync = new Sync(syncPeer, this.consensus, this.network.version) - await sync.sync() - } - - setTimeout(() => this.runSync(), 1000) - logger.debug(`end sync`) + this.sync.start() } }