Skip to content

Commit

Permalink
Batch headers & messages when sending to parachain (paritytech#398)
Browse files Browse the repository at this point in the history
* Write header + messages in atomic batch call

* Update writer test

* indexing array -> append to array
  • Loading branch information
Rizziepit committed May 20, 2021
1 parent a61b34d commit bf923be
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 50 deletions.
4 changes: 2 additions & 2 deletions relayer/workers/ethrelayer/ethereum-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (li *EthereumListener) processEventsAndHeaders(

// Don't attempt to forward events prior to genesis block
if descendantsUntilFinal > gethheader.Number.Uint64() {
li.payloads <- ParachainPayload{header: header}
li.payloads <- ParachainPayload{Header: header}
continue
}

Expand Down Expand Up @@ -170,7 +170,7 @@ func (li *EthereumListener) processEventsAndHeaders(
return err
}

li.payloads <- ParachainPayload{header: header, messages: messages}
li.payloads <- ParachainPayload{Header: header, Messages: messages}
}
}
}
Expand Down
75 changes: 35 additions & 40 deletions relayer/workers/ethrelayer/parachain-writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
)

type ParachainPayload struct {
header *chain.Header
messages []*chain.EthereumOutboundMessage
Header *chain.Header
Messages []*chain.EthereumOutboundMessage
}

type ParachainWriter struct {
Expand Down Expand Up @@ -107,22 +107,19 @@ func (wr *ParachainWriter) writeLoop(ctx context.Context) error {
return nil
}

err := wr.WriteHeader(ctx, payload.header)
err := wr.WritePayload(ctx, &payload)
if err != nil {
wr.log.WithFields(logrus.Fields{
"blockNumber": payload.header.HeaderData.(ethereum.Header).Number,
"error": err,
}).Error("Failure submitting header to substrate")
wr.log.WithError(err).WithFields(logrus.Fields{
"blockNumber": payload.Header.HeaderData.(ethereum.Header).Number,
"messageCount": len(payload.Messages),
}).Error("Failure submitting header and messages to Substrate")
return err
}

err = wr.WriteMessages(ctx, payload.messages)
if err != nil {
wr.log.WithFields(logrus.Fields{
"error": err,
}).Error("Failure submitting message to substrate")
return err
}
wr.log.WithFields(logrus.Fields{
"blockNumber": payload.Header.HeaderData.(ethereum.Header).Number,
"messageCount": len(payload.Messages),
}).Info("Submitted header and messages to Substrate")
}
}
}
Expand Down Expand Up @@ -171,44 +168,42 @@ func (wr *ParachainWriter) write(ctx context.Context, c types.Call) error {
return nil
}

func (wr *ParachainWriter) WriteMessages(ctx context.Context, msgs []*chain.EthereumOutboundMessage) error {
for _, msg := range msgs {

c, err := types.NewCall(wr.conn.GetMetadata(), msg.Call, msg.Args...)
if err != nil {
return err
}
func (wr *ParachainWriter) WritePayload(ctx context.Context, payload *ParachainPayload) error {
var calls []types.Call
call, err := wr.makeHeaderImportCall(ctx, payload.Header)
if err != nil {
return err
}
calls = append(calls, call)

err = wr.write(ctx, c)
for _, msg := range payload.Messages {
call, err := wr.makeMessageSubmitCall(ctx, msg)
if err != nil {
return err
}
calls = append(calls, call)
}

wr.log.WithField("count", len(msgs)).Info("Submitted messages to Substrate")
call, err = types.NewCall(wr.conn.GetMetadata(), "Utility.batch_all", calls)
if err != nil {
return err
}

return nil
return wr.write(ctx, call)
}

// WriteHeader submits a "VerifierLightclient.import_header" call
func (wr *ParachainWriter) WriteHeader(ctx context.Context, header *chain.Header) error {
if header == (*chain.Header)(nil) {
return fmt.Errorf("Header is nil")
func (wr *ParachainWriter) makeMessageSubmitCall(ctx context.Context, msg *chain.EthereumOutboundMessage) (types.Call, error) {
if msg == (*chain.EthereumOutboundMessage)(nil) {
return types.Call{}, fmt.Errorf("Message is nil")
}

c, err := types.NewCall(wr.conn.GetMetadata(), "VerifierLightclient.import_header", header.HeaderData, header.ProofData)
if err != nil {
return err
}
return types.NewCall(wr.conn.GetMetadata(), msg.Call, msg.Args...)
}

err = wr.write(ctx, c)
if err != nil {
return err
func (wr *ParachainWriter) makeHeaderImportCall(ctx context.Context, header *chain.Header) (types.Call, error) {
if header == (*chain.Header)(nil) {
return types.Call{}, fmt.Errorf("Header is nil")
}

wr.log.WithFields(logrus.Fields{
"blockNumber": header.HeaderData.(ethereum.Header).Number,
}).Info("Submitted header to Substrate")

return nil
return types.NewCall(wr.conn.GetMetadata(), "VerifierLightclient.import_header", header.HeaderData, header.ProofData)
}
18 changes: 10 additions & 8 deletions relayer/workers/ethrelayer/parachain-writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
"context"
"testing"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"

"golang.org/x/sync/errgroup"

Expand All @@ -22,7 +20,7 @@ import (
)

func TestWrite(t *testing.T) {
logger, hook := test.NewNullLogger()
logger, _ := test.NewNullLogger()
log := logger.WithField("chain", "Parachain")

conn := parachain.NewConnection("ws://127.0.0.1:11144/", sr25519.Alice().AsKeyringPair(), log)
Expand Down Expand Up @@ -61,13 +59,17 @@ func TestWrite(t *testing.T) {
Call: "BasicInboundChannel.submit",
Args: args,
}
header := chain.Header{
HeaderData: "headerdata",
ProofData: "proofdata",
}
payload := ethrelayer.ParachainPayload{
Header: &header,
Messages: []*chain.EthereumOutboundMessage{&message},
}

err = writer.WriteMessages(ctx, []*chain.EthereumOutboundMessage{&message})
err = writer.WritePayload(ctx, &payload)
if err != nil {
t.Fatal(err)
}

assert.Equal(t, logrus.InfoLevel, hook.LastEntry().Level)
assert.Equal(t, "Submitted messages to Substrate", hook.LastEntry().Message)

}

0 comments on commit bf923be

Please sign in to comment.