Skip to content

Commit

Permalink
Merge PR #34: Fix deadlock issue and clean up codec
Browse files Browse the repository at this point in the history
* Push work

* fix minor UnrelayedSequences issues

* Push additional retries

* fix test compilation issues
  • Loading branch information
jackzampolin committed Oct 25, 2021
1 parent 92b047c commit ba064cc
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 235 deletions.
3 changes: 2 additions & 1 deletion helpers/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ func KeyAddOrRestore(chain *relayer.Chain, keyName string, coinType uint32, mnem
return KeyOutput{}, err
}

defer chain.UseSDKContext()()
done := chain.UseSDKContext()
ko := KeyOutput{Mnemonic: mnemonicStr, Address: info.GetAddress().String()}
done()

return ko, nil
}
2 changes: 1 addition & 1 deletion relayer/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (c *Chain) CLIContext(height int64) sdkCtx.Context {
}
return sdkCtx.Context{}.
WithChainID(c.ChainID).
WithCodec(newContextualStdCodec(c.Encoding.Marshaler, c.UseSDKContext)).
WithCodec(c.Encoding.Marshaler).
WithInterfaceRegistry(c.Encoding.InterfaceRegistry).
WithTxConfig(c.Encoding.TxConfig).
WithLegacyAmino(c.Encoding.Amino).
Expand Down
178 changes: 0 additions & 178 deletions relayer/contextual.go

This file was deleted.

4 changes: 2 additions & 2 deletions relayer/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cosmos/cosmos-sdk/simapp"
"github.com/cosmos/cosmos-sdk/simapp/params"
"github.com/cosmos/cosmos-sdk/std"
signingtypes "github.com/cosmos/cosmos-sdk/types/tx/signing"
"github.com/cosmos/cosmos-sdk/x/auth/tx"
transfer "github.com/cosmos/ibc-go/modules/apps/transfer"
ibc "github.com/cosmos/ibc-go/modules/core"
Expand All @@ -23,7 +24,7 @@ func (c *Chain) MakeEncodingConfig() params.EncodingConfig {
amino := codec.NewLegacyAmino()
interfaceRegistry := types.NewInterfaceRegistry()
marshaler := c.NewProtoCodec(interfaceRegistry, c.AccountPrefix)
txCfg := tx.NewTxConfig(marshaler, tx.DefaultSignModes)
txCfg := tx.NewTxConfig(marshaler, []signingtypes.SignMode{signingtypes.SignMode_SIGN_MODE_DIRECT})

encodingConfig := params.EncodingConfig{
InterfaceRegistry: interfaceRegistry,
Expand Down Expand Up @@ -151,7 +152,6 @@ func (pc *ProtoCodec) MarshalJSON(o proto.Message) ([]byte, error) {
if !ok {
return nil, fmt.Errorf("cannot protobuf JSON encode unsupported type: %T", o)
}

bz, err := codec.ProtoMarshalJSON(m, pc.interfaceRegistry)
if err != nil {
return []byte{}, err
Expand Down
76 changes: 37 additions & 39 deletions relayer/naive-strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,7 @@ func (nrs *NaiveStrategy) UnrelayedSequences(src, dst *Chain) (*RelaySequences,
rs = &RelaySequences{Src: []uint64{}, Dst: []uint64{}}
)

srch, err := src.QueryLatestHeight()
if err != nil {
return nil, err
}
dsth, err := dst.QueryLatestHeight()
srch, dsth, err := QueryLatestHeights(src, dst)
if err != nil {
return nil, err
}
Expand All @@ -83,10 +79,7 @@ func (nrs *NaiveStrategy) UnrelayedSequences(src, dst *Chain) (*RelaySequences,
return nil
}
}, rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) {
if src.debug {
src.Log(fmt.Sprintf("- [%s]@{%d} - try(%d/%d) query packet commitments: %s", src.ChainID,
srch, n+1, rtyAttNum, err))
}
srch, _ = src.QueryLatestHeight()
})); err != nil {
return err
}
Expand All @@ -109,10 +102,7 @@ func (nrs *NaiveStrategy) UnrelayedSequences(src, dst *Chain) (*RelaySequences,
return nil
}
}, rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) {
if dst.debug {
dst.Log(fmt.Sprintf("- [%s]@{%d} - try(%d/%d) query packet commitments: %s",
dst.ChainID, dsth, n+1, rtyAttNum, err))
}
dsth, _ = dst.QueryLatestHeight()
})); err != nil {
return err
}
Expand All @@ -128,24 +118,22 @@ func (nrs *NaiveStrategy) UnrelayedSequences(src, dst *Chain) (*RelaySequences,

eg.Go(func() error {
// Query all packets sent by src that have been received by dst
rs.Src, err = dst.QueryUnreceivedPackets(uint64(dsth), srcPacketSeq)
if src.debug {
if out, err := json.Marshal(rs.Src); err != nil {
src.logUnreceivedPackets(dst, "commitments", string(out))
}
}
return err
return retry.Do(func() error {
rs.Src, err = dst.QueryUnreceivedPackets(uint64(dsth), srcPacketSeq)
return err
}, rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) {
dsth, _ = dst.QueryLatestHeight()
}))
})

