From ba064cc631cd03c7423495e31b5415965fdcbd64 Mon Sep 17 00:00:00 2001 From: Jack Zampolin Date: Mon, 13 Sep 2021 17:03:20 -0700 Subject: [PATCH] Merge PR #34: Fix deadlock issue and clean up codec * Push work * fix minor UnrelayedSequences issues * Push additional retries * fix test compilation issues --- helpers/keys.go | 3 +- relayer/chain.go | 2 +- relayer/contextual.go | 178 ------------------------------------- relayer/encoding.go | 4 +- relayer/naive-strategy.go | 76 ++++++++-------- test/relayer_akash_test.go | 12 +-- test/relayer_gaia_test.go | 14 +-- test/test_setup.go | 2 +- 8 files changed, 56 insertions(+), 235 deletions(-) delete mode 100644 relayer/contextual.go diff --git a/helpers/keys.go b/helpers/keys.go index 6e8eed60c24..83875d93f89 100644 --- a/helpers/keys.go +++ b/helpers/keys.go @@ -30,8 +30,9 @@ func KeyAddOrRestore(chain *relayer.Chain, keyName string, coinType uint32, mnem return KeyOutput{}, err } - defer chain.UseSDKContext()() + done := chain.UseSDKContext() ko := KeyOutput{Mnemonic: mnemonicStr, Address: info.GetAddress().String()} + done() return ko, nil } diff --git a/relayer/chain.go b/relayer/chain.go index 6d7d14be111..d829da71894 100644 --- a/relayer/chain.go +++ b/relayer/chain.go @@ -452,7 +452,7 @@ func (c *Chain) CLIContext(height int64) sdkCtx.Context { } return sdkCtx.Context{}. WithChainID(c.ChainID). - WithCodec(newContextualStdCodec(c.Encoding.Marshaler, c.UseSDKContext)). + WithCodec(c.Encoding.Marshaler). WithInterfaceRegistry(c.Encoding.InterfaceRegistry). WithTxConfig(c.Encoding.TxConfig). WithLegacyAmino(c.Encoding.Amino). diff --git a/relayer/contextual.go b/relayer/contextual.go deleted file mode 100644 index 22a25bc992f..00000000000 --- a/relayer/contextual.go +++ /dev/null @@ -1,178 +0,0 @@ -package relayer - -import ( - "github.com/cosmos/cosmos-sdk/codec" - "github.com/gogo/protobuf/proto" -) - -type contextualStdCodec struct { - codec.Codec - useContext func() func() -} - -var _ codec.Codec = &contextualStdCodec{} - -// newContextualStdCodec creates a codec that sets and resets context -func newContextualStdCodec(cdc codec.Codec, useContext func() func()) *contextualStdCodec { - return &contextualStdCodec{ - Codec: cdc, - useContext: useContext, - } -} - -// MarshalJSON marshals with the original codec and new context -func (cdc *contextualStdCodec) MarshalJSON(ptr proto.Message) ([]byte, error) { - done := cdc.useContext() - defer done() - - return cdc.Codec.MarshalJSON(ptr) -} - -func (cdc *contextualStdCodec) MustMarshalJSON(ptr proto.Message) []byte { - out, err := cdc.MarshalJSON(ptr) - if err != nil { - panic(err) - } - return out -} - -// UnmarshalJSON unmarshals with the original codec and new context -func (cdc *contextualStdCodec) UnmarshalJSON(bz []byte, ptr proto.Message) error { - done := cdc.useContext() - defer done() - - return cdc.Codec.UnmarshalJSON(bz, ptr) -} - -func (cdc *contextualStdCodec) MustUnmarshalJSON(bz []byte, ptr proto.Message) { - if err := cdc.UnmarshalJSON(bz, ptr); err != nil { - panic(err) - } -} - -func (cdc *contextualStdCodec) Marshal(ptr codec.ProtoMarshaler) ([]byte, error) { - done := cdc.useContext() - defer done() - - return cdc.Codec.Marshal(ptr) -} - -func (cdc *contextualStdCodec) MustMarshal(ptr codec.ProtoMarshaler) []byte { - out, err := cdc.Marshal(ptr) - if err != nil { - panic(err) - } - return out -} - -func (cdc *contextualStdCodec) Unmarshal(bz []byte, ptr codec.ProtoMarshaler) error { - done := cdc.useContext() - defer done() - - return cdc.Codec.Unmarshal(bz, ptr) -} - -func (cdc *contextualStdCodec) MustUnmarshal(bz []byte, ptr codec.ProtoMarshaler) { - if err := cdc.Unmarshal(bz, ptr); err != nil { - panic(err) - } -} - -// // newContextualCodec creates a codec that sets and resets context -// func newContextualAminoCodec(cdc *codec.LegacyAmino, useContext func() func()) *contextualAminoCodec { -// return &contextualAminoCodec{ -// LegacyAmino: cdc, -// useContext: useContext, -// } -// } - -// // MarshalJSON marshals with the original codec and new context -// func (cdc *contextualAminoCodec) MarshalJSON(ptr proto.Message) ([]byte, error) { -// done := cdc.useContext() -// defer done() - -// return cdc.LegacyAmino.MarshalJSON(ptr) -// } - -// // MustMarshalJSON marshals with the original codec and new context -// func (cdc *contextualAminoCodec) MustMarshalJSON(ptr proto.Message) []byte { -// out, err := cdc.MarshalJSON(ptr) -// if err != nil { -// panic(err) -// } -// return out -// } - -// // UnmarshalJSON unmarshals with the original codec and new context -// func (cdc *contextualAminoCodec) UnmarshalJSON(bz []byte, ptr proto.Message) error { -// done := cdc.useContext() -// defer done() - -// return cdc.LegacyAmino.UnmarshalJSON(bz, ptr) -// } - -// // MustUnmarshalJSON unmarshals with the original codec and new context -// func (cdc *contextualAminoCodec) MustUnmarshalJSON(bz []byte, ptr proto.Message) { -// if err := cdc.UnmarshalJSON(bz, ptr); err != nil { -// panic(err) -// } -// return -// } - -// func (cdc *contextualAminoCodec) Marshal(ptr codec.ProtoMarshaler) ([]byte, error) { -// done := cdc.useContext() -// defer done() - -// return cdc.LegacyAmino.Marshal(ptr) -// } - -// func (cdc *contextualAminoCodec) MustMarshal(ptr codec.ProtoMarshaler) []byte { -// out, err := cdc.Marshal(ptr) -// if err != nil { -// panic(err) -// } -// return out -// } - -// func (cdc *contextualAminoCodec) Unmarshal(bz []byte, ptr codec.ProtoMarshaler) error { -// done := cdc.useContext() -// defer done() - -// return cdc.LegacyAmino.Unmarshal(bz, ptr) -// } - -// func (cdc *contextualAminoCodec) MustUnmarshal(bz []byte, ptr codec.ProtoMarshaler) { -// if err := cdc.Unmarshal(bz, ptr); err != nil { -// panic(err) -// } -// return -// } - -// func (cdc *contextualAminoCodec) MarshalBinaryLengthPrefixed(ptr codec.ProtoMarshaler) ([]byte, error) { -// done := cdc.useContext() -// defer done() - -// return cdc.LegacyAmino.MarshalBinaryLengthPrefixed(ptr) -// } - -// func (cdc *contextualAminoCodec) MustMarshalBinaryLengthPrefixed(ptr codec.ProtoMarshaler) []byte { -// out, err := cdc.MarshalBinaryLengthPrefixed(ptr) -// if err != nil { -// panic(err) -// } -// return out -// } - -// func (cdc *contextualAminoCodec) UnmarshalBinaryLengthPrefixed(bz []byte, ptr codec.ProtoMarshaler) error { -// done := cdc.useContext() -// defer done() - -// return cdc.LegacyAmino.UnmarshalBinaryLengthPrefixed(bz, ptr) -// } - -// func (cdc *contextualAminoCodec) MustUnmarshalBinaryLengthPrefixed(bz []byte, ptr codec.ProtoMarshaler) { -// if err := cdc.UnmarshalBinaryLengthPrefixed(bz, ptr); err != nil { -// panic(err) -// } -// return -// } diff --git a/relayer/encoding.go b/relayer/encoding.go index fccddd8f00f..67856f6e86e 100644 --- a/relayer/encoding.go +++ b/relayer/encoding.go @@ -11,6 +11,7 @@ import ( "github.com/cosmos/cosmos-sdk/simapp" "github.com/cosmos/cosmos-sdk/simapp/params" "github.com/cosmos/cosmos-sdk/std" + signingtypes "github.com/cosmos/cosmos-sdk/types/tx/signing" "github.com/cosmos/cosmos-sdk/x/auth/tx" transfer "github.com/cosmos/ibc-go/modules/apps/transfer" ibc "github.com/cosmos/ibc-go/modules/core" @@ -23,7 +24,7 @@ func (c *Chain) MakeEncodingConfig() params.EncodingConfig { amino := codec.NewLegacyAmino() interfaceRegistry := types.NewInterfaceRegistry() marshaler := c.NewProtoCodec(interfaceRegistry, c.AccountPrefix) - txCfg := tx.NewTxConfig(marshaler, tx.DefaultSignModes) + txCfg := tx.NewTxConfig(marshaler, []signingtypes.SignMode{signingtypes.SignMode_SIGN_MODE_DIRECT}) encodingConfig := params.EncodingConfig{ InterfaceRegistry: interfaceRegistry, @@ -151,7 +152,6 @@ func (pc *ProtoCodec) MarshalJSON(o proto.Message) ([]byte, error) { if !ok { return nil, fmt.Errorf("cannot protobuf JSON encode unsupported type: %T", o) } - bz, err := codec.ProtoMarshalJSON(m, pc.interfaceRegistry) if err != nil { return []byte{}, err diff --git a/relayer/naive-strategy.go b/relayer/naive-strategy.go index 65b5c668a8d..02c3e12beea 100644 --- a/relayer/naive-strategy.go +++ b/relayer/naive-strategy.go @@ -60,11 +60,7 @@ func (nrs *NaiveStrategy) UnrelayedSequences(src, dst *Chain) (*RelaySequences, rs = &RelaySequences{Src: []uint64{}, Dst: []uint64{}} ) - srch, err := src.QueryLatestHeight() - if err != nil { - return nil, err - } - dsth, err := dst.QueryLatestHeight() + srch, dsth, err := QueryLatestHeights(src, dst) if err != nil { return nil, err } @@ -83,10 +79,7 @@ func (nrs *NaiveStrategy) UnrelayedSequences(src, dst *Chain) (*RelaySequences, return nil } }, rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) { - if src.debug { - src.Log(fmt.Sprintf("- [%s]@{%d} - try(%d/%d) query packet commitments: %s", src.ChainID, - srch, n+1, rtyAttNum, err)) - } + srch, _ = src.QueryLatestHeight() })); err != nil { return err } @@ -109,10 +102,7 @@ func (nrs *NaiveStrategy) UnrelayedSequences(src, dst *Chain) (*RelaySequences, return nil } }, rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) { - if dst.debug { - dst.Log(fmt.Sprintf("- [%s]@{%d} - try(%d/%d) query packet commitments: %s", - dst.ChainID, dsth, n+1, rtyAttNum, err)) - } + dsth, _ = dst.QueryLatestHeight() })); err != nil { return err } @@ -128,24 +118,22 @@ func (nrs *NaiveStrategy) UnrelayedSequences(src, dst *Chain) (*RelaySequences, eg.Go(func() error { // Query all packets sent by src that have been received by dst - rs.Src, err = dst.QueryUnreceivedPackets(uint64(dsth), srcPacketSeq) - if src.debug { - if out, err := json.Marshal(rs.Src); err != nil { - src.logUnreceivedPackets(dst, "commitments", string(out)) - } - } - return err + return retry.Do(func() error { + rs.Src, err = dst.QueryUnreceivedPackets(uint64(dsth), srcPacketSeq) + return err + }, rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) { + dsth, _ = dst.QueryLatestHeight() + })) }) eg.Go(func() error { // Query all packets sent by dst that have been received by src - rs.Dst, err = src.QueryUnreceivedPackets(uint64(srch), dstPacketSeq) - if dst.debug { - if out, err := json.Marshal(rs.Dst); err != nil { - dst.logUnreceivedPackets(src, "commitments", string(out)) - } - } - return err + return retry.Do(func() error { + rs.Dst, err = src.QueryUnreceivedPackets(uint64(srch), dstPacketSeq) + return err + }, rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) { + dsth, _ = dst.QueryLatestHeight() + })) }) if err := eg.Wait(); err != nil { @@ -534,38 +522,48 @@ func (nrs *NaiveStrategy) RelayPackets(src, dst *Chain, sp *RelaySequences) erro // add messages for sequences on src for _, seq := range sp.Src { // Query src for the sequence number to get type of packet - recvMsgs, timeoutMsgs, err := relayPacketFromSequence(src, dst, uint64(srch), uint64(dsth), seq) - if err != nil { + var recvMsg, timeoutMsg sdk.Msg + if err = retry.Do(func() error { + recvMsg, timeoutMsg, err = relayPacketFromSequence(src, dst, uint64(srch), uint64(dsth), seq) + return err + }, rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) { + srch, dsth, _ = QueryLatestHeights(src, dst) + })); err != nil { return err } // depending on the type of message to be relayed, we need to // send to different chains - if recvMsgs != nil { - msgs.Dst = append(msgs.Dst, recvMsgs) + if recvMsg != nil { + msgs.Dst = append(msgs.Dst, recvMsg) } - if timeoutMsgs != nil { - msgs.Src = append(msgs.Src, timeoutMsgs) + if timeoutMsg != nil { + msgs.Src = append(msgs.Src, timeoutMsg) } } // add messages for sequences on dst for _, seq := range sp.Dst { // Query dst for the sequence number to get type of packet - recvMsgs, timeoutMsgs, err := relayPacketFromSequence(dst, src, uint64(dsth), uint64(srch), seq) - if err != nil { + var recvMsg, timeoutMsg sdk.Msg + if err = retry.Do(func() error { + recvMsg, timeoutMsg, err = relayPacketFromSequence(dst, src, uint64(dsth), uint64(srch), seq) + return nil + }, rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) { + srch, dsth, _ = QueryLatestHeights(src, dst) + })); err != nil { return err } // depending on the type of message to be relayed, we need to // send to different chains - if recvMsgs != nil { - msgs.Src = append(msgs.Src, recvMsgs) + if recvMsg != nil { + msgs.Src = append(msgs.Src, recvMsg) } - if timeoutMsgs != nil { - msgs.Dst = append(msgs.Dst, timeoutMsgs) + if timeoutMsg != nil { + msgs.Dst = append(msgs.Dst, timeoutMsg) } } diff --git a/test/relayer_akash_test.go b/test/relayer_akash_test.go index 048f4ba3d05..e20cf35e081 100644 --- a/test/relayer_akash_test.go +++ b/test/relayer_akash_test.go @@ -49,12 +49,12 @@ func TestAkashToGaiaStreamingRelayer(t *testing.T) { testChannelPair(t, src, dst) // send a couple of transfers to the queue on src - require.NoError(t, src.SendTransferMsg(dst, testCoin, dst.MustGetAddress().String(), 0, 0)) - require.NoError(t, src.SendTransferMsg(dst, testCoin, dst.MustGetAddress().String(), 0, 0)) + require.NoError(t, src.SendTransferMsg(dst, testCoin, dst.MustGetAddress(), 0, 0)) + require.NoError(t, src.SendTransferMsg(dst, testCoin, dst.MustGetAddress(), 0, 0)) // send a couple of transfers to the queue on dst - require.NoError(t, dst.SendTransferMsg(src, testCoin, src.MustGetAddress().String(), 0, 0)) - require.NoError(t, dst.SendTransferMsg(src, testCoin, src.MustGetAddress().String(), 0, 0)) + require.NoError(t, dst.SendTransferMsg(src, testCoin, src.MustGetAddress(), 0, 0)) + require.NoError(t, dst.SendTransferMsg(src, testCoin, src.MustGetAddress(), 0, 0)) // Wait for message inclusion in both chains require.NoError(t, dst.WaitForNBlocks(1)) @@ -68,8 +68,8 @@ func TestAkashToGaiaStreamingRelayer(t *testing.T) { require.NoError(t, dst.WaitForNBlocks(1)) // send those tokens from dst back to dst and src back to src - require.NoError(t, src.SendTransferMsg(dst, twoTestCoin, dst.MustGetAddress().String(), 0, 0)) - require.NoError(t, dst.SendTransferMsg(src, twoTestCoin, src.MustGetAddress().String(), 0, 0)) + require.NoError(t, src.SendTransferMsg(dst, twoTestCoin, dst.MustGetAddress(), 0, 0)) + require.NoError(t, dst.SendTransferMsg(src, twoTestCoin, src.MustGetAddress(), 0, 0)) // wait for packet processing require.NoError(t, dst.WaitForNBlocks(6)) diff --git a/test/relayer_gaia_test.go b/test/relayer_gaia_test.go index 9616036a2ce..3424f68c523 100644 --- a/test/relayer_gaia_test.go +++ b/test/relayer_gaia_test.go @@ -59,12 +59,12 @@ func TestGaiaToGaiaStreamingRelayer(t *testing.T) { testChannelPair(t, src, dst) // send a couple of transfers to the queue on src - require.NoError(t, src.SendTransferMsg(dst, testCoin, dst.MustGetAddress().String(), 0, 0)) - require.NoError(t, src.SendTransferMsg(dst, testCoin, dst.MustGetAddress().String(), 0, 0)) + require.NoError(t, src.SendTransferMsg(dst, testCoin, dst.MustGetAddress(), 0, 0)) + require.NoError(t, src.SendTransferMsg(dst, testCoin, dst.MustGetAddress(), 0, 0)) // send a couple of transfers to the queue on dst - require.NoError(t, dst.SendTransferMsg(src, testCoin, src.MustGetAddress().String(), 0, 0)) - require.NoError(t, dst.SendTransferMsg(src, testCoin, src.MustGetAddress().String(), 0, 0)) + require.NoError(t, dst.SendTransferMsg(src, testCoin, src.MustGetAddress(), 0, 0)) + require.NoError(t, dst.SendTransferMsg(src, testCoin, src.MustGetAddress(), 0, 0)) // Wait for message inclusion in both chains require.NoError(t, dst.WaitForNBlocks(1)) @@ -78,8 +78,8 @@ func TestGaiaToGaiaStreamingRelayer(t *testing.T) { require.NoError(t, dst.WaitForNBlocks(1)) // send those tokens from dst back to dst and src back to src - require.NoError(t, src.SendTransferMsg(dst, twoTestCoin, dst.MustGetAddress().String(), 0, 0)) - require.NoError(t, dst.SendTransferMsg(src, twoTestCoin, src.MustGetAddress().String(), 0, 0)) + require.NoError(t, src.SendTransferMsg(dst, twoTestCoin, dst.MustGetAddress(), 0, 0)) + require.NoError(t, dst.SendTransferMsg(src, twoTestCoin, src.MustGetAddress(), 0, 0)) // wait for packet processing require.NoError(t, dst.WaitForNBlocks(6)) @@ -234,7 +234,7 @@ func TestGaiaMisbehaviourMonitoring(t *testing.T) { header.GetTime().Add(time.Minute), valSet, valSet, signers, header) // update client with duplicate header - updateMsg, err := clienttypes.NewMsgUpdateClient(src.PathEnd.ClientID, newHeader, src.MustGetAddress().String()) + updateMsg, err := clienttypes.NewMsgUpdateClient(src.PathEnd.ClientID, newHeader, src.MustGetAddress()) require.NoError(t, err) res, success, err := src.SendMsg(updateMsg) diff --git a/test/test_setup.go b/test/test_setup.go index 1df6547e9eb..18826f2677a 100644 --- a/test/test_setup.go +++ b/test/test_setup.go @@ -144,7 +144,7 @@ func spinUpTestContainer(t *testing.T, rchan chan<- *dockertest.Resource, ExposedPorts: []string{tc.t.rpcPort, c.GetRPCPort()}, Cmd: []string{ c.ChainID, - c.MustGetAddress().String(), + c.MustGetAddress(), getPrivValFileName(tc.seed), }, PortBindings: map[dc.Port][]dc.PortBinding{