From df48b8985ae1fcbc7c51b85f938fcc95add6cc59 Mon Sep 17 00:00:00 2001 From: t-bast Date: Tue, 4 Jul 2023 15:31:18 +0200 Subject: [PATCH] Asynchronously clean up obsolete HTLC info from DB When a channel is closed, we can forget the data from historical HTLCs sent and received through that channel (which is otherwise required to punish cheating attempts by our peer). We previously synchronously removed that data from the DB when the closing transaction confirmed. However, this could create performance issues as the `htlc_infos` table can be very large for busy nodes and many concurrent writes may be happening at the same time. We don't need to get rid of this data immediately: we only want to remove it to avoid degrading the performance of active channels that read and write to the `htlc_infos` table. We now mark channels as closed in a dedicated table, and run a background actor that deletes batches of obsolete htlc data at regular intervals. This ensures that the table is eventually cleaned up, without impacting the performance of active channels. Fixes #2610 and #2702 --- eclair-core/src/main/resources/reference.conf | 8 ++- .../scala/fr/acinq/eclair/NodeParams.scala | 11 +++- .../scala/fr/acinq/eclair/db/ChannelsDb.scala | 2 + .../eclair/db/ClosedChannelsCleaner.scala | 51 ++++++++++++++ .../fr/acinq/eclair/db/DbEventHandler.scala | 9 ++- .../fr/acinq/eclair/db/DualDatabases.scala | 7 +- .../fr/acinq/eclair/db/pg/PgChannelsDb.scala | 40 ++++++++++- .../eclair/db/sqlite/SqliteChannelsDb.scala | 66 ++++++++++++++----- .../scala/fr/acinq/eclair/TestConstants.scala | 7 +- .../fr/acinq/eclair/db/ChannelsDbSpec.scala | 41 ++++++++---- .../eclair/db/ClosedChannelsCleanerSpec.scala | 54 +++++++++++++++ 11 files changed, 253 insertions(+), 43 deletions(-) create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/db/ClosedChannelsCleaner.scala create mode 100644 eclair-core/src/test/scala/fr/acinq/eclair/db/ClosedChannelsCleanerSpec.scala diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 926f566088..dc072aadcb 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -154,7 +154,7 @@ eclair { max-total-pending-channels-private-nodes = 99 // maximum number of pending channels we will accept from all private nodes channel-opener-whitelist = [] // a list of public keys; we will ignore rate limits on pending channels from these peers } -} + } balance-check-interval = 1 hour @@ -444,6 +444,12 @@ eclair { migrate-on-restart = false // migrate sqlite -> postgres on restart (only applies if sqlite is primary) compare-on-restart = false // compare sqlite and postgres dbs on restart (only applies if sqlite is primary) } + closed-channel-cleaner { + // Number of old HTLCs to delete per batch: a higher value will clean up closed channels faster, but may have a higher impact on performance. + batch-size = 50000 + // Frequency at which batches of HTLCs are deleted: a lower value will clean up closed channels faster, but may have a higher impact on performance. + interval = 5 minutes + } } file-backup { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 3eab339cec..2bb8cf0735 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -33,8 +33,8 @@ import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams} import fr.acinq.eclair.router.Announcements.AddressException import fr.acinq.eclair.router.Graph.{HeuristicsConstants, WeightRatios} +import fr.acinq.eclair.router.Router._ import fr.acinq.eclair.router.{Graph, PathFindingExperimentConf} -import fr.acinq.eclair.router.Router.{MessageRouteParams, MultiPartParams, PathFindingConf, RouterConf, SearchBoundaries} import fr.acinq.eclair.tor.Socks5ProxyParams import fr.acinq.eclair.wire.protocol._ import grizzled.slf4j.Logging @@ -85,7 +85,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager, blockchainWatchdogThreshold: Int, blockchainWatchdogSources: Seq[String], onionMessageConfig: OnionMessageConfig, - purgeInvoicesInterval: Option[FiniteDuration]) { + purgeInvoicesInterval: Option[FiniteDuration], + closedChannelsCleanerConfig: ClosedChannelsCleaner.Config) { val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey val nodeId: PublicKey = nodeKeyManager.nodeId @@ -595,7 +596,11 @@ object NodeParams extends Logging { timeout = FiniteDuration(config.getDuration("onion-messages.reply-timeout").getSeconds, TimeUnit.SECONDS), maxAttempts = config.getInt("onion-messages.max-attempts"), ), - purgeInvoicesInterval = purgeInvoicesInterval + purgeInvoicesInterval = purgeInvoicesInterval, + closedChannelsCleanerConfig = ClosedChannelsCleaner.Config( + batchSize = config.getInt("db.closed-channel-cleaner.batch-size"), + interval = FiniteDuration(config.getDuration("db.closed-channel-cleaner.interval").getSeconds, TimeUnit.SECONDS) + ) ) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala index c312c9786d..b978d240cc 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala @@ -32,6 +32,8 @@ trait ChannelsDb { def removeChannel(channelId: ByteVector32): Unit + def removeHtlcInfos(batchSize: Int): Unit + def listLocalChannels(): Seq[PersistentChannelData] def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/ClosedChannelsCleaner.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/ClosedChannelsCleaner.scala new file mode 100644 index 0000000000..a7d7285303 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/ClosedChannelsCleaner.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db + +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors + +import scala.concurrent.duration.FiniteDuration + +/** + * When a channel is closed, we can remove the information about old HTLCs that was stored in the DB to punish revoked commitments. + * We potentially have millions of rows to delete per channel, and there is no rush to remove them. + * We don't want this to negatively impact active channels, so this actor deletes that data in small batches, at regular intervals. + */ +object ClosedChannelsCleaner { + + // @formatter:off + sealed trait Command + private case object DeleteBatch extends Command + // @formatter:on + + case class Config(batchSize: Int, interval: FiniteDuration) + + def apply(db: ChannelsDb, config: Config): Behavior[Command] = { + Behaviors.setup { _ => + Behaviors.withTimers { timers => + timers.startTimerWithFixedDelay(DeleteBatch, config.interval) + Behaviors.receiveMessage { + case DeleteBatch => + db.removeHtlcInfos(config.batchSize) + Behaviors.same + } + } + } + } + +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala index 1b7eea5aab..130b583372 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala @@ -16,6 +16,9 @@ package fr.acinq.eclair.db +import akka.actor.typed.SupervisorStrategy +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter.ClassicActorContextOps import akka.actor.{Actor, DiagnosticActorLogging, Props} import akka.event.Logging.MDC import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey @@ -33,8 +36,10 @@ import fr.acinq.eclair.{Logs, NodeParams} */ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorLogging { - val auditDb: AuditDb = nodeParams.db.audit - val channelsDb: ChannelsDb = nodeParams.db.channels + private val auditDb: AuditDb = nodeParams.db.audit + private val channelsDb: ChannelsDb = nodeParams.db.channels + + context.spawn(Behaviors.supervise(ClosedChannelsCleaner(channelsDb, nodeParams.closedChannelsCleanerConfig)).onFailure(SupervisorStrategy.restart), name = "closed-channel-cleaner") context.system.eventStream.subscribe(self, classOf[PaymentSent]) context.system.eventStream.subscribe(self, classOf[PaymentFailed]) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index baa6e62fcb..27c328dc57 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -11,7 +11,7 @@ import fr.acinq.eclair.payment._ import fr.acinq.eclair.payment.relay.Relayer.RelayFees import fr.acinq.eclair.router.Router import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement} -import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond} +import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli} import grizzled.slf4j.Logging import java.io.File @@ -225,6 +225,11 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch primary.removeChannel(channelId) } + override def removeHtlcInfos(batchSize: Int): Unit = { + runAsync(secondary.removeHtlcInfos(batchSize)) + primary.removeHtlcInfos(batchSize) + } + override def listLocalChannels(): Seq[PersistentChannelData] = { runAsync(secondary.listLocalChannels()) primary.listLocalChannels() diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala index a2f852befc..4b361f7540 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala @@ -36,7 +36,7 @@ import javax.sql.DataSource object PgChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 8 + val CURRENT_VERSION = 9 } class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging { @@ -65,7 +65,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate("ALTER TABLE local_channels ALTER COLUMN last_connected_timestamp SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + last_connected_timestamp * interval '1 millisecond'") statement.executeUpdate("ALTER TABLE local_channels ALTER COLUMN closed_timestamp SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + closed_timestamp * interval '1 millisecond'") - statement.executeUpdate("ALTER TABLE htlc_infos ALTER COLUMN commitment_number SET DATA TYPE BIGINT USING commitment_number::BIGINT") + statement.executeUpdate("ALTER TABLE htlc_infos ALTER COLUMN commitment_number SET DATA TYPE BIGINT USING commitment_number::BIGINT") } def migration45(statement: Statement): Unit = { @@ -115,12 +115,17 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local.channels(remote_node_id)") } + def migration89(statement: Statement): Unit = { + statement.executeUpdate("CREATE TABLE local.closed_channels_to_clean_up (channel_id TEXT NOT NULL PRIMARY KEY)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local") statement.executeUpdate("CREATE TABLE local.channels (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, data BYTEA NOT NULL, json JSONB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp TIMESTAMP WITH TIME ZONE, last_payment_sent_timestamp TIMESTAMP WITH TIME ZONE, last_payment_received_timestamp TIMESTAMP WITH TIME ZONE, last_connected_timestamp TIMESTAMP WITH TIME ZONE, closed_timestamp TIMESTAMP WITH TIME ZONE)") statement.executeUpdate("CREATE TABLE local.htlc_infos (channel_id TEXT NOT NULL, commitment_number BIGINT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local.channels(channel_id))") + statement.executeUpdate("CREATE TABLE local.closed_channels_to_clean_up (channel_id TEXT NOT NULL PRIMARY KEY)") statement.executeUpdate("CREATE INDEX local_channels_type_idx ON local.channels ((json->>'type'))") statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local.channels(remote_node_id)") @@ -145,6 +150,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit if (v < 8) { migration78(statement) } + if (v < 9) { + migration89(statement) + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -225,7 +233,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate() } - using(pg.prepareStatement("DELETE FROM local.htlc_infos WHERE channel_id=?")) { statement => + // The htlc_infos may contain millions of rows, which is very expensive to delete synchronously. + // We instead run an asynchronous job to clean up that data in small batches. + using(pg.prepareStatement("INSERT INTO local.closed_channels_to_clean_up VALUES (?) ON CONFLICT DO NOTHING")) { statement => statement.setString(1, channelId.toHex) statement.executeUpdate() } @@ -238,6 +248,30 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit } } + override def removeHtlcInfos(batchSize: Int): Unit = withMetrics("channels/remove-htlc-infos", DbBackends.Postgres) { + withLock { pg => + // Check if there are channels that need to be cleaned up. + val channelId_opt = using(pg.prepareStatement("SELECT channel_id FROM local.closed_channels_to_clean_up LIMIT 1")) { statement => + statement.executeQuery().map(rs => ByteVector32(rs.getByteVector32FromHex("channel_id"))).lastOption + } + // Remove a batch of HTLC information for that channel. + channelId_opt.foreach(channelId => { + val deletedCount = using(pg.prepareStatement(s"DELETE FROM local.htlc_infos WHERE channel_id=? AND commitment_number IN (SELECT commitment_number FROM local.htlc_infos WHERE channel_id=? LIMIT $batchSize)")) { statement => + statement.setString(1, channelId.toHex) + statement.setString(2, channelId.toHex) + statement.executeUpdate() + } + // If we've deleted all HTLC information for that channel, we can now remove it from the DB. + if (deletedCount < batchSize) { + using(pg.prepareStatement("DELETE FROM local.closed_channels_to_clean_up WHERE channel_id=?")) { statement => + statement.setString(1, channelId.toHex) + statement.executeUpdate() + } + } + }) + } + } + override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Postgres) { withLock { pg => using(pg.createStatement) { statement => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index ea07977280..f6bcfc6f26 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -24,7 +24,7 @@ import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec -import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampMilli, TimestampSecond} +import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampMilli} import grizzled.slf4j.Logging import scodec.bits.BitVector @@ -32,7 +32,7 @@ import java.sql.{Connection, Statement} object SqliteChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 4 + val CURRENT_VERSION = 5 } class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { @@ -79,10 +79,15 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { )(logger) } + def migration45(): Unit = { + statement.executeUpdate("CREATE TABLE closed_channels_to_clean_up (channel_id BLOB NOT NULL PRIMARY KEY)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)") statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") + statement.executeUpdate("CREATE TABLE closed_channels_to_clean_up (channel_id BLOB NOT NULL PRIMARY KEY)") statement.executeUpdate("CREATE INDEX htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") case Some(v@(1 | 2 | 3)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") @@ -95,6 +100,9 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { if (v < 4) { migration34() } + if (v < 5) { + migration45() + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -152,7 +160,9 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { statement.executeUpdate() } - using(sqlite.prepareStatement("DELETE FROM htlc_infos WHERE channel_id=?")) { statement => + // The htlc_infos may contain millions of rows, which is very expensive to delete synchronously. + // We instead run an asynchronous job to clean up that data in small batches. + using(sqlite.prepareStatement("INSERT INTO closed_channels_to_clean_up VALUES (?)")) { statement => statement.setBytes(1, channelId.toArray) statement.executeUpdate() } @@ -164,6 +174,28 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { } } + override def removeHtlcInfos(batchSize: Int): Unit = withMetrics("channels/remove-htlc-infos", DbBackends.Sqlite) { + // Check if there are channels that need to be cleaned up. + val channelId_opt = using(sqlite.prepareStatement("SELECT channel_id FROM closed_channels_to_clean_up LIMIT 1")) { statement => + statement.executeQuery().map(rs => ByteVector32(rs.getByteVector32("channel_id"))).lastOption + } + // Remove a batch of HTLC information for that channel. + channelId_opt.foreach(channelId => { + val deletedCount = using(sqlite.prepareStatement(s"DELETE FROM htlc_infos WHERE channel_id=? AND commitment_number IN (SELECT commitment_number FROM htlc_infos WHERE channel_id=? LIMIT $batchSize)")) { statement => + statement.setBytes(1, channelId.toArray) + statement.setBytes(2, channelId.toArray) + statement.executeUpdate() + } + // If we've deleted all HTLC information for that channel, we can now remove it from the DB. + if (deletedCount < batchSize) { + using(sqlite.prepareStatement("DELETE FROM closed_channels_to_clean_up WHERE channel_id=?")) { statement => + statement.setBytes(1, channelId.toArray) + statement.executeUpdate() + } + } + }) + } + override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Sqlite) { using(sqlite.createStatement) { statement => statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=0") @@ -175,21 +207,21 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { val sql = "SELECT data FROM local_channels WHERE is_closed=1 ORDER BY closed_timestamp DESC" remoteNodeId_opt match { - case None => - using(sqlite.prepareStatement(limited(sql, paginated_opt))) { statement => - statement.executeQuery().mapCodec(channelDataCodec).toSeq - } - case Some(nodeId) => - using(sqlite.prepareStatement(sql)) { statement => - val filtered = statement.executeQuery() - .mapCodec(channelDataCodec).filter(_.remoteNodeId == nodeId) - val limited = paginated_opt match { - case None => filtered - case Some(p) => filtered.slice(p.skip, p.skip + p.count) - } - limited.toSeq + case None => + using(sqlite.prepareStatement(limited(sql, paginated_opt))) { statement => + statement.executeQuery().mapCodec(channelDataCodec).toSeq + } + case Some(nodeId) => + using(sqlite.prepareStatement(sql)) { statement => + val filtered = statement.executeQuery() + .mapCodec(channelDataCodec).filter(_.remoteNodeId == nodeId) + val limited = paginated_opt match { + case None => filtered + case Some(p) => filtered.slice(p.skip, p.skip + p.count) } - } + limited.toSeq + } + } } override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info", DbBackends.Sqlite) { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 1ec95cb70e..13052966f7 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -23,6 +23,7 @@ import fr.acinq.eclair.blockchain.fee._ import fr.acinq.eclair.channel.fsm.Channel.{ChannelConf, RemoteRbfLimits, UnhandledExceptionStrategy} import fr.acinq.eclair.channel.{ChannelFlags, LocalParams} import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager} +import fr.acinq.eclair.db.ClosedChannelsCleaner import fr.acinq.eclair.io.MessageRelay.RelayAll import fr.acinq.eclair.io.{OpenChannelInterceptor, PeerConnection} import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig @@ -218,7 +219,8 @@ object TestConstants { timeout = 200 millis, maxAttempts = 2, ), - purgeInvoicesInterval = None + purgeInvoicesInterval = None, + closedChannelsCleanerConfig = ClosedChannelsCleaner.Config(10, 100 millis) ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( @@ -377,7 +379,8 @@ object TestConstants { timeout = 100 millis, maxAttempts = 2, ), - purgeInvoicesInterval = None + purgeInvoicesInterval = None, + closedChannelsCleanerConfig = ClosedChannelsCleaner.Config(10, 100 millis) ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala index 25cd2ef44c..df48402160 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala @@ -18,8 +18,7 @@ package fr.acinq.eclair.db import com.softwaremill.quicklens._ import fr.acinq.bitcoin.scalacompat.ByteVector32 -import fr.acinq.bitcoin.scalacompat.Crypto.PrivateKey -import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey} import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases, migrationCheck} import fr.acinq.eclair.channel.RealScidStatus import fr.acinq.eclair.db.ChannelsDbSpec.{getPgTimestamp, getTimestamp, testCases} @@ -31,7 +30,7 @@ import fr.acinq.eclair.db.sqlite.SqliteChannelsDb import fr.acinq.eclair.db.sqlite.SqliteUtils.ExtendedResultSet._ import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec -import fr.acinq.eclair.{CltvExpiry, RealShortChannelId, TestDatabases, TimestampSecond, randomBytes32, randomKey} +import fr.acinq.eclair.{CltvExpiry, RealShortChannelId, TestDatabases, randomBytes32, randomKey} import org.scalatest.funsuite.AnyFunSuite import scodec.bits.ByteVector @@ -65,15 +64,16 @@ class ChannelsDbSpec extends AnyFunSuite { val channel2a = ChannelCodecsSpec.normal.modify(_.commitments.params.channelId).setTo(randomBytes32()) val channel2b = channel2a.modify(_.shortIds.real).setTo(RealScidStatus.Final(RealShortChannelId(189371))) - val commitNumber = 42 + val commitNumber1 = 42 + val commitNumber2 = 43 val paymentHash1 = ByteVector32.Zeroes val cltvExpiry1 = CltvExpiry(123) val paymentHash2 = ByteVector32(ByteVector.fill(32)(1)) val cltvExpiry2 = CltvExpiry(656) - intercept[SQLException](db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash1, cltvExpiry1)) // no related channel + intercept[SQLException](db.addHtlcInfo(channel1.channelId, commitNumber1, paymentHash1, cltvExpiry1)) // no related channel - assert(db.listLocalChannels().toSet == Set.empty) + assert(db.listLocalChannels().isEmpty) db.addOrUpdateChannel(channel1) db.addOrUpdateChannel(channel1) assert(db.listLocalChannels() == List(channel1)) @@ -85,11 +85,13 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listLocalChannels() == List(channel1, channel2b)) assert(db.getChannel(channel2b.channelId).contains(channel2b)) - assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList == Nil) - db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash1, cltvExpiry1) - db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash2, cltvExpiry2) - assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList.toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) - assert(db.listHtlcInfos(channel1.channelId, 43).toList == Nil) + assert(db.listHtlcInfos(channel1.channelId, commitNumber1).isEmpty) + db.addHtlcInfo(channel1.channelId, commitNumber1, paymentHash1, cltvExpiry1) + db.addHtlcInfo(channel1.channelId, commitNumber1, paymentHash2, cltvExpiry2) + db.addHtlcInfo(channel1.channelId, commitNumber2, paymentHash1, cltvExpiry1) + assert(db.listHtlcInfos(channel1.channelId, commitNumber1).toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) + assert(db.listHtlcInfos(channel1.channelId, commitNumber2).toSet == Set((paymentHash1, cltvExpiry1))) + assert(db.listHtlcInfos(channel1.channelId, 44).isEmpty) assert(db.listClosedChannels(None, None).isEmpty) db.removeChannel(channel1.channelId) @@ -97,11 +99,22 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listLocalChannels() == List(channel2b)) assert(db.listClosedChannels(None, None) == List(channel1)) assert(db.listClosedChannels(Some(channel1.remoteNodeId), None) == List(channel1)) - assert(db.listClosedChannels(Some(PrivateKey(randomBytes32()).publicKey), None) == Nil) - assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList == Nil) + assert(db.listClosedChannels(Some(PrivateKey(randomBytes32()).publicKey), None).isEmpty) + + // HTLC info is cleaned up asynchronously. + assert(db.listHtlcInfos(channel1.channelId, commitNumber1).toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) + assert(db.listHtlcInfos(channel1.channelId, commitNumber2).toSet == Set((paymentHash1, cltvExpiry1))) + db.removeHtlcInfos(1) // remove one of the commitment number (ordered not deterministic) + val remainingHtlcInfos = Seq(commitNumber1, commitNumber2).flatMap(commitNumber => db.listHtlcInfos(channel1.channelId, commitNumber)) + assert(remainingHtlcInfos.nonEmpty) + db.removeHtlcInfos(1) // remove the remaining commitment number + assert(db.listHtlcInfos(channel1.channelId, commitNumber1).isEmpty) + assert(db.listHtlcInfos(channel1.channelId, commitNumber2).isEmpty) + db.removeHtlcInfos(1) // noop + db.removeChannel(channel2b.channelId) assert(db.getChannel(channel2b.channelId).isEmpty) - assert(db.listLocalChannels() == Nil) + assert(db.listLocalChannels().isEmpty) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/ClosedChannelsCleanerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/ClosedChannelsCleanerSpec.scala new file mode 100644 index 0000000000..8e7e1dcc4d --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/ClosedChannelsCleanerSpec.scala @@ -0,0 +1,54 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db + +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import com.softwaremill.quicklens.ModifyPimp +import com.typesafe.config.ConfigFactory +import fr.acinq.eclair.TestDatabases.TestSqliteDatabases +import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec +import fr.acinq.eclair.{CltvExpiry, randomBytes32} +import org.scalatest.funsuite.AnyFunSuiteLike + +import scala.concurrent.duration.DurationInt + +class ClosedChannelsCleanerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike { + + test("clean closed channels at regular intervals") { + val channelsDb = TestSqliteDatabases().channels + + val channelId = randomBytes32() + channelsDb.addOrUpdateChannel(ChannelCodecsSpec.normal.modify(_.commitments.params.channelId).setTo(channelId)) + channelsDb.addHtlcInfo(channelId, 17, randomBytes32(), CltvExpiry(561)) + channelsDb.addHtlcInfo(channelId, 19, randomBytes32(), CltvExpiry(1105)) + channelsDb.addHtlcInfo(channelId, 23, randomBytes32(), CltvExpiry(1729)) + channelsDb.removeChannel(channelId) + assert(channelsDb.listHtlcInfos(channelId, 17).nonEmpty) + assert(channelsDb.listHtlcInfos(channelId, 19).nonEmpty) + assert(channelsDb.listHtlcInfos(channelId, 23).nonEmpty) + + val config = ClosedChannelsCleaner.Config(batchSize = 1, interval = 10 millis) + testKit.spawn(ClosedChannelsCleaner(channelsDb, config)) + + eventually { + assert(channelsDb.listHtlcInfos(channelId, 17).isEmpty) + assert(channelsDb.listHtlcInfos(channelId, 19).isEmpty) + assert(channelsDb.listHtlcInfos(channelId, 23).isEmpty) + } + } + +}