Skip to content

Commit

Permalink
Merge pull request #1966 from ChainSafe/cayman/tweak-chain-segment
Browse files Browse the repository at this point in the history
Update block/chain segment processing
  • Loading branch information
mpetrunic authored Jan 19, 2021
2 parents 1d82b13 + 28b2a88 commit e06cdd3
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 153 deletions.
163 changes: 148 additions & 15 deletions packages/lodestar/src/chain/blocks/process.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,181 @@
import {computeEpochAtSlot} from "@chainsafe/lodestar-beacon-state-transition";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {IForkChoice} from "@chainsafe/lodestar-fork-choice";
import {
getAllBlockSignatureSets,
getAllBlockSignatureSetsExceptProposer,
ISignatureSet,
} from "@chainsafe/lodestar-beacon-state-transition/lib/fast/signatureSets";

import {ChainEventEmitter} from "../emitter";
import {IBlockJob} from "../interface";
import {IBlockJob, IChainSegmentJob} from "../interface";
import {runStateTransition} from "./stateTransition";
import {IStateRegenerator} from "../regen";
import {BlockError, BlockErrorCode} from "../errors";
import {IStateRegenerator, RegenError} from "../regen";
import {BlockError, BlockErrorCode, ChainSegmentError} from "../errors";
import {IBeaconDb} from "../../db";
import {ITreeStateContext} from "../../db/api/beacon/stateContextCache";
import {verifySignatureSetsBatch} from "../bls";
import {findLastIndex} from "../../util/array";

