diff --git a/loopd/daemon.go b/loopd/daemon.go index 880e19621..6aca1c7cc 100644 --- a/loopd/daemon.go +++ b/loopd/daemon.go @@ -571,6 +571,27 @@ func (d *Daemon) initialize(withMacaroonService bool) error { } }) + // Start the HTLC recovery worker that handles HTLC-confirmed + // notifications with a direct sweep attempt. + htlcConfirmedRecovery := &htlcConfirmedRecoveryManager{ + notificationSource: notificationManager, + swapStore: swapDb, + chainParams: d.lnd.ChainParams, + notifier: d.lnd.ChainNotifier, + wallet: d.lnd.WalletKit, + signer: d.lnd.Signer, + } + + d.wg.Go(func() { + debugf("Starting htlc confirmed recovery worker") + defer debugf("Htlc confirmed recovery worker stopped") + + err := htlcConfirmedRecovery.run(d.mainCtx) + if shouldReportManagerErr(err) { + debugf("htlc confirmed recovery worker failed: %v", err) + } + }) + var ( staticAddressManager *address.Manager depositManager *deposit.Manager diff --git a/loopd/htlc_confirmed_recovery.go b/loopd/htlc_confirmed_recovery.go new file mode 100644 index 000000000..7d63bd871 --- /dev/null +++ b/loopd/htlc_confirmed_recovery.go @@ -0,0 +1,118 @@ +package loopd + +import ( + "context" + "fmt" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/loop/looprpc" + "github.com/lightninglabs/loop/swapserverrpc" +) + +const ( + // htlcConfirmedRecoverySatPerVByte is the fixed fee rate used by the + // HTLC recovery worker. + htlcConfirmedRecoverySatPerVByte = 10 +) + +// htlcConfirmedSubscriber exposes the HTLC-confirmed notification stream used +// by the recovery worker. +type htlcConfirmedSubscriber interface { + SubscribeHtlcConfirmed(ctx context.Context, + ) <-chan *swapserverrpc.ServerHtlcConfirmedNotification +} + +// htlcConfirmedRecoveryManager consumes HTLC-confirmed notifications and +// reuses sweepHtlc to recover the notified loop-out HTLC. +type htlcConfirmedRecoveryManager struct { + notificationSource htlcConfirmedSubscriber + swapStore loopOutStore + chainParams *chaincfg.Params + notifier htlcChainNotifier + wallet htlcWallet + signer htlcSigner +} + +// run starts the HTLC recovery worker. +func (m *htlcConfirmedRecoveryManager) run(ctx context.Context) error { + if m.notificationSource == nil { + return nil + } + + ntfnChan := m.notificationSource.SubscribeHtlcConfirmed(ctx) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + + case ntfn, ok := <-ntfnChan: + if !ok { + return nil + } + + m.handleNotification(ctx, ntfn) + } + } +} + +// handleNotification validates a notification and triggers a direct sweep +// attempt for the notified HTLC. +func (m *htlcConfirmedRecoveryManager) handleNotification(ctx context.Context, + ntfn *swapserverrpc.ServerHtlcConfirmedNotification) { + + if ntfn == nil { + debugf("Ignoring nil HTLC recovery notification") + + return + } + + // Require all dependencies up front so the optional recovery path + // exits quietly when the daemon is not wired for sweeping. + if m.swapStore == nil || m.chainParams == nil || m.notifier == nil || + m.wallet == nil || m.signer == nil { + + debugf("HTLC recovery dependencies unavailable") + + return + } + + outpoint, htlcAddress, err := m.parseNotification(ntfn) + if err != nil { + debugf("Ignoring HTLC recovery notification: %v", err) + + return + } + + // Reuse sweepHtlc so the worker follows the same success-path spend + // construction and destination selection as the manual sweep path. + _, err = sweepHtlc( + ctx, &looprpc.SweepHtlcRequest{ + Outpoint: outpoint.String(), + HtlcAddress: htlcAddress, + SatPerVbyte: htlcConfirmedRecoverySatPerVByte, + Publish: true, + }, m.chainParams, m.swapStore, m.notifier, m.wallet, m.signer, + ) + if err != nil { + debugf("Unable to recover HTLC outpoint %s: %v", outpoint, err) + } +} + +// parseNotification parses the swap hash and outpoint from a notification. +func (m *htlcConfirmedRecoveryManager) parseNotification( + ntfn *swapserverrpc.ServerHtlcConfirmedNotification) (*wire.OutPoint, + string, error) { + + outpoint, err := wire.NewOutPointFromString(ntfn.HtlcOutpoint) + if err != nil { + return nil, "", fmt.Errorf("bad outpoint: %w", err) + } + + if ntfn.HtlcAddress == "" { + return nil, "", fmt.Errorf("missing HTLC address") + } + + return outpoint, ntfn.HtlcAddress, nil +} diff --git a/loopd/htlc_confirmed_recovery_test.go b/loopd/htlc_confirmed_recovery_test.go new file mode 100644 index 000000000..4c7f03c80 --- /dev/null +++ b/loopd/htlc_confirmed_recovery_test.go @@ -0,0 +1,359 @@ +package loopd + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/loop/loopdb" + "github.com/lightninglabs/loop/swap" + "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightninglabs/loop/test" + "github.com/lightninglabs/loop/utils" + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lntypes" + "github.com/stretchr/testify/require" +) + +const ( + // htlcConfirmedRecoveryTestTimeout bounds recovery worker tests so + // channel interactions cannot block the suite indefinitely. + htlcConfirmedRecoveryTestTimeout = 5 * time.Second + + // htlcConfirmedRecoveryNoEventWait bounds negative channel assertions. + htlcConfirmedRecoveryNoEventWait = 100 * time.Millisecond +) + +// mockHtlcConfirmedSubscriber exposes a controllable HTLC-confirmed +// notification stream for recovery worker tests. +type mockHtlcConfirmedSubscriber struct { + ntfnChan chan *swapserverrpc.ServerHtlcConfirmedNotification +} + +// newMockHtlcConfirmedSubscriber creates a buffered notification source for +// the recovery worker tests. +func newMockHtlcConfirmedSubscriber() *mockHtlcConfirmedSubscriber { + return &mockHtlcConfirmedSubscriber{ + ntfnChan: make( + chan *swapserverrpc.ServerHtlcConfirmedNotification, 2, + ), + } +} + +// SubscribeHtlcConfirmed returns the test-controlled notification stream. +func (m *mockHtlcConfirmedSubscriber) SubscribeHtlcConfirmed( + context.Context) <-chan *swapserverrpc.ServerHtlcConfirmedNotification { + + return m.ntfnChan +} + +// htlcConfirmedRecoveryFixture bundles the mocks and swap data used by the +// recovery worker tests. +type htlcConfirmedRecoveryFixture struct { + lnd *test.LndMockServices + store *loopdb.StoreMock + subscriber *mockHtlcConfirmedSubscriber + manager *htlcConfirmedRecoveryManager + swap *loopdb.LoopOut + htlc *swap.Htlc + fundingTx *wire.MsgTx + outpoint wire.OutPoint + destPkScript []byte +} + +// newHtlcConfirmedRecoveryFixture builds a loop-out swap and the mock daemon +// services needed to recover its HTLC. +func newHtlcConfirmedRecoveryFixture( + t *testing.T) *htlcConfirmedRecoveryFixture { + + t.Helper() + + lnd := test.NewMockLnd() + store := loopdb.NewStoreMock(t) + + preimage := lntypes.Preimage{1, 2, 3, 4} + swapHash := preimage.Hash() + + // Construct deterministic HTLC keys so the test reconstructs a stable + // swap script and outpoint. + _, senderPub := test.CreateKey(0) + _, receiverPub := test.CreateKey(1) + + var senderKey, receiverKey [33]byte + copy(senderKey[:], senderPub.SerializeCompressed()) + copy(receiverKey[:], receiverPub.SerializeCompressed()) + + htlcKeys := loopdb.HtlcKeys{ + SenderScriptKey: senderKey, + ReceiverScriptKey: receiverKey, + ClientScriptKeyLocator: keychain.KeyLocator{ + Family: keychain.KeyFamily(swap.KeyFamily), + Index: 7, + }, + } + + destAddr, err := btcutil.NewAddressWitnessPubKeyHash( + bytes.Repeat([]byte{2}, 20), lnd.ChainParams, + ) + require.NoError(t, err) + + swapContract := &loopdb.LoopOutContract{ + SwapContract: loopdb.SwapContract{ + Preimage: preimage, + AmountRequested: 100_000, + HtlcKeys: htlcKeys, + CltvExpiry: 500, + InitiationHeight: 123, + ProtocolVersion: loopdb.ProtocolVersionHtlcV2, + }, + DestAddr: destAddr, + } + + loopOut := &loopdb.LoopOut{ + Loop: loopdb.Loop{ + Hash: swapHash, + }, + Contract: swapContract, + } + store.LoopOutSwaps[swapHash] = swapContract + + // Fund the reconstructed HTLC with a single output so the worker has a + // recoverable success-path spend target. + htlc, err := utils.GetHtlc( + swapHash, &swapContract.SwapContract, lnd.ChainParams, + ) + require.NoError(t, err) + + fundingTx := wire.NewMsgTx(2) + fundingTx.AddTxOut(&wire.TxOut{ + Value: int64(swapContract.AmountRequested), + PkScript: htlc.PkScript, + }) + + outpoint := wire.OutPoint{ + Hash: fundingTx.TxHash(), + Index: 0, + } + + destPkScript, err := txscript.PayToAddrScript(destAddr) + require.NoError(t, err) + + subscriber := newMockHtlcConfirmedSubscriber() + manager := &htlcConfirmedRecoveryManager{ + notificationSource: subscriber, + swapStore: store, + chainParams: lnd.ChainParams, + notifier: lnd.ChainNotifier, + wallet: lnd.WalletKit, + signer: lnd.Signer, + } + + return &htlcConfirmedRecoveryFixture{ + lnd: lnd, + store: store, + subscriber: subscriber, + manager: manager, + swap: loopOut, + htlc: htlc, + fundingTx: fundingTx, + outpoint: outpoint, + destPkScript: destPkScript, + } +} + +// waitForConfRegistration waits for the recovery worker to register a single +// confirmation notification. +func waitForConfRegistration(t *testing.T, ctx context.Context, + lnd *test.LndMockServices) *test.ConfRegistration { + + t.Helper() + + select { + case reg := <-lnd.RegisterConfChannel: + return reg + + case <-ctx.Done(): + t.Fatalf("timed out waiting for confirmation registration: %v", + ctx.Err()) + + return nil + } +} + +// waitForSignOutputRawRequest waits for the worker to request a signature. +func waitForSignOutputRawRequest(t *testing.T, ctx context.Context, + lnd *test.LndMockServices) test.SignOutputRawRequest { + + t.Helper() + + select { + case req := <-lnd.SignOutputRawChannel: + return req + + case <-ctx.Done(): + t.Fatalf("timed out waiting for sign request: %v", ctx.Err()) + + return test.SignOutputRawRequest{} + } +} + +// waitForPublishedTx waits for the recovery worker to publish its sweep. +func waitForPublishedTx(t *testing.T, ctx context.Context, + lnd *test.LndMockServices) *wire.MsgTx { + + t.Helper() + + select { + case tx := <-lnd.TxPublishChannel: + return tx + + case <-ctx.Done(): + t.Fatalf("timed out waiting for published tx: %v", ctx.Err()) + + return nil + } +} + +// waitForManagerExit waits for the recovery worker goroutine to stop. +func waitForManagerExit(t *testing.T, ctx context.Context, + runErrChan <-chan error) error { + + t.Helper() + + select { + case err := <-runErrChan: + return err + + case <-ctx.Done(): + t.Fatalf("timed out waiting for recovery worker exit: %v", + ctx.Err()) + + return nil + } +} + +// assertNoRecoveryActivity verifies that the recovery worker did not start a +// sweep attempt. +func assertNoRecoveryActivity(t *testing.T, lnd *test.LndMockServices) { + t.Helper() + + select { + case reg := <-lnd.RegisterConfChannel: + t.Fatalf("unexpected confirmation registration: %+v", reg) + + case <-time.After(htlcConfirmedRecoveryNoEventWait): + } + + select { + case req := <-lnd.SignOutputRawChannel: + t.Fatalf("unexpected sign request: %+v", req) + + case <-time.After(htlcConfirmedRecoveryNoEventWait): + } + + select { + case tx := <-lnd.TxPublishChannel: + t.Fatalf("unexpected published tx: %v", tx.TxHash()) + + case <-time.After(htlcConfirmedRecoveryNoEventWait): + } +} + +// TestHtlcConfirmedRecoveryManagerPublishesSweep verifies that a valid +// notification triggers a direct sweep to the stored destination address. +func TestHtlcConfirmedRecoveryManagerPublishesSweep(t *testing.T) { + defer test.Guard(t)() + + setLogger(newFormatLogger()) + + fixture := newHtlcConfirmedRecoveryFixture(t) + + ctx, cancel := context.WithTimeout( + t.Context(), htlcConfirmedRecoveryTestTimeout, + ) + defer cancel() + + runErrChan := make(chan error, 1) + + go func() { + runErrChan <- fixture.manager.run(ctx) + }() + + ntfn := &swapserverrpc.ServerHtlcConfirmedNotification{ + SwapHash: fixture.swap.Hash[:], + HtlcOutpoint: fixture.outpoint.String(), + HtlcAddress: fixture.htlc.Address.String(), + } + fixture.subscriber.ntfnChan <- ntfn + close(fixture.subscriber.ntfnChan) + + reg := waitForConfRegistration(t, ctx, fixture.lnd) + require.NotNil(t, reg.TxID) + require.Equal(t, fixture.outpoint.Hash, *reg.TxID) + require.Equal(t, fixture.htlc.PkScript, reg.PkScript) + + fixture.lnd.ConfChannel <- &chainntnfs.TxConfirmation{ + Tx: fixture.fundingTx, + BlockHeight: 321, + } + + signReq := waitForSignOutputRawRequest(t, ctx, fixture.lnd) + require.Equal( + t, fixture.outpoint, signReq.Tx.TxIn[0].PreviousOutPoint, + ) + require.Len(t, signReq.Tx.TxOut, 1) + require.Equal( + t, fixture.destPkScript, signReq.Tx.TxOut[0].PkScript, + ) + + publishedTx := waitForPublishedTx(t, ctx, fixture.lnd) + require.Equal( + t, fixture.outpoint, publishedTx.TxIn[0].PreviousOutPoint, + ) + require.Len(t, publishedTx.TxOut, 1) + require.Equal( + t, fixture.destPkScript, publishedTx.TxOut[0].PkScript, + ) + require.NotEmpty(t, publishedTx.TxIn[0].Witness) + + require.NoError(t, waitForManagerExit(t, ctx, runErrChan)) + require.NoError(t, fixture.lnd.IsDone()) +} + +// TestHtlcConfirmedRecoveryManagerIgnoresBadNotification verifies that a bad +// notification is ignored without starting a sweep attempt. +func TestHtlcConfirmedRecoveryManagerIgnoresBadNotification(t *testing.T) { + defer test.Guard(t)() + + setLogger(newFormatLogger()) + + fixture := newHtlcConfirmedRecoveryFixture(t) + + ctx, cancel := context.WithTimeout( + t.Context(), htlcConfirmedRecoveryTestTimeout, + ) + defer cancel() + + runErrChan := make(chan error, 1) + + go func() { + runErrChan <- fixture.manager.run(ctx) + }() + + ntfn := &swapserverrpc.ServerHtlcConfirmedNotification{ + SwapHash: fixture.swap.Hash[:], + HtlcOutpoint: "not-an-outpoint", + HtlcAddress: fixture.htlc.Address.String(), + } + fixture.subscriber.ntfnChan <- ntfn + close(fixture.subscriber.ntfnChan) + + require.NoError(t, waitForManagerExit(t, ctx, runErrChan)) + assertNoRecoveryActivity(t, fixture.lnd) + require.NoError(t, fixture.lnd.IsDone()) +} diff --git a/loopd/log.go b/loopd/log.go index 6d102bc59..7d92c120b 100644 --- a/loopd/log.go +++ b/loopd/log.go @@ -43,6 +43,11 @@ func tracef(format string, params ...any) { log().Tracef(format, params...) } +// debugf logs a message with level DEBUG. +func debugf(format string, params ...any) { + log().Debugf(format, params...) +} + // infof logs a message with level INFO. func infof(format string, params ...any) { log().Infof(format, params...) diff --git a/loopd/sweep_htlc.go b/loopd/sweep_htlc.go index c613b5d7c..a95727f3b 100644 --- a/loopd/sweep_htlc.go +++ b/loopd/sweep_htlc.go @@ -106,8 +106,8 @@ func sweepHtlc(ctx context.Context, req *looprpc.SweepHtlcRequest, return nil, status.Error(codes.InvalidArgument, err.Error()) } - // Destination address: honor a provided override or derive a fresh - // wallet address from the default account. + // Destination address: honor a provided override immediately so request + // validation stays independent from swap lookup. var sweepAddr btcutil.Address if req.DestAddress != "" { sweepAddr, err = btcutil.DecodeAddress( @@ -117,30 +117,10 @@ func sweepHtlc(ctx context.Context, req *looprpc.SweepHtlcRequest, return nil, status.Errorf(codes.InvalidArgument, "invalid dest_address: %v", err) } - } else { - sweepAddr, err = wallet.NextAddr( - ctx, lnwallet.DefaultAccountName, - walletrpc.AddressType_TAPROOT_PUBKEY, - false, - ) - if err != nil { - return nil, status.Errorf(codes.Internal, - "derive sweep address: %v", err) - } - infof("sweephtlc: generated new destination address: %v", - sweepAddr.EncodeAddress()) - } - - sweepPkScript, err := txscript.PayToAddrScript(sweepAddr) - if err != nil { - return nil, err } - infof("sweephtlc: start sweep for %v -> %v", req.Outpoint, - sweepAddr.EncodeAddress()) - // Locate the loop-out swap whose HTLC script matches the outpoint so - // we can obtain keys and the stored preimage. + // we can obtain keys, the stored preimage, and the default destination. swaps, err := store.FetchLoopOutSwaps(ctx) if err != nil { return nil, err @@ -172,6 +152,34 @@ func sweepHtlc(ctx context.Context, req *looprpc.SweepHtlcRequest, "no matching swap HTLC found") } + // Prefer the stored swap destination for recovery sweeps and only + // derive a fresh wallet address when neither the request nor DB + // specifies one. + if sweepAddr == nil { + sweepAddr = targetSwap.Contract.DestAddr + } + if sweepAddr == nil { + sweepAddr, err = wallet.NextAddr( + ctx, lnwallet.DefaultAccountName, + walletrpc.AddressType_TAPROOT_PUBKEY, + false, + ) + if err != nil { + return nil, status.Errorf(codes.Internal, + "derive sweep address: %v", err) + } + infof("sweephtlc: generated new destination address: %v", + sweepAddr.EncodeAddress()) + } + + sweepPkScript, err := txscript.PayToAddrScript(sweepAddr) + if err != nil { + return nil, err + } + + infof("sweephtlc: start sweep for %v -> %v", req.Outpoint, + sweepAddr.EncodeAddress()) + infof("sweephtlc: matched swap %v at height hint %v", targetSwap.Hash, targetSwap.Contract.InitiationHeight) diff --git a/loopd/sweep_htlc_test.go b/loopd/sweep_htlc_test.go index 139eca823..7be675cc6 100644 --- a/loopd/sweep_htlc_test.go +++ b/loopd/sweep_htlc_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog/v2" "github.com/lightninglabs/loop/loopdb" @@ -17,7 +18,9 @@ import ( "github.com/lightninglabs/loop/utils" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lntypes" + "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chainfee" "github.com/stretchr/testify/require" ) @@ -45,7 +48,6 @@ var sweepHtlcTests = []struct { satPerVByte: 10, expectRegister: true, expectLogs: []string{ - "sweephtlc: generated new destination address: %v", "sweephtlc: start sweep for %v -> %v", "sweephtlc: matched swap %v at height hint %v", "sweephtlc: registering conf ntfn for %v hint=%v", @@ -63,7 +65,6 @@ var sweepHtlcTests = []struct { satPerVByte: 10, expectRegister: true, expectLogs: []string{ - "sweephtlc: generated new destination address: %v", "sweephtlc: start sweep for %v -> %v", "sweephtlc: matched swap %v at height hint %v", "sweephtlc: registering conf ntfn for %v hint=%v", @@ -83,7 +84,6 @@ var sweepHtlcTests = []struct { satPerVByte: 10, expectRegister: true, expectLogs: []string{ - "sweephtlc: generated new destination address: %v", "sweephtlc: start sweep for %v -> %v", "sweephtlc: matched swap %v at height hint %v", "sweephtlc: registering conf ntfn for %v hint=%v", @@ -105,7 +105,6 @@ var sweepHtlcTests = []struct { expectErrMsg: "fee exceeds", expectRegister: true, expectLogs: []string{ - "sweephtlc: generated new destination address: %v", "sweephtlc: start sweep for %v -> %v", "sweephtlc: matched swap %v at height hint %v", "sweephtlc: registering conf ntfn for %v hint=%v", @@ -124,7 +123,6 @@ var sweepHtlcTests = []struct { expectErrMsg: "fee too low for relay after clamp", expectRegister: true, expectLogs: []string{ - "sweephtlc: generated new destination address: %v", "sweephtlc: start sweep for %v -> %v", "sweephtlc: matched swap %v at height hint %v", "sweephtlc: registering conf ntfn for %v hint=%v", @@ -182,10 +180,7 @@ var sweepHtlcTests = []struct { expectErrMsg: "no matching swap", expectRegister: false, noSwap: true, - expectLogs: []string{ - "sweephtlc: generated new destination address: %v", - "sweephtlc: start sweep for %v -> %v", - }, + expectLogs: []string{}, }, { name: "invalid initiation height", @@ -197,7 +192,6 @@ var sweepHtlcTests = []struct { contract.InitiationHeight = 0 }, expectLogs: []string{ - "sweephtlc: generated new destination address: %v", "sweephtlc: start sweep for %v -> %v", "sweephtlc: matched swap %v at height hint %v", }, @@ -212,7 +206,6 @@ var sweepHtlcTests = []struct { reg.ErrChan <- errors.New("boom") }, expectLogs: []string{ - "sweephtlc: generated new destination address: %v", "sweephtlc: start sweep for %v -> %v", "sweephtlc: matched swap %v at height hint %v", "sweephtlc: registering conf ntfn for %v hint=%v", @@ -230,7 +223,6 @@ var sweepHtlcTests = []struct { txOut.PkScript = []byte{0x6a} }, expectLogs: []string{ - "sweephtlc: generated new destination address: %v", "sweephtlc: start sweep for %v -> %v", "sweephtlc: matched swap %v at height hint %v", "sweephtlc: registering conf ntfn for %v hint=%v", @@ -245,7 +237,6 @@ var sweepHtlcTests = []struct { expectErrMsg: "fee exceeds HTLC value", expectRegister: true, expectLogs: []string{ - "sweephtlc: generated new destination address: %v", "sweephtlc: start sweep for %v -> %v", "sweephtlc: matched swap %v at height hint %v", "sweephtlc: registering conf ntfn for %v hint=%v", @@ -264,6 +255,23 @@ var sweepHtlcTests = []struct { modifyReq: func(req *looprpc.SweepHtlcRequest) { req.Preimage = bytes.Repeat([]byte{9}, 32) }, + expectLogs: []string{ + "sweephtlc: start sweep for %v -> %v", + "sweephtlc: matched swap %v at height hint %v", + "sweephtlc: registering conf ntfn for %v hint=%v", + "sweephtlc: waiting for confirmation of %v", + "sweephtlc: funding confirmed at height %v", + "sweephtlc: swap hash validated for %v", + }, + }, + { + name: "fallback to generated destination", + amount: 100_000, + satPerVByte: 10, + expectRegister: true, + mutateSwap: func(contract *loopdb.LoopOutContract) { + contract.DestAddr = nil + }, expectLogs: []string{ "sweephtlc: generated new destination address: %v", "sweephtlc: start sweep for %v -> %v", @@ -272,6 +280,9 @@ var sweepHtlcTests = []struct { "sweephtlc: waiting for confirmation of %v", "sweephtlc: funding confirmed at height %v", "sweephtlc: swap hash validated for %v", + "sweephtlc: sweeping to %v with feerate %v sat/vbyte", + "sweephtlc: signing sweep spending %v", + "sweephtlc: witness assembled, tx size=%d vbytes", }, }, } @@ -336,7 +347,7 @@ func TestSweepHtlc(t *testing.T) { } destAddr, err := btcutil.NewAddressWitnessPubKeyHash( - make([]byte, 20), lnd.ChainParams, + bytes.Repeat([]byte{1}, 20), lnd.ChainParams, ) require.NoError(t, err) @@ -498,6 +509,33 @@ func TestSweepHtlc(t *testing.T) { ) require.NotEmpty(t, sweepTx.TxIn[0].Witness) + // Verify that the sweep uses the stored destination address and + // only falls back to wallet address generation when the swap + // record does not define one. + expectedAddr := loopOut.Contract.DestAddr + if req.DestAddress != "" { + expectedAddr, err = btcutil.DecodeAddress( + req.DestAddress, lnd.ChainParams, + ) + require.NoError(t, err) + } + if expectedAddr == nil { + expectedAddr, err = lnd.WalletKit.NextAddr( + ctx, lnwallet.DefaultAccountName, + walletrpc.AddressType_TAPROOT_PUBKEY, false, + ) + require.NoError(t, err) + } + + expectedPkScript, err := txscript.PayToAddrScript( + expectedAddr, + ) + require.NoError(t, err) + require.Len(t, sweepTx.TxOut, 1) + require.Equal( + t, expectedPkScript, sweepTx.TxOut[0].PkScript, + ) + if tc.publish { // For publish=true we should see a // publish (or a publish failure diff --git a/notifications/manager.go b/notifications/manager.go index bd1ac4170..3eda5d652 100644 --- a/notifications/manager.go +++ b/notifications/manager.go @@ -29,6 +29,10 @@ const ( // NotificationTypeUnfinishedSwap is the notification type for unfinished // swap notifications. NotificationTypeUnfinishedSwap + + // NotificationTypeHtlcConfirmed is the notification type for HTLC + // confirmed notifications. + NotificationTypeHtlcConfirmed ) const ( @@ -37,6 +41,10 @@ const ( // successful connection. defaultMinAliveConnTime = time.Minute + // htlcConfirmedSubscriberSendTimeout is how long we wait for a busy + // htlc-confirmed subscriber before dropping the notification. + htlcConfirmedSubscriberSendTimeout = 200 * time.Millisecond + // current_version is the current version of the notification listener. current_version = swapserverrpc.SubscribeNotificationsRequest_V1 ) @@ -164,6 +172,27 @@ func (m *Manager) SubscribeUnfinishedSwaps(ctx context.Context, return notifChan } +// SubscribeHtlcConfirmed subscribes to the HTLC confirmed notifications. +func (m *Manager) SubscribeHtlcConfirmed(ctx context.Context, +) <-chan *swapserverrpc.ServerHtlcConfirmedNotification { + + notifChan := make( + chan *swapserverrpc.ServerHtlcConfirmedNotification, 1, + ) + sub := subscriber{ + subCtx: ctx, + recvChan: notifChan, + } + + m.addSubscriber(NotificationTypeHtlcConfirmed, sub) + context.AfterFunc(ctx, func() { + m.removeSubscriber(NotificationTypeHtlcConfirmed, sub) + close(notifChan) + }) + + return notifChan +} + // Run starts the notification manager. It will keep on running until the // context is canceled. It will subscribe to notifications and forward them to // the subscribers. On a first successful connection to the server, it will @@ -333,9 +362,30 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc. recvChan <- unfinishedSwapNtfn } + case *swapserverrpc.SubscribeNotificationsResponse_HtlcConfirmed: + // We'll forward the htlc confirmed notification to all + // subscribers. We wait briefly for a slow subscriber and + // then drop to avoid stalling the notification pipeline. + htlcConfirmedNtfn := ntfn.GetHtlcConfirmed() + m.Lock() + defer m.Unlock() + + subscribers := m.subscribers[NotificationTypeHtlcConfirmed] + for _, sub := range subscribers { + recvChan := sub.recvChan.(chan *swapserverrpc. + ServerHtlcConfirmedNotification) + + select { + case recvChan <- htlcConfirmedNtfn: + + case <-time.After(htlcConfirmedSubscriberSendTimeout): + log.Infof("Dropping htlc confirmed " + + "notification, subscriber busy") + } + } + default: - log.Warnf("Received unknown notification type: %v", - ntfn) + log.Debugf("Received unknown notification type: %v", ntfn) } } diff --git a/notifications/manager_test.go b/notifications/manager_test.go index 9a06503e8..52a72d852 100644 --- a/notifications/manager_test.go +++ b/notifications/manager_test.go @@ -3,9 +3,11 @@ package notifications import ( "context" "errors" + "fmt" "io" "sync" "testing" + "testing/synctest" "time" "github.com/lightninglabs/aperture/l402" @@ -428,3 +430,178 @@ func TestManager_Backoff_Pending_Token(t *testing.T) { "Expected to backoff for at ~3 seconds due to pending token", ) } + +// TestManager_HtlcConfirmedNotification tests that the Manager correctly +// forwards htlc confirmed notifications to subscribers via the end-to-end +// subscription path. +func TestManager_HtlcConfirmedNotification(t *testing.T) { + t.Parallel() + + // Create a mock notification client. + recvChan := make( + chan *swapserverrpc.SubscribeNotificationsResponse, 1, + ) + errChan := make(chan error, 1) + mockStream := &mockSubscribeNotificationsClient{ + recvChan: recvChan, + recvErrChan: errChan, + } + mockClient := &mockNotificationsClient{ + mockStream: mockStream, + } + + mgr := NewManager(&Config{ + Client: mockClient, + CurrentToken: func() (*l402.Token, error) { + return &l402.Token{ + Preimage: lntypes.Preimage{1, 2, 3}, + }, nil + }, + }) + + // Subscribe to htlc confirmed notifications. + ctx := t.Context() + subCtx, subCancel := context.WithCancel(ctx) + defer subCancel() + subChan := mgr.SubscribeHtlcConfirmed(subCtx) + + // Run the manager. + go func() { + _ = mgr.Run(ctx) + }() + + // Wait for the manager to subscribe to the server stream. + require.Eventually(t, func() bool { + mockClient.Lock() + defer mockClient.Unlock() + + return mockClient.timesCalled > 0 + }, time.Second*5, 10*time.Millisecond) + + // Send an htlc confirmed notification via the mock stream. + testSwapHash := []byte("test_hash_32_bytes_long_padding!") + testNtfn := &swapserverrpc.SubscribeNotificationsResponse{ + Notification: &swapserverrpc.SubscribeNotificationsResponse_HtlcConfirmed{ // nolint: lll + HtlcConfirmed: &swapserverrpc.ServerHtlcConfirmedNotification{ // nolint: lll + SwapHash: testSwapHash, + HtlcOutpoint: "abc123:0", + HtlcAddress: "tb1qexamplehtlcaddress", + }, + }, + } + recvChan <- testNtfn + + // Verify the subscriber receives it. + select { + case received := <-subChan: + require.NotNil(t, received) + require.Equal(t, testSwapHash, received.SwapHash) + require.Equal(t, "abc123:0", received.HtlcOutpoint) + require.Equal(t, "tb1qexamplehtlcaddress", received.HtlcAddress) + + case <-time.After(5 * time.Second): + t.Fatal("did not receive htlc confirmed notification") + } + + // Cancel the subscription and verify the channel closes. + subCancel() + require.Eventually(t, func() bool { + select { + case _, ok := <-subChan: + return !ok + + default: + return false + } + }, time.Second*5, 10*time.Millisecond) +} + +// TestManager_HtlcConfirmedNonBlocking tests that a slow htlc confirmed +// subscriber does not block the notification pipeline. +func TestManager_HtlcConfirmedNonBlocking(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + recvChan := make( + chan *swapserverrpc.SubscribeNotificationsResponse, 1, + ) + errChan := make(chan error, 1) + mockStream := &mockSubscribeNotificationsClient{ + recvChan: recvChan, + recvErrChan: errChan, + } + mockClient := &mockNotificationsClient{ + mockStream: mockStream, + } + + mgr := NewManager(&Config{ + Client: mockClient, + CurrentToken: func() (*l402.Token, error) { + return &l402.Token{ + Preimage: lntypes.Preimage{1, 2, 3}, + }, nil + }, + }) + + // Subscribe but never read from the channel. + subCtx, subCancel := context.WithCancel(t.Context()) + defer subCancel() + _ = mgr.SubscribeHtlcConfirmed(subCtx) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + go func() { + _ = mgr.Run(ctx) + }() + + // Wait for the manager to connect and block on stream Recv(). + synctest.Wait() + mockClient.Lock() + require.Greater(t, mockClient.timesCalled, 0) + mockClient.Unlock() + + // Also send a reservation notification to prove the pipeline + // is not blocked — it should still be dispatched once the + // timed htlc fanout drop has elapsed. + resChan := mgr.SubscribeReservations(subCtx) + + // Send two notifications rapidly. The subscriber channel has + // buffer 1, so the first should be delivered and the second + // should be dropped after waiting for the timeout. + for i := range 2 { + ntfn := &swapserverrpc.SubscribeNotificationsResponse{ + Notification: &swapserverrpc.SubscribeNotificationsResponse_HtlcConfirmed{ // nolint: lll + HtlcConfirmed: &swapserverrpc.ServerHtlcConfirmedNotification{ // nolint: lll + SwapHash: fmt.Appendf(nil, "hash_%d_padding_to_32bytes!!", i), // nolint: lll + HtlcOutpoint: "abc:0", + HtlcAddress: "tb1qexamplehtlcaddress", + }, + }, + } + recvChan <- ntfn + } + + resNtfn := &swapserverrpc.SubscribeNotificationsResponse{ + Notification: &swapserverrpc.SubscribeNotificationsResponse_ReservationNotification{ // nolint: lll + ReservationNotification: &swapserverrpc.ServerReservationNotification{ // nolint: lll + ReservationId: []byte("res1"), + }, + }, + } + recvChan <- resNtfn + + select { + case res := <-resChan: + require.Equal(t, []byte("res1"), res.ReservationId) + + case <-time.After(5 * htlcConfirmedSubscriberSendTimeout): + t.Fatal("reservation notification blocked by slow " + + "htlc confirmed subscriber") + } + + // Stop the manager and ensure all goroutines in the test exit. + cancel() + close(recvChan) + synctest.Wait() + }) +} diff --git a/swapserverrpc/server.pb.go b/swapserverrpc/server.pb.go index 0ba23bde8..c29e1fb8b 100644 --- a/swapserverrpc/server.pb.go +++ b/swapserverrpc/server.pb.go @@ -2789,6 +2789,7 @@ type SubscribeNotificationsResponse struct { // *SubscribeNotificationsResponse_UnfinishedSwap // *SubscribeNotificationsResponse_StaticLoopInRiskAccepted // *SubscribeNotificationsResponse_StaticLoopInRiskRejected + // *SubscribeNotificationsResponse_HtlcConfirmed Notification isSubscribeNotificationsResponse_Notification `protobuf_oneof:"notification"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -2876,6 +2877,15 @@ func (x *SubscribeNotificationsResponse) GetStaticLoopInRiskRejected() *ServerSt return nil } +func (x *SubscribeNotificationsResponse) GetHtlcConfirmed() *ServerHtlcConfirmedNotification { + if x != nil { + if x, ok := x.Notification.(*SubscribeNotificationsResponse_HtlcConfirmed); ok { + return x.HtlcConfirmed + } + } + return nil +} + type isSubscribeNotificationsResponse_Notification interface { isSubscribeNotificationsResponse_Notification() } @@ -2900,6 +2910,10 @@ type SubscribeNotificationsResponse_StaticLoopInRiskRejected struct { StaticLoopInRiskRejected *ServerStaticLoopInRiskRejectedNotification `protobuf:"bytes,5,opt,name=static_loop_in_risk_rejected,json=staticLoopInRiskRejected,proto3,oneof"` } +type SubscribeNotificationsResponse_HtlcConfirmed struct { + HtlcConfirmed *ServerHtlcConfirmedNotification `protobuf:"bytes,6,opt,name=htlc_confirmed,json=htlcConfirmed,proto3,oneof"` +} + func (*SubscribeNotificationsResponse_ReservationNotification) isSubscribeNotificationsResponse_Notification() { } @@ -2915,6 +2929,9 @@ func (*SubscribeNotificationsResponse_StaticLoopInRiskAccepted) isSubscribeNotif func (*SubscribeNotificationsResponse_StaticLoopInRiskRejected) isSubscribeNotificationsResponse_Notification() { } +func (*SubscribeNotificationsResponse_HtlcConfirmed) isSubscribeNotificationsResponse_Notification() { +} + // ServerStaticLoopInSweepNotification is a request from the server to the // client to cosign a transaction that contains deposits from a finished static // loop ins. @@ -3143,6 +3160,72 @@ func (x *ServerUnfinishedSwapNotification) GetIsLoopIn() bool { return false } +// ServerHtlcConfirmedNotification is a notification from the server to the +// client that the on-chain HTLC for a loop out swap has been confirmed. +type ServerHtlcConfirmedNotification struct { + state protoimpl.MessageState `protogen:"open.v1"` + // The swap hash of the loop out swap. + SwapHash []byte `protobuf:"bytes,1,opt,name=swap_hash,json=swapHash,proto3" json:"swap_hash,omitempty"` + // htlc_outpoint is the outpoint of the confirmed HTLC in the format + // "txid:output_index". + HtlcOutpoint string `protobuf:"bytes,2,opt,name=htlc_outpoint,json=htlcOutpoint,proto3" json:"htlc_outpoint,omitempty"` + // htlc_address is the address of the confirmed HTLC output. + HtlcAddress string `protobuf:"bytes,3,opt,name=htlc_address,json=htlcAddress,proto3" json:"htlc_address,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ServerHtlcConfirmedNotification) Reset() { + *x = ServerHtlcConfirmedNotification{} + mi := &file_server_proto_msgTypes[40] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ServerHtlcConfirmedNotification) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerHtlcConfirmedNotification) ProtoMessage() {} + +func (x *ServerHtlcConfirmedNotification) ProtoReflect() protoreflect.Message { + mi := &file_server_proto_msgTypes[40] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerHtlcConfirmedNotification.ProtoReflect.Descriptor instead. +func (*ServerHtlcConfirmedNotification) Descriptor() ([]byte, []int) { + return file_server_proto_rawDescGZIP(), []int{40} +} + +func (x *ServerHtlcConfirmedNotification) GetSwapHash() []byte { + if x != nil { + return x.SwapHash + } + return nil +} + +func (x *ServerHtlcConfirmedNotification) GetHtlcOutpoint() string { + if x != nil { + return x.HtlcOutpoint + } + return "" +} + +func (x *ServerHtlcConfirmedNotification) GetHtlcAddress() string { + if x != nil { + return x.HtlcAddress + } + return "" +} + var File_server_proto protoreflect.FileDescriptor const file_server_proto_rawDesc = "" + @@ -3311,13 +3394,14 @@ const file_server_proto_rawDesc = "" + "\x0fListenerVersion\x12\n" + "\n" + "\x06LEGACY\x10\x00\x12\x06\n" + - "\x02V1\x10\x01\"\xba\x04\n" + + "\x02V1\x10\x01\"\x8d\x05\n" + "\x1eSubscribeNotificationsResponse\x12c\n" + "\x18reservation_notification\x18\x01 \x01(\v2&.looprpc.ServerReservationNotificationH\x00R\x17reservationNotification\x12_\n" + "\x14static_loop_in_sweep\x18\x02 \x01(\v2,.looprpc.ServerStaticLoopInSweepNotificationH\x00R\x11staticLoopInSweep\x12T\n" + "\x0funfinished_swap\x18\x03 \x01(\v2).looprpc.ServerUnfinishedSwapNotificationH\x00R\x0eunfinishedSwap\x12u\n" + "\x1cstatic_loop_in_risk_accepted\x18\x04 \x01(\v23.looprpc.ServerStaticLoopInRiskAcceptedNotificationH\x00R\x18staticLoopInRiskAccepted\x12u\n" + - "\x1cstatic_loop_in_risk_rejected\x18\x05 \x01(\v23.looprpc.ServerStaticLoopInRiskRejectedNotificationH\x00R\x18staticLoopInRiskRejectedB\x0e\n" + + "\x1cstatic_loop_in_risk_rejected\x18\x05 \x01(\v23.looprpc.ServerStaticLoopInRiskRejectedNotificationH\x00R\x18staticLoopInRiskRejected\x12Q\n" + + "\x0ehtlc_confirmed\x18\x06 \x01(\v2(.looprpc.ServerHtlcConfirmedNotificationH\x00R\rhtlcConfirmedB\x0e\n" + "\fnotification\"\xd2\x02\n" + "#ServerStaticLoopInSweepNotification\x12\"\n" + "\rsweep_tx_psbt\x18\x01 \x01(\fR\vsweepTxPsbt\x12\x1b\n" + @@ -3334,7 +3418,11 @@ const file_server_proto_rawDesc = "" + " ServerUnfinishedSwapNotification\x12\x1b\n" + "\tswap_hash\x18\x01 \x01(\fR\bswapHash\x12\x1c\n" + "\n" + - "is_loop_in\x18\x02 \x01(\bR\bisLoopIn*\xef\x01\n" + + "is_loop_in\x18\x02 \x01(\bR\bisLoopIn\"\x86\x01\n" + + "\x1fServerHtlcConfirmedNotification\x12\x1b\n" + + "\tswap_hash\x18\x01 \x01(\fR\bswapHash\x12#\n" + + "\rhtlc_outpoint\x18\x02 \x01(\tR\fhtlcOutpoint\x12!\n" + + "\fhtlc_address\x18\x03 \x01(\tR\vhtlcAddress*\xef\x01\n" + "\x0fProtocolVersion\x12\n" + "\n" + "\x06LEGACY\x10\x00\x12\x12\n" + @@ -3417,7 +3505,7 @@ func file_server_proto_rawDescGZIP() []byte { } var file_server_proto_enumTypes = make([]protoimpl.EnumInfo, 6) -var file_server_proto_msgTypes = make([]protoimpl.MessageInfo, 41) +var file_server_proto_msgTypes = make([]protoimpl.MessageInfo, 42) var file_server_proto_goTypes = []any{ (ProtocolVersion)(0), // 0: looprpc.ProtocolVersion (ServerSwapState)(0), // 1: looprpc.ServerSwapState @@ -3465,16 +3553,17 @@ var file_server_proto_goTypes = []any{ (*ServerStaticLoopInRiskAcceptedNotification)(nil), // 43: looprpc.ServerStaticLoopInRiskAcceptedNotification (*ServerStaticLoopInRiskRejectedNotification)(nil), // 44: looprpc.ServerStaticLoopInRiskRejectedNotification (*ServerUnfinishedSwapNotification)(nil), // 45: looprpc.ServerUnfinishedSwapNotification - nil, // 46: looprpc.ServerStaticLoopInSweepNotification.DepositToNoncesEntry - (*RouteHint)(nil), // 47: looprpc.RouteHint - (*ServerReservationNotification)(nil), // 48: looprpc.ServerReservationNotification + (*ServerHtlcConfirmedNotification)(nil), // 46: looprpc.ServerHtlcConfirmedNotification + nil, // 47: looprpc.ServerStaticLoopInSweepNotification.DepositToNoncesEntry + (*RouteHint)(nil), // 48: looprpc.RouteHint + (*ServerReservationNotification)(nil), // 49: looprpc.ServerReservationNotification } var file_server_proto_depIdxs = []int32{ 0, // 0: looprpc.ServerLoopOutRequest.protocol_version:type_name -> looprpc.ProtocolVersion 0, // 1: looprpc.ServerLoopOutQuoteRequest.protocol_version:type_name -> looprpc.ProtocolVersion 0, // 2: looprpc.ServerLoopOutTermsRequest.protocol_version:type_name -> looprpc.ProtocolVersion 0, // 3: looprpc.ServerLoopInRequest.protocol_version:type_name -> looprpc.ProtocolVersion - 47, // 4: looprpc.ServerLoopInQuoteRequest.route_hints:type_name -> looprpc.RouteHint + 48, // 4: looprpc.ServerLoopInQuoteRequest.route_hints:type_name -> looprpc.RouteHint 0, // 5: looprpc.ServerLoopInQuoteRequest.protocol_version:type_name -> looprpc.ProtocolVersion 0, // 6: looprpc.ServerLoopInTermsRequest.protocol_version:type_name -> looprpc.ProtocolVersion 0, // 7: looprpc.ServerLoopOutPushPreimageRequest.protocol_version:type_name -> looprpc.ProtocolVersion @@ -3487,7 +3576,7 @@ var file_server_proto_depIdxs = []int32{ 0, // 14: looprpc.CancelLoopOutSwapRequest.protocol_version:type_name -> looprpc.ProtocolVersion 23, // 15: looprpc.CancelLoopOutSwapRequest.route_cancel:type_name -> looprpc.RouteCancel 0, // 16: looprpc.ServerProbeRequest.protocol_version:type_name -> looprpc.ProtocolVersion - 47, // 17: looprpc.ServerProbeRequest.route_hints:type_name -> looprpc.RouteHint + 48, // 17: looprpc.ServerProbeRequest.route_hints:type_name -> looprpc.RouteHint 0, // 18: looprpc.RecommendRoutingPluginReq.protocol_version:type_name -> looprpc.ProtocolVersion 4, // 19: looprpc.RecommendRoutingPluginRes.plugin:type_name -> looprpc.RoutingPlugin 0, // 20: looprpc.ReportRoutingResultReq.protocol_version:type_name -> looprpc.ProtocolVersion @@ -3496,52 +3585,53 @@ var file_server_proto_depIdxs = []int32{ 34, // 23: looprpc.MuSig2SignSweepReq.prevout_info:type_name -> looprpc.PrevoutInfo 0, // 24: looprpc.ServerPushKeyReq.protocol_version:type_name -> looprpc.ProtocolVersion 5, // 25: looprpc.SubscribeNotificationsRequest.version:type_name -> looprpc.SubscribeNotificationsRequest.ListenerVersion - 48, // 26: looprpc.SubscribeNotificationsResponse.reservation_notification:type_name -> looprpc.ServerReservationNotification + 49, // 26: looprpc.SubscribeNotificationsResponse.reservation_notification:type_name -> looprpc.ServerReservationNotification 42, // 27: looprpc.SubscribeNotificationsResponse.static_loop_in_sweep:type_name -> looprpc.ServerStaticLoopInSweepNotification 45, // 28: looprpc.SubscribeNotificationsResponse.unfinished_swap:type_name -> looprpc.ServerUnfinishedSwapNotification 43, // 29: looprpc.SubscribeNotificationsResponse.static_loop_in_risk_accepted:type_name -> looprpc.ServerStaticLoopInRiskAcceptedNotification 44, // 30: looprpc.SubscribeNotificationsResponse.static_loop_in_risk_rejected:type_name -> looprpc.ServerStaticLoopInRiskRejectedNotification - 46, // 31: looprpc.ServerStaticLoopInSweepNotification.deposit_to_nonces:type_name -> looprpc.ServerStaticLoopInSweepNotification.DepositToNoncesEntry - 34, // 32: looprpc.ServerStaticLoopInSweepNotification.prevout_info:type_name -> looprpc.PrevoutInfo - 10, // 33: looprpc.SwapServer.LoopOutTerms:input_type -> looprpc.ServerLoopOutTermsRequest - 6, // 34: looprpc.SwapServer.NewLoopOutSwap:input_type -> looprpc.ServerLoopOutRequest - 18, // 35: looprpc.SwapServer.LoopOutPushPreimage:input_type -> looprpc.ServerLoopOutPushPreimageRequest - 8, // 36: looprpc.SwapServer.LoopOutQuote:input_type -> looprpc.ServerLoopOutQuoteRequest - 16, // 37: looprpc.SwapServer.LoopInTerms:input_type -> looprpc.ServerLoopInTermsRequest - 12, // 38: looprpc.SwapServer.NewLoopInSwap:input_type -> looprpc.ServerLoopInRequest - 14, // 39: looprpc.SwapServer.LoopInQuote:input_type -> looprpc.ServerLoopInQuoteRequest - 20, // 40: looprpc.SwapServer.SubscribeLoopOutUpdates:input_type -> looprpc.SubscribeUpdatesRequest - 20, // 41: looprpc.SwapServer.SubscribeLoopInUpdates:input_type -> looprpc.SubscribeUpdatesRequest - 25, // 42: looprpc.SwapServer.CancelLoopOutSwap:input_type -> looprpc.CancelLoopOutSwapRequest - 27, // 43: looprpc.SwapServer.Probe:input_type -> looprpc.ServerProbeRequest - 29, // 44: looprpc.SwapServer.RecommendRoutingPlugin:input_type -> looprpc.RecommendRoutingPluginReq - 31, // 45: looprpc.SwapServer.ReportRoutingResult:input_type -> looprpc.ReportRoutingResultReq - 33, // 46: looprpc.SwapServer.MuSig2SignSweep:input_type -> looprpc.MuSig2SignSweepReq - 36, // 47: looprpc.SwapServer.PushKey:input_type -> looprpc.ServerPushKeyReq - 38, // 48: looprpc.SwapServer.FetchL402:input_type -> looprpc.FetchL402Request - 40, // 49: looprpc.SwapServer.SubscribeNotifications:input_type -> looprpc.SubscribeNotificationsRequest - 11, // 50: looprpc.SwapServer.LoopOutTerms:output_type -> looprpc.ServerLoopOutTerms - 7, // 51: looprpc.SwapServer.NewLoopOutSwap:output_type -> looprpc.ServerLoopOutResponse - 19, // 52: looprpc.SwapServer.LoopOutPushPreimage:output_type -> looprpc.ServerLoopOutPushPreimageResponse - 9, // 53: looprpc.SwapServer.LoopOutQuote:output_type -> looprpc.ServerLoopOutQuote - 17, // 54: looprpc.SwapServer.LoopInTerms:output_type -> looprpc.ServerLoopInTerms - 13, // 55: looprpc.SwapServer.NewLoopInSwap:output_type -> looprpc.ServerLoopInResponse - 15, // 56: looprpc.SwapServer.LoopInQuote:output_type -> looprpc.ServerLoopInQuoteResponse - 21, // 57: looprpc.SwapServer.SubscribeLoopOutUpdates:output_type -> looprpc.SubscribeLoopOutUpdatesResponse - 22, // 58: looprpc.SwapServer.SubscribeLoopInUpdates:output_type -> looprpc.SubscribeLoopInUpdatesResponse - 26, // 59: looprpc.SwapServer.CancelLoopOutSwap:output_type -> looprpc.CancelLoopOutSwapResponse - 28, // 60: looprpc.SwapServer.Probe:output_type -> looprpc.ServerProbeResponse - 30, // 61: looprpc.SwapServer.RecommendRoutingPlugin:output_type -> looprpc.RecommendRoutingPluginRes - 32, // 62: looprpc.SwapServer.ReportRoutingResult:output_type -> looprpc.ReportRoutingResultRes - 35, // 63: looprpc.SwapServer.MuSig2SignSweep:output_type -> looprpc.MuSig2SignSweepRes - 37, // 64: looprpc.SwapServer.PushKey:output_type -> looprpc.ServerPushKeyRes - 39, // 65: looprpc.SwapServer.FetchL402:output_type -> looprpc.FetchL402Response - 41, // 66: looprpc.SwapServer.SubscribeNotifications:output_type -> looprpc.SubscribeNotificationsResponse - 50, // [50:67] is the sub-list for method output_type - 33, // [33:50] is the sub-list for method input_type - 33, // [33:33] is the sub-list for extension type_name - 33, // [33:33] is the sub-list for extension extendee - 0, // [0:33] is the sub-list for field type_name + 46, // 31: looprpc.SubscribeNotificationsResponse.htlc_confirmed:type_name -> looprpc.ServerHtlcConfirmedNotification + 47, // 32: looprpc.ServerStaticLoopInSweepNotification.deposit_to_nonces:type_name -> looprpc.ServerStaticLoopInSweepNotification.DepositToNoncesEntry + 34, // 33: looprpc.ServerStaticLoopInSweepNotification.prevout_info:type_name -> looprpc.PrevoutInfo + 10, // 34: looprpc.SwapServer.LoopOutTerms:input_type -> looprpc.ServerLoopOutTermsRequest + 6, // 35: looprpc.SwapServer.NewLoopOutSwap:input_type -> looprpc.ServerLoopOutRequest + 18, // 36: looprpc.SwapServer.LoopOutPushPreimage:input_type -> looprpc.ServerLoopOutPushPreimageRequest + 8, // 37: looprpc.SwapServer.LoopOutQuote:input_type -> looprpc.ServerLoopOutQuoteRequest + 16, // 38: looprpc.SwapServer.LoopInTerms:input_type -> looprpc.ServerLoopInTermsRequest + 12, // 39: looprpc.SwapServer.NewLoopInSwap:input_type -> looprpc.ServerLoopInRequest + 14, // 40: looprpc.SwapServer.LoopInQuote:input_type -> looprpc.ServerLoopInQuoteRequest + 20, // 41: looprpc.SwapServer.SubscribeLoopOutUpdates:input_type -> looprpc.SubscribeUpdatesRequest + 20, // 42: looprpc.SwapServer.SubscribeLoopInUpdates:input_type -> looprpc.SubscribeUpdatesRequest + 25, // 43: looprpc.SwapServer.CancelLoopOutSwap:input_type -> looprpc.CancelLoopOutSwapRequest + 27, // 44: looprpc.SwapServer.Probe:input_type -> looprpc.ServerProbeRequest + 29, // 45: looprpc.SwapServer.RecommendRoutingPlugin:input_type -> looprpc.RecommendRoutingPluginReq + 31, // 46: looprpc.SwapServer.ReportRoutingResult:input_type -> looprpc.ReportRoutingResultReq + 33, // 47: looprpc.SwapServer.MuSig2SignSweep:input_type -> looprpc.MuSig2SignSweepReq + 36, // 48: looprpc.SwapServer.PushKey:input_type -> looprpc.ServerPushKeyReq + 38, // 49: looprpc.SwapServer.FetchL402:input_type -> looprpc.FetchL402Request + 40, // 50: looprpc.SwapServer.SubscribeNotifications:input_type -> looprpc.SubscribeNotificationsRequest + 11, // 51: looprpc.SwapServer.LoopOutTerms:output_type -> looprpc.ServerLoopOutTerms + 7, // 52: looprpc.SwapServer.NewLoopOutSwap:output_type -> looprpc.ServerLoopOutResponse + 19, // 53: looprpc.SwapServer.LoopOutPushPreimage:output_type -> looprpc.ServerLoopOutPushPreimageResponse + 9, // 54: looprpc.SwapServer.LoopOutQuote:output_type -> looprpc.ServerLoopOutQuote + 17, // 55: looprpc.SwapServer.LoopInTerms:output_type -> looprpc.ServerLoopInTerms + 13, // 56: looprpc.SwapServer.NewLoopInSwap:output_type -> looprpc.ServerLoopInResponse + 15, // 57: looprpc.SwapServer.LoopInQuote:output_type -> looprpc.ServerLoopInQuoteResponse + 21, // 58: looprpc.SwapServer.SubscribeLoopOutUpdates:output_type -> looprpc.SubscribeLoopOutUpdatesResponse + 22, // 59: looprpc.SwapServer.SubscribeLoopInUpdates:output_type -> looprpc.SubscribeLoopInUpdatesResponse + 26, // 60: looprpc.SwapServer.CancelLoopOutSwap:output_type -> looprpc.CancelLoopOutSwapResponse + 28, // 61: looprpc.SwapServer.Probe:output_type -> looprpc.ServerProbeResponse + 30, // 62: looprpc.SwapServer.RecommendRoutingPlugin:output_type -> looprpc.RecommendRoutingPluginRes + 32, // 63: looprpc.SwapServer.ReportRoutingResult:output_type -> looprpc.ReportRoutingResultRes + 35, // 64: looprpc.SwapServer.MuSig2SignSweep:output_type -> looprpc.MuSig2SignSweepRes + 37, // 65: looprpc.SwapServer.PushKey:output_type -> looprpc.ServerPushKeyRes + 39, // 66: looprpc.SwapServer.FetchL402:output_type -> looprpc.FetchL402Response + 41, // 67: looprpc.SwapServer.SubscribeNotifications:output_type -> looprpc.SubscribeNotificationsResponse + 51, // [51:68] is the sub-list for method output_type + 34, // [34:51] is the sub-list for method input_type + 34, // [34:34] is the sub-list for extension type_name + 34, // [34:34] is the sub-list for extension extendee + 0, // [0:34] is the sub-list for field type_name } func init() { file_server_proto_init() } @@ -3560,6 +3650,7 @@ func file_server_proto_init() { (*SubscribeNotificationsResponse_UnfinishedSwap)(nil), (*SubscribeNotificationsResponse_StaticLoopInRiskAccepted)(nil), (*SubscribeNotificationsResponse_StaticLoopInRiskRejected)(nil), + (*SubscribeNotificationsResponse_HtlcConfirmed)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -3567,7 +3658,7 @@ func file_server_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_server_proto_rawDesc), len(file_server_proto_rawDesc)), NumEnums: 6, - NumMessages: 41, + NumMessages: 42, NumExtensions: 0, NumServices: 1, }, diff --git a/swapserverrpc/server.proto b/swapserverrpc/server.proto index 0957fafaa..62e9fea16 100644 --- a/swapserverrpc/server.proto +++ b/swapserverrpc/server.proto @@ -683,6 +683,7 @@ message SubscribeNotificationsResponse { static_loop_in_risk_accepted = 4; ServerStaticLoopInRiskRejectedNotification static_loop_in_risk_rejected = 5; + ServerHtlcConfirmedNotification htlc_confirmed = 6; } } @@ -729,3 +730,17 @@ message ServerUnfinishedSwapNotification { // Whether the swap is a loop in or loop out. bool is_loop_in = 2; } + +// ServerHtlcConfirmedNotification is a notification from the server to the +// client that the on-chain HTLC for a loop out swap has been confirmed. +message ServerHtlcConfirmedNotification { + // The swap hash of the loop out swap. + bytes swap_hash = 1; + + // htlc_outpoint is the outpoint of the confirmed HTLC in the format + // "txid:output_index". + string htlc_outpoint = 2; + + // htlc_address is the address of the confirmed HTLC output. + string htlc_address = 3; +}