diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 926f566088..c9de3593a9 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -154,7 +154,7 @@ eclair { max-total-pending-channels-private-nodes = 99 // maximum number of pending channels we will accept from all private nodes channel-opener-whitelist = [] // a list of public keys; we will ignore rate limits on pending channels from these peers } -} + } balance-check-interval = 1 hour @@ -444,6 +444,12 @@ eclair { migrate-on-restart = false // migrate sqlite -> postgres on restart (only applies if sqlite is primary) compare-on-restart = false // compare sqlite and postgres dbs on restart (only applies if sqlite is primary) } + closed-channels-cleaner { + // Number of old HTLCs to delete per batch: a higher value will clean up closed channels faster, but may have a higher impact on performance. + batch-size = 50000 + // Frequency at which batches of HTLCs are deleted: a lower value will clean up closed channels faster, but may have a higher impact on performance. + interval = 5 minutes + } } file-backup { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 3eab339cec..4ca91db070 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -33,8 +33,8 @@ import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig import fr.acinq.eclair.payment.relay.Relayer.{AsyncPaymentsParams, RelayFees, RelayParams} import fr.acinq.eclair.router.Announcements.AddressException import fr.acinq.eclair.router.Graph.{HeuristicsConstants, WeightRatios} +import fr.acinq.eclair.router.Router._ import fr.acinq.eclair.router.{Graph, PathFindingExperimentConf} -import fr.acinq.eclair.router.Router.{MessageRouteParams, MultiPartParams, PathFindingConf, RouterConf, SearchBoundaries} import fr.acinq.eclair.tor.Socks5ProxyParams import fr.acinq.eclair.wire.protocol._ import grizzled.slf4j.Logging @@ -85,7 +85,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager, blockchainWatchdogThreshold: Int, blockchainWatchdogSources: Seq[String], onionMessageConfig: OnionMessageConfig, - purgeInvoicesInterval: Option[FiniteDuration]) { + purgeInvoicesInterval: Option[FiniteDuration], + closedChannelsCleanerConfig: ClosedChannelsCleaner.Config) { val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey val nodeId: PublicKey = nodeKeyManager.nodeId @@ -595,7 +596,11 @@ object NodeParams extends Logging { timeout = FiniteDuration(config.getDuration("onion-messages.reply-timeout").getSeconds, TimeUnit.SECONDS), maxAttempts = config.getInt("onion-messages.max-attempts"), ), - purgeInvoicesInterval = purgeInvoicesInterval + purgeInvoicesInterval = purgeInvoicesInterval, + closedChannelsCleanerConfig = ClosedChannelsCleaner.Config( + batchSize = config.getInt("db.closed-channels-cleaner.batch-size"), + interval = FiniteDuration(config.getDuration("db.closed-channels-cleaner.interval").getSeconds, TimeUnit.SECONDS) + ) ) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala index c312c9786d..b978d240cc 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala @@ -32,6 +32,8 @@ trait ChannelsDb { def removeChannel(channelId: ByteVector32): Unit + def removeHtlcInfos(batchSize: Int): Unit + def listLocalChannels(): Seq[PersistentChannelData] def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/ClosedChannelsCleaner.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/ClosedChannelsCleaner.scala new file mode 100644 index 0000000000..a7d7285303 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/ClosedChannelsCleaner.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db + +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors + +import scala.concurrent.duration.FiniteDuration + +/** + * When a channel is closed, we can remove the information about old HTLCs that was stored in the DB to punish revoked commitments. + * We potentially have millions of rows to delete per channel, and there is no rush to remove them. + * We don't want this to negatively impact active channels, so this actor deletes that data in small batches, at regular intervals. + */ +object ClosedChannelsCleaner { + + // @formatter:off + sealed trait Command + private case object DeleteBatch extends Command + // @formatter:on + + case class Config(batchSize: Int, interval: FiniteDuration) + + def apply(db: ChannelsDb, config: Config): Behavior[Command] = { + Behaviors.setup { _ => + Behaviors.withTimers { timers => + timers.startTimerWithFixedDelay(DeleteBatch, config.interval) + Behaviors.receiveMessage { + case DeleteBatch => + db.removeHtlcInfos(config.batchSize) + Behaviors.same + } + } + } + } + +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala index 1b7eea5aab..916254c2a9 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala @@ -16,6 +16,9 @@ package fr.acinq.eclair.db +import akka.actor.typed.SupervisorStrategy +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter.ClassicActorContextOps import akka.actor.{Actor, DiagnosticActorLogging, Props} import akka.event.Logging.MDC import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey @@ -33,8 +36,10 @@ import fr.acinq.eclair.{Logs, NodeParams} */ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorLogging { - val auditDb: AuditDb = nodeParams.db.audit - val channelsDb: ChannelsDb = nodeParams.db.channels + private val auditDb: AuditDb = nodeParams.db.audit + private val channelsDb: ChannelsDb = nodeParams.db.channels + + context.spawn(Behaviors.supervise(ClosedChannelsCleaner(channelsDb, nodeParams.closedChannelsCleanerConfig)).onFailure(SupervisorStrategy.restart), name = "closed-channels-cleaner") context.system.eventStream.subscribe(self, classOf[PaymentSent]) context.system.eventStream.subscribe(self, classOf[PaymentFailed]) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index baa6e62fcb..27c328dc57 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -11,7 +11,7 @@ import fr.acinq.eclair.payment._ import fr.acinq.eclair.payment.relay.Relayer.RelayFees import fr.acinq.eclair.router.Router import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement} -import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond} +import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli} import grizzled.slf4j.Logging import java.io.File @@ -225,6 +225,11 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch primary.removeChannel(channelId) } + override def removeHtlcInfos(batchSize: Int): Unit = { + runAsync(secondary.removeHtlcInfos(batchSize)) + primary.removeHtlcInfos(batchSize) + } + override def listLocalChannels(): Seq[PersistentChannelData] = { runAsync(secondary.listLocalChannels()) primary.listLocalChannels() diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala index a2f852befc..4b361f7540 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala @@ -36,7 +36,7 @@ import javax.sql.DataSource object PgChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 8 + val CURRENT_VERSION = 9 } class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging { @@ -65,7 +65,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate("ALTER TABLE local_channels ALTER COLUMN last_connected_timestamp SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + last_connected_timestamp * interval '1 millisecond'") statement.executeUpdate("ALTER TABLE local_channels ALTER COLUMN closed_timestamp SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + closed_timestamp * interval '1 millisecond'") - statement.executeUpdate("ALTER TABLE htlc_infos ALTER COLUMN commitment_number SET DATA TYPE BIGINT USING commitment_number::BIGINT") + statement.executeUpdate("ALTER TABLE htlc_infos ALTER COLUMN commitment_number SET DATA TYPE BIGINT USING commitment_number::BIGINT") } def migration45(statement: Statement): Unit = { @@ -115,12 +115,17 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local.channels(remote_node_id)") } + def migration89(statement: Statement): Unit = { + statement.executeUpdate("CREATE TABLE local.closed_channels_to_clean_up (channel_id TEXT NOT NULL PRIMARY KEY)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local") statement.executeUpdate("CREATE TABLE local.channels (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, data BYTEA NOT NULL, json JSONB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp TIMESTAMP WITH TIME ZONE, last_payment_sent_timestamp TIMESTAMP WITH TIME ZONE, last_payment_received_timestamp TIMESTAMP WITH TIME ZONE, last_connected_timestamp TIMESTAMP WITH TIME ZONE, closed_timestamp TIMESTAMP WITH TIME ZONE)") statement.executeUpdate("CREATE TABLE local.htlc_infos (channel_id TEXT NOT NULL, commitment_number BIGINT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local.channels(channel_id))") + statement.executeUpdate("CREATE TABLE local.closed_channels_to_clean_up (channel_id TEXT NOT NULL PRIMARY KEY)") statement.executeUpdate("CREATE INDEX local_channels_type_idx ON local.channels ((json->>'type'))") statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local.channels(remote_node_id)") @@ -145,6 +150,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit if (v < 8) { migration78(statement) } + if (v < 9) { + migration89(statement) + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -225,7 +233,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate() } - using(pg.prepareStatement("DELETE FROM local.htlc_infos WHERE channel_id=?")) { statement => + // The htlc_infos may contain millions of rows, which is very expensive to delete synchronously. + // We instead run an asynchronous job to clean up that data in small batches. + using(pg.prepareStatement("INSERT INTO local.closed_channels_to_clean_up VALUES (?) ON CONFLICT DO NOTHING")) { statement => statement.setString(1, channelId.toHex) statement.executeUpdate() } @@ -238,6 +248,30 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit } } + override def removeHtlcInfos(batchSize: Int): Unit = withMetrics("channels/remove-htlc-infos", DbBackends.Postgres) { + withLock { pg => + // Check if there are channels that need to be cleaned up. + val channelId_opt = using(pg.prepareStatement("SELECT channel_id FROM local.closed_channels_to_clean_up LIMIT 1")) { statement => + statement.executeQuery().map(rs => ByteVector32(rs.getByteVector32FromHex("channel_id"))).lastOption + } + // Remove a batch of HTLC information for that channel. + channelId_opt.foreach(channelId => { + val deletedCount = using(pg.prepareStatement(s"DELETE FROM local.htlc_infos WHERE channel_id=? AND commitment_number IN (SELECT commitment_number FROM local.htlc_infos WHERE channel_id=? LIMIT $batchSize)")) { statement => + statement.setString(1, channelId.toHex) + statement.setString(2, channelId.toHex) + statement.executeUpdate() + } + // If we've deleted all HTLC information for that channel, we can now remove it from the DB. + if (deletedCount < batchSize) { + using(pg.prepareStatement("DELETE FROM local.closed_channels_to_clean_up WHERE channel_id=?")) { statement => + statement.setString(1, channelId.toHex) + statement.executeUpdate() + } + } + }) + } + } + override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Postgres) { withLock { pg => using(pg.createStatement) { statement => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index ea07977280..f6bcfc6f26 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -24,7 +24,7 @@ import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec -import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampMilli, TimestampSecond} +import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampMilli} import grizzled.slf4j.Logging import scodec.bits.BitVector @@ -32,7 +32,7 @@ import java.sql.{Connection, Statement} object SqliteChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 4 + val CURRENT_VERSION = 5 } class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { @@ -79,10 +79,15 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { )(logger) } + def migration45(): Unit = { + statement.executeUpdate("CREATE TABLE closed_channels_to_clean_up (channel_id BLOB NOT NULL PRIMARY KEY)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)") statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") + statement.executeUpdate("CREATE TABLE closed_channels_to_clean_up (channel_id BLOB NOT NULL PRIMARY KEY)") statement.executeUpdate("CREATE INDEX htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") case Some(v@(1 | 2 | 3)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") @@ -95,6 +100,9 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { if (v < 4) { migration34() } + if (v < 5) { + migration45() + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -152,7 +160,9 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { statement.executeUpdate() } - using(sqlite.prepareStatement("DELETE FROM htlc_infos WHERE channel_id=?")) { statement => + // The htlc_infos may contain millions of rows, which is very expensive to delete synchronously. + // We instead run an asynchronous job to clean up that data in small batches. + using(sqlite.prepareStatement("INSERT INTO closed_channels_to_clean_up VALUES (?)")) { statement => statement.setBytes(1, channelId.toArray) statement.executeUpdate() } @@ -164,6 +174,28 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { } } + override def removeHtlcInfos(batchSize: Int): Unit = withMetrics("channels/remove-htlc-infos", DbBackends.Sqlite) { + // Check if there are channels that need to be cleaned up. + val channelId_opt = using(sqlite.prepareStatement("SELECT channel_id FROM closed_channels_to_clean_up LIMIT 1")) { statement => + statement.executeQuery().map(rs => ByteVector32(rs.getByteVector32("channel_id"))).lastOption + } + // Remove a batch of HTLC information for that channel. + channelId_opt.foreach(channelId => { + val deletedCount = using(sqlite.prepareStatement(s"DELETE FROM htlc_infos WHERE channel_id=? AND commitment_number IN (SELECT commitment_number FROM htlc_infos WHERE channel_id=? LIMIT $batchSize)")) { statement => + statement.setBytes(1, channelId.toArray) + statement.setBytes(2, channelId.toArray) + statement.executeUpdate() + } + // If we've deleted all HTLC information for that channel, we can now remove it from the DB. + if (deletedCount < batchSize) { + using(sqlite.prepareStatement("DELETE FROM closed_channels_to_clean_up WHERE channel_id=?")) { statement => + statement.setBytes(1, channelId.toArray) + statement.executeUpdate() + } + } + }) + } + override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Sqlite) { using(sqlite.createStatement) { statement => statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=0") @@ -175,21 +207,21 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { val sql = "SELECT data FROM local_channels WHERE is_closed=1 ORDER BY closed_timestamp DESC" remoteNodeId_opt match { - case None => - using(sqlite.prepareStatement(limited(sql, paginated_opt))) { statement => - statement.executeQuery().mapCodec(channelDataCodec).toSeq - } - case Some(nodeId) => - using(sqlite.prepareStatement(sql)) { statement => - val filtered = statement.executeQuery() - .mapCodec(channelDataCodec).filter(_.remoteNodeId == nodeId) - val limited = paginated_opt match { - case None => filtered - case Some(p) => filtered.slice(p.skip, p.skip + p.count) - } - limited.toSeq + case None => + using(sqlite.prepareStatement(limited(sql, paginated_opt))) { statement => + statement.executeQuery().mapCodec(channelDataCodec).toSeq + } + case Some(nodeId) => + using(sqlite.prepareStatement(sql)) { statement => + val filtered = statement.executeQuery() + .mapCodec(channelDataCodec).filter(_.remoteNodeId == nodeId) + val limited = paginated_opt match { + case None => filtered + case Some(p) => filtered.slice(p.skip, p.skip + p.count) } - } + limited.toSeq + } + } } override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info", DbBackends.Sqlite) { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 1ec95cb70e..13052966f7 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -23,6 +23,7 @@ import fr.acinq.eclair.blockchain.fee._ import fr.acinq.eclair.channel.fsm.Channel.{ChannelConf, RemoteRbfLimits, UnhandledExceptionStrategy} import fr.acinq.eclair.channel.{ChannelFlags, LocalParams} import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager} +import fr.acinq.eclair.db.ClosedChannelsCleaner import fr.acinq.eclair.io.MessageRelay.RelayAll import fr.acinq.eclair.io.{OpenChannelInterceptor, PeerConnection} import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig @@ -218,7 +219,8 @@ object TestConstants { timeout = 200 millis, maxAttempts = 2, ), - purgeInvoicesInterval = None + purgeInvoicesInterval = None, + closedChannelsCleanerConfig = ClosedChannelsCleaner.Config(10, 100 millis) ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( @@ -377,7 +379,8 @@ object TestConstants { timeout = 100 millis, maxAttempts = 2, ), - purgeInvoicesInterval = None + purgeInvoicesInterval = None, + closedChannelsCleanerConfig = ClosedChannelsCleaner.Config(10, 100 millis) ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala index 25cd2ef44c..df48402160 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala @@ -18,8 +18,7 @@ package fr.acinq.eclair.db import com.softwaremill.quicklens._ import fr.acinq.bitcoin.scalacompat.ByteVector32 -import fr.acinq.bitcoin.scalacompat.Crypto.PrivateKey -import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey} import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases, migrationCheck} import fr.acinq.eclair.channel.RealScidStatus import fr.acinq.eclair.db.ChannelsDbSpec.{getPgTimestamp, getTimestamp, testCases} @@ -31,7 +30,7 @@ import fr.acinq.eclair.db.sqlite.SqliteChannelsDb import fr.acinq.eclair.db.sqlite.SqliteUtils.ExtendedResultSet._ import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec -import fr.acinq.eclair.{CltvExpiry, RealShortChannelId, TestDatabases, TimestampSecond, randomBytes32, randomKey} +import fr.acinq.eclair.{CltvExpiry, RealShortChannelId, TestDatabases, randomBytes32, randomKey} import org.scalatest.funsuite.AnyFunSuite import scodec.bits.ByteVector @@ -65,15 +64,16 @@ class ChannelsDbSpec extends AnyFunSuite { val channel2a = ChannelCodecsSpec.normal.modify(_.commitments.params.channelId).setTo(randomBytes32()) val channel2b = channel2a.modify(_.shortIds.real).setTo(RealScidStatus.Final(RealShortChannelId(189371))) - val commitNumber = 42 + val commitNumber1 = 42 + val commitNumber2 = 43 val paymentHash1 = ByteVector32.Zeroes val cltvExpiry1 = CltvExpiry(123) val paymentHash2 = ByteVector32(ByteVector.fill(32)(1)) val cltvExpiry2 = CltvExpiry(656) - intercept[SQLException](db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash1, cltvExpiry1)) // no related channel + intercept[SQLException](db.addHtlcInfo(channel1.channelId, commitNumber1, paymentHash1, cltvExpiry1)) // no related channel - assert(db.listLocalChannels().toSet == Set.empty) + assert(db.listLocalChannels().isEmpty) db.addOrUpdateChannel(channel1) db.addOrUpdateChannel(channel1) assert(db.listLocalChannels() == List(channel1)) @@ -85,11 +85,13 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listLocalChannels() == List(channel1, channel2b)) assert(db.getChannel(channel2b.channelId).contains(channel2b)) - assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList == Nil) - db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash1, cltvExpiry1) - db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash2, cltvExpiry2) - assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList.toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) - assert(db.listHtlcInfos(channel1.channelId, 43).toList == Nil) + assert(db.listHtlcInfos(channel1.channelId, commitNumber1).isEmpty) + db.addHtlcInfo(channel1.channelId, commitNumber1, paymentHash1, cltvExpiry1) + db.addHtlcInfo(channel1.channelId, commitNumber1, paymentHash2, cltvExpiry2) + db.addHtlcInfo(channel1.channelId, commitNumber2, paymentHash1, cltvExpiry1) + assert(db.listHtlcInfos(channel1.channelId, commitNumber1).toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) + assert(db.listHtlcInfos(channel1.channelId, commitNumber2).toSet == Set((paymentHash1, cltvExpiry1))) + assert(db.listHtlcInfos(channel1.channelId, 44).isEmpty) assert(db.listClosedChannels(None, None).isEmpty) db.removeChannel(channel1.channelId) @@ -97,11 +99,22 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listLocalChannels() == List(channel2b)) assert(db.listClosedChannels(None, None) == List(channel1)) assert(db.listClosedChannels(Some(channel1.remoteNodeId), None) == List(channel1)) - assert(db.listClosedChannels(Some(PrivateKey(randomBytes32()).publicKey), None) == Nil) - assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList == Nil) + assert(db.listClosedChannels(Some(PrivateKey(randomBytes32()).publicKey), None).isEmpty) + + // HTLC info is cleaned up asynchronously. + assert(db.listHtlcInfos(channel1.channelId, commitNumber1).toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) + assert(db.listHtlcInfos(channel1.channelId, commitNumber2).toSet == Set((paymentHash1, cltvExpiry1))) + db.removeHtlcInfos(1) // remove one of the commitment number (ordered not deterministic) + val remainingHtlcInfos = Seq(commitNumber1, commitNumber2).flatMap(commitNumber => db.listHtlcInfos(channel1.channelId, commitNumber)) + assert(remainingHtlcInfos.nonEmpty) + db.removeHtlcInfos(1) // remove the remaining commitment number + assert(db.listHtlcInfos(channel1.channelId, commitNumber1).isEmpty) + assert(db.listHtlcInfos(channel1.channelId, commitNumber2).isEmpty) + db.removeHtlcInfos(1) // noop + db.removeChannel(channel2b.channelId) assert(db.getChannel(channel2b.channelId).isEmpty) - assert(db.listLocalChannels() == Nil) + assert(db.listLocalChannels().isEmpty) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/ClosedChannelsCleanerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/ClosedChannelsCleanerSpec.scala new file mode 100644 index 0000000000..8e7e1dcc4d --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/ClosedChannelsCleanerSpec.scala @@ -0,0 +1,54 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db + +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import com.softwaremill.quicklens.ModifyPimp +import com.typesafe.config.ConfigFactory +import fr.acinq.eclair.TestDatabases.TestSqliteDatabases +import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec +import fr.acinq.eclair.{CltvExpiry, randomBytes32} +import org.scalatest.funsuite.AnyFunSuiteLike + +import scala.concurrent.duration.DurationInt + +class ClosedChannelsCleanerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike { + + test("clean closed channels at regular intervals") { + val channelsDb = TestSqliteDatabases().channels + + val channelId = randomBytes32() + channelsDb.addOrUpdateChannel(ChannelCodecsSpec.normal.modify(_.commitments.params.channelId).setTo(channelId)) + channelsDb.addHtlcInfo(channelId, 17, randomBytes32(), CltvExpiry(561)) + channelsDb.addHtlcInfo(channelId, 19, randomBytes32(), CltvExpiry(1105)) + channelsDb.addHtlcInfo(channelId, 23, randomBytes32(), CltvExpiry(1729)) + channelsDb.removeChannel(channelId) + assert(channelsDb.listHtlcInfos(channelId, 17).nonEmpty) + assert(channelsDb.listHtlcInfos(channelId, 19).nonEmpty) + assert(channelsDb.listHtlcInfos(channelId, 23).nonEmpty) + + val config = ClosedChannelsCleaner.Config(batchSize = 1, interval = 10 millis) + testKit.spawn(ClosedChannelsCleaner(channelsDb, config)) + + eventually { + assert(channelsDb.listHtlcInfos(channelId, 17).isEmpty) + assert(channelsDb.listHtlcInfos(channelId, 19).isEmpty) + assert(channelsDb.listHtlcInfos(channelId, 23).isEmpty) + } + } + +}