export async function processBlocks({
export async function processBlock({
forkChoice,
regen,
emitter,
db,
jobs,
job,
}: {
forkChoice: IForkChoice;
regen: IStateRegenerator;
emitter: ChainEventEmitter;
db: IBeaconDb;
jobs: IBlockJob[];
job: IBlockJob;
}): Promise<void> {
let preStateContext: ITreeStateContext;
if (!forkChoice.hasBlock(job.signedBlock.message.parentRoot)) {
throw new BlockError({
code: BlockErrorCode.PARENT_UNKNOWN,
parentRoot: job.signedBlock.message.parentRoot.valueOf() as Uint8Array,
job,
});
}

try {
preStateContext = await regen.getPreState(jobs[0].signedBlock.message);
const preStateContext = await regen.getPreState(job.signedBlock.message);

if (!job.validSignatures) {
const {epochCtx, state} = preStateContext;
const signatureSets = job.validProposerSignature
? getAllBlockSignatureSetsExceptProposer(epochCtx, state, job.signedBlock)
: getAllBlockSignatureSets(epochCtx, state, job.signedBlock);

if (!verifySignatureSetsBatch(signatureSets)) {
throw new BlockError({
code: BlockErrorCode.INVALID_SIGNATURE,
job,
});
}

job.validProposerSignature = true;
job.validSignatures = true;
}

await runStateTransition(emitter, forkChoice, db, preStateContext, job);
} catch (e) {
if (e instanceof RegenError) {
throw new BlockError({
code: BlockErrorCode.PRESTATE_MISSING,
job,
});
}

if (e instanceof BlockError) {
throw e;
}

throw new BlockError({
code: BlockErrorCode.PRESTATE_MISSING,
job: jobs[0],
code: BlockErrorCode.BEACON_CHAIN_ERROR,
error: e,
job,
});
}
}

export async function processChainSegment({
config,
forkChoice,
regen,
emitter,
db,
job,
}: {
config: IBeaconConfig;
forkChoice: IForkChoice;
regen: IStateRegenerator;
emitter: ChainEventEmitter;
db: IBeaconDb;
job: IChainSegmentJob;
}): Promise<void> {
let importedBlocks = 0;
let blocks = job.signedBlocks;

// Process segment epoch by epoch
while (blocks.length) {
const firstBlock = blocks[0];
// First ensure that the segment's parent has been processed
if (!forkChoice.hasBlock(firstBlock.message.parentRoot)) {
throw new ChainSegmentError({
code: BlockErrorCode.PARENT_UNKNOWN,
parentRoot: firstBlock.message.parentRoot.valueOf() as Uint8Array,
job,
importedBlocks,
});
}
const startEpoch = computeEpochAtSlot(config, firstBlock.message.slot);

// The `lastIndex` indicates the position of the last block that is in the current
// epoch of `startEpoch`.
const lastIndex = findLastIndex(blocks, (block) => computeEpochAtSlot(config, block.message.slot) === startEpoch);

// Split off the first section blocks that are all either within the current epoch of
// the first block. These blocks can all be signature-verified with the same
// `BeaconState`.
const blocksInEpoch = blocks.slice(0, lastIndex);
blocks = blocks.slice(lastIndex);

for (const job of jobs) {
try {
preStateContext = await runStateTransition(emitter, forkChoice, db, preStateContext, job);
let preStateContext = await regen.getPreState(firstBlock.message);

// Verify the signature of the blocks, returning early if the signature is invalid.
if (!job.validSignatures) {
const signatureSets: ISignatureSet[] = [];
for (const block of blocksInEpoch) {
const {epochCtx, state} = preStateContext;
signatureSets.push(
...(job.validProposerSignature
? getAllBlockSignatureSetsExceptProposer(epochCtx, state, block)
: getAllBlockSignatureSets(epochCtx, state, block))
);
}

if (!verifySignatureSetsBatch(signatureSets)) {
throw new ChainSegmentError({
code: BlockErrorCode.INVALID_SIGNATURE,
job,
importedBlocks,
});
}
}

for (const block of blocksInEpoch) {
preStateContext = await runStateTransition(emitter, forkChoice, db, preStateContext, {
reprocess: job.reprocess,
prefinalized: job.prefinalized,
signedBlock: block,
validProposerSignature: true,
validSignatures: true,
});
importedBlocks++;
}
} catch (e) {
if (e instanceof RegenError) {
throw new ChainSegmentError({
code: BlockErrorCode.PRESTATE_MISSING,
job,
importedBlocks,
});
}

if (e instanceof BlockError) {
throw e;
throw new ChainSegmentError({
...e.type,
job,
importedBlocks,
});
}

throw new BlockError({
throw new ChainSegmentError({
code: BlockErrorCode.BEACON_CHAIN_ERROR,
error: e,
job,
importedBlocks,
});
}
}
Expand Down
107 changes: 87 additions & 20 deletions packages/lodestar/src/chain/blocks/processor.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import {AbortSignal} from "abort-controller";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {IForkChoice} from "@chainsafe/lodestar-fork-choice";
import {SignedBeaconBlock} from "@chainsafe/lodestar-types";

import {IBlockJob, IChainSegmentJob} from "../interface";
import {ChainEvent, ChainEventEmitter} from "../emitter";
import {IBeaconClock} from "../clock";
import {IStateRegenerator} from "../regen";
import {JobQueue} from "../../util/queue";

import {processBlocks} from "./process";
import {validateBlocks} from "./validate";
import {IBeaconDb} from "../../db";
import {BlockError, BlockErrorCode, ChainSegmentError} from "../errors";

import {processBlock, processChainSegment} from "./process";
import {validateBlock} from "./validate";

type BlockProcessorModules = {
config: IBeaconConfig;
Expand Down Expand Up @@ -61,8 +63,8 @@ export class BlockProcessor {
*/
export async function processBlockJob(modules: BlockProcessorModules, job: IBlockJob): Promise<void> {
try {
await validateBlocks({...modules, jobs: [job]});
await processBlocks({...modules, jobs: [job]});
validateBlock({...modules, job});
await processBlock({...modules, job});
} catch (e) {
// above functions only throw BlockError
modules.emitter.emit(ChainEvent.errorBlock, e);
Expand All @@ -72,20 +74,85 @@ export async function processBlockJob(modules: BlockProcessorModules, job: IBloc
/**
* Similar to processBlockJob but this process a chain segment
*/
export async function processChainSegmentJob(
modules: BlockProcessorModules,
chainSegmentJob: IChainSegmentJob
): Promise<void> {
try {
const blockJobs: IBlockJob[] = chainSegmentJob.signedBlocks.map((signedBlock) => ({
signedBlock,
...chainSegmentJob,
}));
await validateBlocks({...modules, jobs: blockJobs});
await processBlocks({...modules, jobs: blockJobs});
} catch (e) {
// above functions only throw BlockError
modules.emitter.emit(ChainEvent.errorBlock, e);
throw e;
export async function processChainSegmentJob(modules: BlockProcessorModules, job: IChainSegmentJob): Promise<void> {
const blocks = job.signedBlocks;

// Validate and filter out irrelevant blocks
const filteredChainSegment: SignedBeaconBlock[] = [];
for (const [i, block] of blocks.entries()) {
const child = blocks[i + 1];
if (child) {
// If this block has a child in this chain segment, ensure that its parent root matches
// the root of this block.
//
// Without this check it would be possible to have a block verified using the
// incorrect shuffling. That would be bad, mmkay.
if (
!modules.config.types.Root.equals(
modules.config.types.BeaconBlock.hashTreeRoot(block.message),
child.message.parentRoot
)
) {
throw new ChainSegmentError({
code: BlockErrorCode.NON_LINEAR_PARENT_ROOTS,
job,
importedBlocks: 0,
});
}
// Ensure that the slots are strictly increasing throughout the chain segment.
if (child.message.slot <= block.message.slot) {
throw new ChainSegmentError({
code: BlockErrorCode.NON_LINEAR_SLOTS,
job,
importedBlocks: 0,
});
}
}

try {
validateBlock({...modules, job: {...job, signedBlock: block}});
// If the block is relevant, add it to the filtered chain segment.
filteredChainSegment.push(block);
} catch (e) {
switch ((e as BlockError).type.code) {
// If the block is already known, simply ignore this block.
case BlockErrorCode.BLOCK_IS_ALREADY_KNOWN:
continue;
// If the block is the genesis block, simply ignore this block.
case BlockErrorCode.GENESIS_BLOCK:
continue;
// If the block is is for a finalized slot, simply ignore this block.
//
// The block is either:
//
// 1. In the canonical finalized chain.
// 2. In some non-canonical chain at a slot that has been finalized already.
//
// In the case of (1), there's no need to re-import and later blocks in this
// segement might be useful.
//
// In the case of (2), skipping the block is valid since we should never import it.
// However, we will potentially get a `ParentUnknown` on a later block. The sync
// protocol will need to ensure this is handled gracefully.
case BlockErrorCode.WOULD_REVERT_FINALIZED_SLOT:
continue;
// Any other error whilst determining if the block was invalid, return that
// error.
default:
throw new ChainSegmentError({
job: e.job,
...e.type,
importedBlocks: 0,
});
}
}
}

await processChainSegment({
...modules,
job: {
...job,
signedBlocks: filteredChainSegment,
},
});
}
26 changes: 1 addition & 25 deletions packages/lodestar/src/chain/blocks/stateTransition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,13 @@ import {
toIStateContext,
} from "@chainsafe/lodestar-beacon-state-transition";
import {processSlots} from "@chainsafe/lodestar-beacon-state-transition/lib/fast/slot";
import {
getAllBlockSignatureSets,
getAllBlockSignatureSetsExceptProposer,
} from "@chainsafe/lodestar-beacon-state-transition/lib/fast/signatureSets";
import {IBlockSummary, IForkChoice} from "@chainsafe/lodestar-fork-choice";

import {LodestarEpochContext, ITreeStateContext} from "../../db/api/beacon/stateContextCache";
import {ChainEvent, ChainEventEmitter} from "../emitter";
import {IBlockJob} from "../interface";
import {sleep} from "@chainsafe/lodestar-utils";
import {IBeaconDb} from "../../db";
import {BlockError, BlockErrorCode} from "../errors";
import {verifySignatureSetsBatch} from "../bls";
import {StateTransitionEpochContext} from "@chainsafe/lodestar-beacon-state-transition/lib/fast/util/epochContext";

/**
Expand Down Expand Up @@ -145,28 +139,10 @@ export async function runStateTransition(
const config = stateContext.epochCtx.config;
const {SLOTS_PER_EPOCH} = config.params;
const postSlot = job.signedBlock.message.slot;
const checkpointStateContext = await processSlotsToNearestCheckpoint(emitter, stateContext, postSlot - 1);

if (!job.validSignatures) {
const {epochCtx, state} = checkpointStateContext;
const signatureSets = job.validProposerSignature
? getAllBlockSignatureSetsExceptProposer(epochCtx, state, job.signedBlock)
: getAllBlockSignatureSets(epochCtx, state, job.signedBlock);

if (!verifySignatureSetsBatch(signatureSets)) {
throw new BlockError({
code: BlockErrorCode.INVALID_SIGNATURE,
job,
});
}

job.validProposerSignature = true;
job.validSignatures = true;
}

// if block is trusted don't verify proposer or op signature
const postStateContext = toTreeStateContext(
fastStateTransition(checkpointStateContext, job.signedBlock, {
fastStateTransition(stateContext, job.signedBlock, {
verifyStateRoot: true,
verifyProposer: !job.validSignatures && !job.validProposerSignature,
verifySignatures: !job.validSignatures,
Expand Down
Loading

0 comments on commit e06cdd3

Please sign in to comment.