From c3b09e0fde39e22c590d751c8c593f693778fb15 Mon Sep 17 00:00:00 2001 From: pm47 Date: Fri, 16 Jul 2021 11:03:36 +0200 Subject: [PATCH] fetch incoming payments in parallel This is a simpler approach to completely parallelizing the handling of payments, where we simply parallelize the fetch from the database. This brings a ~30% performance improvement in performance in `PerformanceIntegrationSpec`. --- .../payment/receive/MultiPartHandler.scala | 26 ++++++++++++++++--- .../eclair/payment/MultiPartHandlerSpec.scala | 17 +++++++----- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/receive/MultiPartHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/receive/MultiPartHandler.scala index 8d32bbd7a4..29b2d27152 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/receive/MultiPartHandler.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/receive/MultiPartHandler.scala @@ -31,7 +31,6 @@ import fr.acinq.eclair.payment.{IncomingPacket, PaymentReceived, PaymentRequest} import fr.acinq.eclair.wire.protocol._ import fr.acinq.eclair.{Features, Logs, MilliSatoshi, NodeParams, randomBytes32} -import java.util.UUID import scala.util.{Failure, Success, Try} /** @@ -57,12 +56,16 @@ class MultiPartHandler(nodeParams: NodeParams, register: ActorRef, db: IncomingP override def handle(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Receive = { case receivePayment: ReceivePayment => - val child = ctx.spawn(CreateInvoiceActor(nodeParams), name = UUID.randomUUID().toString) + val child = ctx.spawnAnonymous(CreateInvoiceActor(nodeParams)) child ! CreateInvoiceActor.CreatePaymentRequest(ctx.sender(), receivePayment) case p: IncomingPacket.FinalPacket if doHandle(p.add.paymentHash) => + val child = ctx.spawnAnonymous(GetIncomingPaymentActor(nodeParams)) + child ! GetIncomingPaymentActor.GetIncomingPayment(ctx.self, p) + + case ProcessPacket(p, payment_opt) if doHandle(p.add.paymentHash) => Logs.withMdc(log)(Logs.mdc(paymentHash_opt = Some(p.add.paymentHash))) { - db.getIncomingPayment(p.add.paymentHash) match { + payment_opt match { case Some(record) => validatePayment(nodeParams, p, record) match { case Some(cmdFail) => Metrics.PaymentFailed.withTag(Tags.Direction, Tags.Directions.Received).withTag(Tags.Failure, Tags.FailureType(cmdFail)).increment() @@ -166,6 +169,7 @@ class MultiPartHandler(nodeParams: NodeParams, register: ActorRef, db: IncomingP object MultiPartHandler { // @formatter:off + case class ProcessPacket(packet: IncomingPacket.FinalPacket, payment_opt: Option[IncomingPayment]) case class DoFulfill(preimage: ByteVector32, success: MultiPartPaymentFSM.MultiPartPaymentSucceeded) case object GetPendingPayments @@ -228,6 +232,22 @@ object MultiPartHandler { } } + object GetIncomingPaymentActor { + + // @formatter:off + sealed trait Command + case class GetIncomingPayment(replyTo: ActorRef, packet: IncomingPacket.FinalPacket) extends Command + // @formatter:on + + def apply(nodeParams: NodeParams): Behavior[Command] = { + Behaviors.receiveMessage { + case GetIncomingPayment(replyTo, packet) => + replyTo ! ProcessPacket(packet, nodeParams.db.payments.getIncomingPayment(packet.add.paymentHash)) + Behaviors.stopped + } + } + } + private def validatePaymentStatus(payment: IncomingPacket.FinalPacket, record: IncomingPayment)(implicit log: LoggingAdapter): Boolean = { if (record.status.isInstanceOf[IncomingPaymentStatus.Received]) { log.warning("ignoring incoming payment for which has already been paid") diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/MultiPartHandlerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/MultiPartHandlerSpec.scala index 4036a3d073..0554ef8d7b 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/MultiPartHandlerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/MultiPartHandlerSpec.scala @@ -436,9 +436,11 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike val nodeParams = Alice.nodeParams.copy(multiPartPaymentExpiry = 250 millis, features = featuresWithMpp) val handler = TestActorRef[PaymentHandler](PaymentHandler.props(nodeParams, f.register.ref)) - f.sender.send(handler, ReceivePayment(Some(1000 msat), "1 coffee, no sugar")) + val preimage = randomBytes32() + f.sender.send(handler, ReceivePayment(Some(1000 msat), "1 coffee, no sugar", paymentPreimage_opt = Some(preimage))) val pr = f.sender.expectMsgType[PaymentRequest] assert(pr.features.allowMultiPart) + assert(pr.paymentHash == Crypto.sha256(preimage)) val add1 = UpdateAddHtlc(ByteVector32.One, 0, 800 msat, pr.paymentHash, f.defaultExpiry, TestConstants.emptyOnionPacket) f.sender.send(handler, IncomingPacket.FinalPacket(add1, Onion.createMultiPartPayload(add1.amountMsat, 1000 msat, add1.cltvExpiry, pr.paymentSecret.get))) @@ -453,14 +455,15 @@ class MultiPartHandlerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike val add3 = UpdateAddHtlc(ByteVector32.Zeroes, 5, 700 msat, pr.paymentHash, f.defaultExpiry, TestConstants.emptyOnionPacket) f.sender.send(handler, IncomingPacket.FinalPacket(add3, Onion.createMultiPartPayload(add3.amountMsat, 1000 msat, add3.cltvExpiry, pr.paymentSecret.get))) - val cmd1 = f.register.expectMsgType[Register.Forward[CMD_FULFILL_HTLC]] - assert(cmd1.channelId === add2.channelId) - assert(cmd1.message.id === 2) - assert(Crypto.sha256(cmd1.message.r) === pr.paymentHash) - f.register.expectMsg(Register.Forward(ActorRef.noSender, add3.channelId, CMD_FULFILL_HTLC(5, cmd1.message.r, commit = true))) + // the fulfill are not necessarily in the same order as the commands + f.register.expectMsgAllOf( + Register.Forward(ActorRef.noSender, add2.channelId, CMD_FULFILL_HTLC(2, preimage, commit = true)), + Register.Forward(ActorRef.noSender, add3.channelId, CMD_FULFILL_HTLC(5, preimage, commit = true)) + ) val paymentReceived = f.eventListener.expectMsgType[PaymentReceived] - assert(paymentReceived.copy(parts = paymentReceived.parts.map(_.copy(timestamp = 0))) === PaymentReceived(pr.paymentHash, PartialPayment(300 msat, ByteVector32.One, 0) :: PartialPayment(700 msat, ByteVector32.Zeroes, 0) :: Nil)) + assert(paymentReceived.paymentHash === pr.paymentHash) + assert(paymentReceived.parts.map(_.copy(timestamp = 0)).toSet === Set(PartialPayment(300 msat, ByteVector32.One, 0), PartialPayment(700 msat, ByteVector32.Zeroes, 0))) val received = nodeParams.db.payments.getIncomingPayment(pr.paymentHash) assert(received.isDefined && received.get.status.isInstanceOf[IncomingPaymentStatus.Received]) assert(received.get.status.asInstanceOf[IncomingPaymentStatus.Received].amount === 1000.msat)