eg.Go(func() error {
// Query all packets sent by dst that have been received by src
rs.Dst, err = src.QueryUnreceivedPackets(uint64(srch), dstPacketSeq)
if dst.debug {
if out, err := json.Marshal(rs.Dst); err != nil {
dst.logUnreceivedPackets(src, "commitments", string(out))
}
}
return err
return retry.Do(func() error {
rs.Dst, err = src.QueryUnreceivedPackets(uint64(srch), dstPacketSeq)
return err
}, rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) {
dsth, _ = dst.QueryLatestHeight()
}))
})

if err := eg.Wait(); err != nil {
Expand Down Expand Up @@ -534,38 +522,48 @@ func (nrs *NaiveStrategy) RelayPackets(src, dst *Chain, sp *RelaySequences) erro
// add messages for sequences on src
for _, seq := range sp.Src {
// Query src for the sequence number to get type of packet
recvMsgs, timeoutMsgs, err := relayPacketFromSequence(src, dst, uint64(srch), uint64(dsth), seq)
if err != nil {
var recvMsg, timeoutMsg sdk.Msg
if err = retry.Do(func() error {
recvMsg, timeoutMsg, err = relayPacketFromSequence(src, dst, uint64(srch), uint64(dsth), seq)
return err
}, rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) {
srch, dsth, _ = QueryLatestHeights(src, dst)
})); err != nil {
return err
}

// depending on the type of message to be relayed, we need to
// send to different chains
if recvMsgs != nil {
msgs.Dst = append(msgs.Dst, recvMsgs)
if recvMsg != nil {
msgs.Dst = append(msgs.Dst, recvMsg)
}

if timeoutMsgs != nil {
msgs.Src = append(msgs.Src, timeoutMsgs)
if timeoutMsg != nil {
msgs.Src = append(msgs.Src, timeoutMsg)
}
}

// add messages for sequences on dst
for _, seq := range sp.Dst {
// Query dst for the sequence number to get type of packet
recvMsgs, timeoutMsgs, err := relayPacketFromSequence(dst, src, uint64(dsth), uint64(srch), seq)
if err != nil {
var recvMsg, timeoutMsg sdk.Msg
if err = retry.Do(func() error {
recvMsg, timeoutMsg, err = relayPacketFromSequence(dst, src, uint64(dsth), uint64(srch), seq)
return nil
}, rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) {
srch, dsth, _ = QueryLatestHeights(src, dst)
})); err != nil {
return err
}

// depending on the type of message to be relayed, we need to
// send to different chains
if recvMsgs != nil {
msgs.Src = append(msgs.Src, recvMsgs)
if recvMsg != nil {
msgs.Src = append(msgs.Src, recvMsg)
}

if timeoutMsgs != nil {
msgs.Dst = append(msgs.Dst, timeoutMsgs)
if timeoutMsg != nil {
msgs.Dst = append(msgs.Dst, timeoutMsg)
}
}

Expand Down
12 changes: 6 additions & 6 deletions test/relayer_akash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ func TestAkashToGaiaStreamingRelayer(t *testing.T) {
testChannelPair(t, src, dst)

// send a couple of transfers to the queue on src
require.NoError(t, src.SendTransferMsg(dst, testCoin, dst.MustGetAddress().String(), 0, 0))
require.NoError(t, src.SendTransferMsg(dst, testCoin, dst.MustGetAddress().String(), 0, 0))
require.NoError(t, src.SendTransferMsg(dst, testCoin, dst.MustGetAddress(), 0, 0))
require.NoError(t, src.SendTransferMsg(dst, testCoin, dst.MustGetAddress(), 0, 0))

// send a couple of transfers to the queue on dst
require.NoError(t, dst.SendTransferMsg(src, testCoin, src.MustGetAddress().String(), 0, 0))
require.NoError(t, dst.SendTransferMsg(src, testCoin, src.MustGetAddress().String(), 0, 0))
require.NoError(t, dst.SendTransferMsg(src, testCoin, src.MustGetAddress(), 0, 0))
require.NoError(t, dst.SendTransferMsg(src, testCoin, src.MustGetAddress(), 0, 0))

// Wait for message inclusion in both chains
require.NoError(t, dst.WaitForNBlocks(1))
Expand All @@ -68,8 +68,8 @@ func TestAkashToGaiaStreamingRelayer(t *testing.T) {
require.NoError(t, dst.WaitForNBlocks(1))

// send those tokens from dst back to dst and src back to src
require.NoError(t, src.SendTransferMsg(dst, twoTestCoin, dst.MustGetAddress().String(), 0, 0))
require.NoError(t, dst.SendTransferMsg(src, twoTestCoin, src.MustGetAddress().String(), 0, 0))
require.NoError(t, src.SendTransferMsg(dst, twoTestCoin, dst.MustGetAddress(), 0, 0))
require.NoError(t, dst.SendTransferMsg(src, twoTestCoin, src.MustGetAddress(), 0, 0))

// wait for packet processing
require.NoError(t, dst.WaitForNBlocks(6))
Expand Down
Loading

0 comments on commit ba064cc

Please sign in to comment.