Small refactoring
t-bast committed Feb 14, 2024
1 parent bc07548 commit ce50751
Showing 7 changed files with 15 additions and 13 deletions.

@@ -88,7 +88,7 @@ trait CommonFundingHandlers extends CommonHandlers {
// previous funding transaction. Our peer cannot publish the corresponding revoked commitments anymore, so we can
// clean-up the htlc data that we were storing for the matching penalty transactions.
d.commitments.all.find(_.fundingTxId == w.tx.txid).map(_.firstRemoteCommitIndex).foreach {
- commitIndex => context.system.eventStream.publish(RevokedHtlcInfoCleaner.ForgetHtlcInfos(d.channelId, commitIndex))
+ commitIndex => context.system.eventStream.publish(RevokedHtlcInfoCleaner.ForgetHtlcInfos(d.channelId, beforeCommitIndex = commitIndex))
}
d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus).map {
case (commitments1, commitment) =>

@@ -34,7 +34,7 @@ trait ChannelsDb {
def removeChannel(channelId: ByteVector32): Unit

/** Mark revoked HTLC information as obsolete. It will be removed from the DB once [[removeHtlcInfos]] is called. */
- def forgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long): Unit
+ def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit

/** Remove up to batchSize obsolete revoked HTLC information. */
def removeHtlcInfos(batchSize: Int): Unit
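
The two methods above split revoked-HTLC cleanup into a cheap marking step and a deferred, batched delete. A minimal sketch of how a caller could drive them, assuming a db: ChannelsDb instance; the commitment index and batch size are illustrative values and the import paths are approximate, not taken from this commit:

import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.eclair.db.ChannelsDb
import fr.acinq.eclair.randomBytes32

def cleanupSketch(db: ChannelsDb): Unit = {
  val channelId: ByteVector32 = randomBytes32() // illustrative channel id
  // Cheap step: record that all HTLC info below this commitment index is obsolete.
  db.markHtlcInfosForRemoval(channelId, beforeCommitIndex = 42)
  // Deferred step: a background job later deletes the marked rows in small batches.
  db.removeHtlcInfos(batchSize = 100)
}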

@@ -225,9 +225,9 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch
primary.removeChannel(channelId)
}

- override def forgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long): Unit = {
-   runAsync(secondary.forgetHtlcInfos(channelId, beforeCommitIndex))
-   primary.forgetHtlcInfos(channelId, beforeCommitIndex)
+ override def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit = {
+   runAsync(secondary.markHtlcInfosForRemoval(channelId, beforeCommitIndex))
+   primary.markHtlcInfosForRemoval(channelId, beforeCommitIndex)
}

override def removeHtlcInfos(batchSize: Int): Unit = {

@@ -46,7 +46,7 @@ object RevokedHtlcInfoCleaner {
timers.startTimerWithFixedDelay(DeleteBatch, config.interval)
Behaviors.receiveMessage {
case ForgetHtlcInfos(channelId, beforeCommitIndex) =>
- db.forgetHtlcInfos(channelId, beforeCommitIndex)
+ db.markHtlcInfosForRemoval(channelId, beforeCommitIndex)
Behaviors.same
case DeleteBatch =>
db.removeHtlcInfos(config.batchSize)
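
The hunk above only shows the cleaner's message handler. For context, a self-contained sketch of an actor with the same shape; the Config and Command definitions and imports here are assumptions for illustration, and the real RevokedHtlcInfoCleaner in eclair may differ:

import scala.concurrent.duration.FiniteDuration
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.eclair.db.ChannelsDb

object HtlcInfoCleanerSketch {
  // Illustrative config: how many obsolete rows to delete per tick, and how often to tick.
  case class Config(batchSize: Int, interval: FiniteDuration)

  sealed trait Command
  case class ForgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long) extends Command
  private case object DeleteBatch extends Command

  def apply(db: ChannelsDb, config: Config): Behavior[Command] =
    Behaviors.withTimers { timers =>
      // Wake up at a fixed delay and delete one small batch of obsolete rows per tick.
      timers.startTimerWithFixedDelay(DeleteBatch, config.interval)
      Behaviors.receiveMessage {
        case ForgetHtlcInfos(channelId, beforeCommitIndex) =>
          // Cheap: mark the rows as obsolete, actual deletion happens on DeleteBatch ticks.
          db.markHtlcInfosForRemoval(channelId, beforeCommitIndex)
          Behaviors.same
        case DeleteBatch =>
          db.removeHtlcInfos(config.batchSize)
          Behaviors.same
      }
    }
}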

@@ -235,7 +235,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit

// The htlc_infos may contain millions of rows, which is very expensive to delete synchronously.
// We instead run an asynchronous job to clean up that data in small batches.
- forgetHtlcInfos(channelId, Long.MaxValue)
+ markHtlcInfosForRemoval(channelId, Long.MaxValue)

using(pg.prepareStatement("UPDATE local.channels SET is_closed=TRUE, closed_timestamp=? WHERE channel_id=?")) { statement =>
statement.setTimestamp(1, Timestamp.from(Instant.now()))

@@ -245,7 +245,7 @@
}
}

- override def forgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long): Unit = withMetrics("channels/forget-htlc-infos", DbBackends.Postgres) {
+ override def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit = withMetrics("channels/forget-htlc-infos", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("INSERT INTO local.htlc_infos_to_remove (channel_id, before_commitment_number) VALUES(?, ?) ON CONFLICT (channel_id) DO UPDATE SET before_commitment_number = EXCLUDED.before_commitment_number")) { statement =>
statement.setString(1, channelId.toHex)

@@ -273,6 +273,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
statement.setLong(3, beforeCommitmentNumber)
statement.executeUpdate()
}
+ logger.info(s"deleted $deletedCount rows from htlc_infos for channelId=$channelId beforeCommitmentNumber=$beforeCommitmentNumber")
// If we've deleted all HTLC information for that channel, we can now remove it from the DB.
if (deletedCount < batchSize) {
using(pg.prepareStatement("DELETE FROM local.htlc_infos_to_remove WHERE channel_id=?")) { statement =>

@@ -162,7 +162,7 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging {

// The htlc_infos may contain millions of rows, which is very expensive to delete synchronously.
// We instead run an asynchronous job to clean up that data in small batches.
- forgetHtlcInfos(channelId, Long.MaxValue)
+ markHtlcInfosForRemoval(channelId, Long.MaxValue)

using(sqlite.prepareStatement("UPDATE local_channels SET is_closed=1, closed_timestamp=? WHERE channel_id=?")) { statement =>
statement.setLong(1, TimestampMilli.now().toLong)

@@ -171,7 +171,7 @@
}
}

- override def forgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long): Unit = withMetrics("channels/forget-htlc-infos", DbBackends.Sqlite) {
+ override def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit = withMetrics("channels/forget-htlc-infos", DbBackends.Sqlite) {
using(sqlite.prepareStatement("UPDATE htlc_infos_to_remove SET before_commitment_number=? WHERE channel_id=?")) { update =>
update.setLong(1, beforeCommitIndex)
update.setBytes(2, channelId.toArray)

@@ -202,6 +202,7 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging {
statement.setLong(3, beforeCommitmentNumber)
statement.executeUpdate()
}
+ logger.info(s"deleted $deletedCount rows from htlc_infos for channelId=$channelId beforeCommitmentNumber=$beforeCommitmentNumber")
// If we've deleted all HTLC information for that channel, we can now remove it from the DB.
if (deletedCount < batchSize) {
using(sqlite.prepareStatement("DELETE FROM htlc_infos_to_remove WHERE channel_id=?")) { statement =>

@@ -119,7 +119,7 @@ class ChannelsDbSpec extends AnyFunSuite {
db.addHtlcInfo(channel1.channelId, 49, randomBytes32(), CltvExpiry(561))
db.addHtlcInfo(channel1.channelId, 50, randomBytes32(), CltvExpiry(561))
db.addHtlcInfo(channel1.channelId, 50, randomBytes32(), CltvExpiry(561))
- db.forgetHtlcInfos(channel1.channelId, commitNumberSplice1)
+ db.markHtlcInfosForRemoval(channel1.channelId, commitNumberSplice1)
db.addHtlcInfo(channel1.channelId, 51, randomBytes32(), CltvExpiry(561))
db.addHtlcInfo(channel1.channelId, 52, randomBytes32(), CltvExpiry(561))
db.removeChannel(channel1.channelId)

@@ -129,11 +129,11 @@
db.addHtlcInfo(channel2.channelId, 48, randomBytes32(), CltvExpiry(561))
db.addHtlcInfo(channel2.channelId, 49, randomBytes32(), CltvExpiry(561))
db.addHtlcInfo(channel2.channelId, 50, randomBytes32(), CltvExpiry(561))
- db.forgetHtlcInfos(channel2.channelId, commitNumberSplice1)
+ db.markHtlcInfosForRemoval(channel2.channelId, commitNumberSplice1)
db.addHtlcInfo(channel2.channelId, 74, randomBytes32(), CltvExpiry(561))
db.addHtlcInfo(channel2.channelId, 75, randomBytes32(), CltvExpiry(561))
db.addHtlcInfo(channel2.channelId, 76, randomBytes32(), CltvExpiry(561))
- db.forgetHtlcInfos(channel2.channelId, commitNumberSplice2)
+ db.markHtlcInfosForRemoval(channel2.channelId, commitNumberSplice2)

// We asynchronously clean-up the HTLC data from the DB in small batches.
val obsoleteHtlcInfo = Seq(
