Merge pull request #2193 from ergoplatform/indexer-refactoring
Indexer code refactoring
kushti authored Feb 4, 2025
2 parents 820b5fa + 64f7962 commit ad564c9
Showing 7 changed files with 231 additions and 151 deletions.
@@ -61,6 +61,11 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {

protected def historyStorage: HistoryStorage = _history.historyStorage

/**
* Used in tests to indicate the indexer has caught up to the chain
*/
protected def caughtUpHook(height: Int = 0): Unit = {}

// fast access buffers
protected val general: ArrayBuffer[ExtraIndex] = ArrayBuffer.empty[ExtraIndex]
protected val boxes: mutable.HashMap[ModifierId, IndexedErgoBox] = mutable.HashMap.empty[ModifierId, IndexedErgoBox]
@@ -86,7 +91,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
* @param height - block height to get transactions from
* @return transactions at height
*/
private def getBlockTransactionsAt(height: Int): Option[BlockTransactions] =
private def getBlockTransactionsAt(height: Int): Option[BlockTransactions] = {
blockCache.remove(height).orElse(history.bestBlockTransactionsAt(height)).map { txs =>
if (height % 1000 == 0) blockCache.keySet.filter(_ < height).map(blockCache.remove)
if (readingUpTo - height < 300 && chainHeight - height > 1000) {
@@ -112,6 +117,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
}
txs
}
}

/**
* Spend an IndexedErgoBox from buffer or database. Also record tokens for later use in balance tracking logic.
@@ -143,7 +149,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
* @param id - hash of the (ergotree) address
* @param spendOrReceive - IndexedErgoBox to receive (Right) or spend (Left)
*/
private def findAndUpdateTree(id: ModifierId, spendOrReceive: Either[IndexedErgoBox, IndexedErgoBox])(implicit state: IndexerState): Unit = {
private def findAndUpdateTree(id: ModifierId, spendOrReceive: Either[IndexedErgoBox, IndexedErgoBox])(state: IndexerState): Unit = {
trees.get(id).map { tree =>
spendOrReceive match {
case Left(iEb) => tree.addTx(state.globalTxIndex).spendBox(iEb, Some(history)) // spend box
@@ -198,7 +204,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
/**
* Write buffered indexes to database and clear buffers.
*/
private def saveProgress(state: IndexerState, writeLog: Boolean = true): Unit = {
private def saveProgress(state: IndexerState): Unit = {

val start: Long = System.currentTimeMillis

@@ -225,9 +231,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
((((general ++= boxes.values) ++= trees.values) ++= tokens.values) ++= segments.values).toArray
)

if (writeLog) {
log.info(s"Processed ${trees.size} ErgoTrees with ${boxes.size} boxes and inserted them to database in ${System.currentTimeMillis - start}ms")
}
log.debug(s"Processed ${trees.size} ErgoTrees with ${boxes.size} boxes and inserted them to database in ${System.currentTimeMillis - start}ms")

// clear buffers for next batch
general.clear()
@@ -241,7 +245,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
* Process a batch of BlockTransactions into memory and occasionally write them to database.
*
* @param state - current indexer state
* @param headerOpt - header to index blocktransactions of (used after caught up with chain)
* @param headerOpt - header to index block transactions of (used after caught up with chain)
*/
protected def index(state: IndexerState, headerOpt: Option[Header] = None): IndexerState = {
val btOpt = headerOpt.flatMap { header =>
@@ -257,7 +261,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
val txs: Seq[ErgoTransaction] = btOpt.get.txs

var boxCount: Int = 0
implicit var newState: IndexerState = state
var newState: IndexerState = state

// record transactions and boxes
cfor(0)(_ < txs.length, _ + 1) { n =>
@@ -269,16 +273,18 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
inputTokens.clear()

//process transaction inputs
if (height != 1) { //only after 1st block (skip genesis box)
if (height > 1) { //only after 1st block (skip genesis box)
cfor(0)(_ < tx.inputs.size, _ + 1) { i =>
val boxId = bytesToId(tx.inputs(i).boxId)
if (findAndSpendBox(boxId, tx.id, height)) { // spend box and add tx
val iEb = boxes(boxId)
findAndUpdateTree(hashErgoTree(iEb.box.ergoTree), Left(iEb))
findAndUpdateTree(hashErgoTree(iEb.box.ergoTree), Left(iEb))(newState)
cfor(0)(_ < iEb.box.additionalTokens.length, _ + 1) { j =>
findAndUpdateToken(iEb.box.additionalTokens(j)._1.toModifierId, Left(iEb))
}
inputs(i) = iEb.globalIndex
} else {
log.warn(s"Not found input box: $boxId")
}
}
}
@@ -291,7 +297,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
outputs(i) = iEb.globalIndex

// box by address
findAndUpdateTree(hashErgoTree(iEb.box.ergoTree), Right(boxes(iEb.id)))
findAndUpdateTree(hashErgoTree(iEb.box.ergoTree), Right(boxes(iEb.id)))(newState)

// check if box is creating new tokens, if yes record them
cfor(0)(_ < iEb.box.additionalTokens.length, _ + 1) { j =>
@@ -329,13 +335,13 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
* Remove all indexes after a given height and revert address balances.
*
* @param state - current state of indexer
* @param height - starting height
* @param height - forking height (height of last common block)
*/
private def removeAfter(state: IndexerState, height: Int): IndexerState = {

var newState: IndexerState = state

saveProgress(newState, writeLog = false)
saveProgress(newState)
log.info(s"Rolling back indexes from ${state.indexedHeight} to $height")

try {
@@ -396,7 +402,7 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
// Save changes
newState = newState.copy(indexedHeight = height, rollbackTo = 0, caughtUp = true)
historyStorage.removeExtra(toRemove.toArray)
saveProgress(newState, writeLog = false)
saveProgress(newState)
} catch {
case t: Throwable => log.error(s"removeAfter during rollback failed due to: ${t.getMessage}", t)
}
@@ -409,22 +415,24 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
case Index() if !state.caughtUp && !state.rollbackInProgress =>
val newState = index(state.incrementIndexedHeight)
if (modCount >= saveLimit) saveProgress(newState)
context.become(loaded(newState))
context.become(receive.orElse(loaded(newState)))
self ! Index()

case Index() if state.caughtUp =>
if (modCount > 0) saveProgress(state)
blockCache.clear()
caughtUpHook()
log.info("Indexer caught up with chain")

// after the indexer caught up with the chain, stay up to date
case FullBlockApplied(header: Header) if state.caughtUp && !state.rollbackInProgress =>
if (header.height == state.indexedHeight + 1) { // applied block is next in line
val newState: IndexerState = index(state.incrementIndexedHeight, Some(header))
saveProgress(newState)
context.become(loaded(newState))
context.become(receive.orElse(loaded(newState)))
caughtUpHook(header.height)
} else if (header.height > state.indexedHeight + 1) { // applied block is ahead of indexer
context.become(loaded(state.copy(caughtUp = false)))
context.become(receive.orElse(loaded(state.copy(caughtUp = false))))
self ! Index()
} else // applied block has already been indexed, skipping duplicate
log.warn(s"Skipping block ${header.id} applied at height ${header.height}, indexed height is ${state.indexedHeight}")
@@ -437,13 +445,13 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
history.heightOf(branchPoint) match {
case Some(branchHeight) =>
if (branchHeight < state.indexedHeight) {
context.become(loaded(state.copy(rollbackTo = branchHeight)))
context.become(receive.orElse(loaded(state.copy(rollbackTo = branchHeight))))
self ! RemoveAfter(branchHeight)
}
case None =>
log.error(s"No rollback height found for $branchPoint")
val newState = state.copy(rollbackTo = 0)
context.become(loaded(newState))
context.become(receive.orElse(loaded(newState)))
unstashAll()
}
}
@@ -452,7 +460,8 @@ trait ExtraIndexerBase extends Actor with Stash with ScorexLogging {
blockCache.clear()
readingUpTo = 0
val newState = removeAfter(state, branchHeight)
context.become(loaded(newState))
context.become(receive.orElse(loaded(newState)))
caughtUpHook()
log.info(s"Successfully rolled back indexes to $branchHeight")
unstashAll()

@@ -503,13 +512,11 @@ class ExtraIndexer(cacheSettings: CacheSettings,
case StartExtraIndexer(history: ErgoHistory) =>
_history = history
val state = IndexerState.fromHistory(history)
context.become(loaded(state))
context.become(receive.orElse(loaded(state)))
log.info(s"Started extra indexer at height ${state.indexedHeight}")
self ! Index()
unstashAll()

case _ => stash()

}
}

@@ -592,7 +599,9 @@ object ExtraIndexer {
0
}))

def getIndex(key: Array[Byte], history: ErgoHistoryReader): ByteBuffer = getIndex(key, history.historyStorage)
def getIndex(key: Array[Byte], history: ErgoHistoryReader): ByteBuffer = {
getIndex(key, history.historyStorage)
}

def apply(chainSettings: ChainSettings, cacheSettings: CacheSettings)(implicit system: ActorSystem): ActorRef = {
val props = Props.create(classOf[ExtraIndexer], cacheSettings, chainSettings.addressEncoder)
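A note on the new caughtUpHook: it is a no-op in production code and exists only so tests can observe when indexing has reached the chain tip or finished a rollback. Below is a minimal sketch of how a test might hook into it; the mixin name and latch-based wiring are hypothetical illustrations, not part of this commit.

import java.util.concurrent.CountDownLatch

// Hypothetical test mixin: releases a latch once the indexer reports it has caught up.
trait CaughtUpAwaiter extends ExtraIndexerBase {
  private val caughtUpLatch = new CountDownLatch(1)

  // ExtraIndexerBase calls this after catching up with the chain or completing a rollback.
  override protected def caughtUpHook(height: Int): Unit = caughtUpLatch.countDown()

  // Blocks the calling test thread until caughtUpHook has fired at least once.
  def awaitCaughtUp(): Unit = caughtUpLatch.await()
}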
@@ -55,8 +55,9 @@ case class IndexedErgoAddress(treeHash: ModifierId,
* @return this address
*/
override private[extra] def spendBox(iEb: IndexedErgoBox, historyOpt: Option[ErgoHistoryReader] = None)(implicit ae: ErgoAddressEncoder): IndexedErgoAddress = {
if(historyOpt.isDefined)
if(historyOpt.isDefined) {
findAndModBox(iEb.globalIndex, historyOpt.get)
}
balanceInfo.foreach(_.subtract(iEb.box))
this
}
@@ -40,7 +40,7 @@ abstract class Segment[T <: Segment[_] : ClassTag](val parentId: ModifierId,
/**
* Internal segment buffer
*/
private[extra] val buffer: mutable.HashMap[ModifierId,T] = new mutable.HashMap[ModifierId,T]
private[extra] val buffer: mutable.HashMap[ModifierId, T] = new mutable.HashMap[ModifierId, T]

/**
* Number of segments in database containing box numbers
@@ -81,13 +81,11 @@ abstract class Segment[T <: Segment[_] : ClassTag](val parentId: ModifierId,
val mid = (low + high) >>> 1
segmentId = boxSegmentId(parentId, mid)
buffer.get(segmentId).orElse(history.typedExtraIndexById[T](idMod(segmentId))).foreach { segment =>
if(abs(segment.boxes.head) < boxNumAbs &&
abs(segment.boxes.last) < boxNumAbs)
if (abs(segment.boxes.head) < boxNumAbs && abs(segment.boxes.last) < boxNumAbs) {
low = mid + 1
else if(abs(segment.boxes.head) > boxNumAbs &&
abs(segment.boxes.last) > boxNumAbs)
} else if (abs(segment.boxes.head) > boxNumAbs && abs(segment.boxes.last) > boxNumAbs) {
high = mid - 1
else {
} else {
low = high + 1 // break
buffer.put(segmentId, segment)
}
@@ -175,7 +173,7 @@ abstract class Segment[T <: Segment[_] : ClassTag](val parentId: ModifierId,
* @param offset - number of items to skip from the start
* @param limit - max number of items to be returned
* @param segmentCount - number of segments of the parent address
* @param array - the indexes already in memory
* @param arr - the indexes already in memory
* @param idOf - function to calculate segment ids, either [[txSegmentId]] or [[boxSegmentId]]
* @param arraySelector - function to select index array from retrieved segments
* @param retrieve - function to retrieve indexes from database
@@ -94,8 +94,8 @@ class HistoryStorage(indexStore: LDBKVStore, objectsStore: LDBKVStore, extraStor
ExtraIndexSerializer.parseBytesTry(bytes) match {
case Success(pm) =>
log.trace(s"Cache miss for existing index $id")
if(pm.isInstanceOf[Segment[_]]){
extraCache.put(pm.id, pm) // cache all segment type objects
if(!pm.isInstanceOf[Segment[_]]){
extraCache.put(pm.id, pm) // cache non-segment objects
}
Some(pm)
case Failure(_) =>
@@ -33,7 +33,7 @@ object ChainGenerator extends ErgoTestHelpers with Matchers {
import org.ergoplatform.utils.generators.ErgoCoreTransactionGenerators._

val pow: AutolykosPowScheme = new AutolykosPowScheme(powScheme.k, powScheme.n)
val blockInterval: FiniteDuration = 2.minute
val blockInterval: FiniteDuration = 1.minute
val EmissionTxCost: Long = 20000
val MinTxAmount: Long = 2000000
val RewardDelay: Int = initSettings.chainSettings.monetary.minerRewardDelay
@@ -46,20 +46,26 @@ object ChainGenerator extends ErgoTestHelpers with Matchers {
val minimalSuffix = 2
val txCostLimit: Height = initSettings.nodeSettings.maxTransactionCost
val txSizeLimit: Height = initSettings.nodeSettings.maxTransactionSize
val startTime: Long = System.currentTimeMillis() - ((5000 - 1) * blockInterval.toMillis)

var startTime: Long = 0
var endTime: Long = 0

def generate(length: Int, dir: File)(history: ErgoHistory): Unit = {
val stateDir = new File(s"${dir.getAbsolutePath}/state")
stateDir.mkdirs()
val (state, _) = ErgoState.generateGenesisUtxoState(stateDir, initSettings)
System.out.println(s"Going to generate a chain at ${dir.getAbsolutePath} starting from ${history.bestFullBlockOpt}")
startTime = System.currentTimeMillis() - (blockInterval * (length - 1)).toMillis
val chain = loop(state, None, None, Seq())(history)
System.out.println(s"Chain of length ${chain.length} generated")
def generate(length: Int, dir: File, history: ErgoHistory, stateOpt: Option[UtxoState]): UtxoState = {
val state = stateOpt.getOrElse {
val stateDir = new File(s"${dir.getAbsolutePath}/state")
stateDir.mkdirs()
ErgoState.generateGenesisUtxoState(stateDir, initSettings)._1
}
System.out.println(s"Going to ${if(stateOpt.isEmpty) "generate" else "extend"} chain at " +
s"${dir.getAbsolutePath} starting from ${history.fullBlockHeight}")
endTime = startTime + (blockInterval * length).toMillis
val initBox = history.bestFullBlockOpt.map(_.transactions.last.outputs.head)
val chain = loop(state, initBox, history.bestHeaderOpt, Seq())(history)
history.bestHeaderOpt shouldBe history.bestFullBlockOpt.map(_.header)
history.bestFullBlockOpt.get.id shouldBe chain.last
System.out.println("History was generated successfully")
System.out.println(s"History ${if(stateOpt.isEmpty) "generated" else "extended"} successfully, " +
s"blocks: ${history.fullBlockHeight}")
state
}

@tailrec
@@ -68,15 +74,15 @@ object ChainGenerator extends ErgoTestHelpers with Matchers {
last: Option[Header],
acc: Seq[ModifierId])(history: ErgoHistory): Seq[ModifierId] = {
val time: Long = last.map(_.timestamp + blockInterval.toMillis).getOrElse(startTime)
if (time < System.currentTimeMillis()) {
if (time < endTime) {
val (txs, lastOut) = genTransactions(last.map(_.height).getOrElse(GenesisHeight),
initBox, state.stateContext)

val candidate = genCandidate(defaultProver.hdPubKeys.head.key, last, time, txs, state)(history)
val block = proveCandidate(candidate.get)

history.append(block.header).get
block.blockSections.foreach(s => if (!history.contains(s)) history.append(s).get)
assert(history.append(block.header).isSuccess)
block.blockSections.foreach(s => if (!history.contains(s)) assert(history.append(s).isSuccess))

val outToPassNext = if (last.isEmpty) {
block.transactions.flatMap(_.outputs).find(_.ergoTree == minerProp)
Expand All @@ -86,8 +92,7 @@ object ChainGenerator extends ErgoTestHelpers with Matchers {

assert(outToPassNext.isDefined)

log.info(
s"Block ${block.id} with ${block.transactions.size} transactions at height ${block.header.height} generated")
System.out.println(s"Block ${block.id} with ${block.transactions.size} transactions at height ${block.header.height} generated")

loop(state.applyModifier(block, None)(_ => ()).get, outToPassNext, Some(block.header), acc :+ block.id)(history)
} else {
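The refactored ChainGenerator.generate now takes the history plus an optional UtxoState and returns the resulting state, so a test can build a chain once and later extend it. A rough usage sketch under assumptions: ChainGenerator is in scope, the import paths are taken from the node codebase, and the caller manages the startTime/endTime window as shown; the directory, chain lengths, and helper name are illustrative only.

import java.io.File
import org.ergoplatform.nodeView.history.ErgoHistory
import org.ergoplatform.nodeView.state.UtxoState

// Hypothetical test helper: generate a fresh chain, then extend it reusing the returned state.
def buildAndExtendChain(history: ErgoHistory, dir: File): UtxoState = {
  // generate(...) derives endTime from startTime, so place the first window in the recent past
  ChainGenerator.startTime = System.currentTimeMillis() - (ChainGenerator.blockInterval * 40).toMillis
  val state = ChainGenerator.generate(30, dir, history, None) // fresh chain from genesis
  // continue the timestamp window so the extension produces new blocks after the current tip
  ChainGenerator.startTime = ChainGenerator.endTime
  ChainGenerator.generate(10, dir, history, Some(state)) // extend the same chain
}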