Skip to content

Commit

Permalink
agglayer support(#160)
Browse files Browse the repository at this point in the history
* update

* update

* agglayer sync (#161)

* update

* update
  • Loading branch information
zjg555543 committed Mar 30, 2024
1 parent 796bdf1 commit ba4e89c
Show file tree
Hide file tree
Showing 32 changed files with 568 additions and 120 deletions.
40 changes: 19 additions & 21 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package aggregator

import (
"context"
"crypto/ecdsa"
"encoding/json"
"errors"
"fmt"
Expand All @@ -13,6 +14,7 @@ import (
"time"
"unicode"

"github.com/0xPolygon/agglayer/client"
"github.com/0xPolygonHermez/zkevm-node/aggregator/metrics"
"github.com/0xPolygonHermez/zkevm-node/aggregator/prover"
"github.com/0xPolygonHermez/zkevm-node/config/types"
Expand Down Expand Up @@ -67,6 +69,9 @@ type Aggregator struct {
srv *grpc.Server
ctx context.Context
exit context.CancelFunc

AggLayerClient client.ClientInterface
sequencerPrivateKey *ecdsa.PrivateKey
}

// New creates a new aggregator.
Expand All @@ -75,6 +80,8 @@ func New(
stateInterface stateInterface,
ethTxManager ethTxManager,
etherman etherman,
agglayerClient client.ClientInterface,
sequencerPrivateKey *ecdsa.PrivateKey,
) (Aggregator, error) {
var profitabilityChecker aggregatorTxProfitabilityChecker
switch cfg.TxProfitabilityCheckerType {
Expand All @@ -96,6 +103,9 @@ func New(
TimeCleanupLockedProofs: cfg.CleanupLockedProofsInterval,

finalProof: make(chan finalProofMsg),

AggLayerClient: agglayerClient,
sequencerPrivateKey: sequencerPrivateKey,
}

return a, nil
Expand Down Expand Up @@ -269,28 +279,16 @@ func (a *Aggregator) sendFinalProof() {

log.Infof("Final proof inputs: NewLocalExitRoot [%#x], NewStateRoot [%#x]", inputs.NewLocalExitRoot, inputs.NewStateRoot)

// add batch verification to be monitored
sender := common.HexToAddress(a.cfg.SenderAddress)
to, data, err := a.Ethman.BuildTrustedVerifyBatchesTxData(proof.BatchNumber-1, proof.BatchNumberFinal, &inputs, sender)
if err != nil {
log.Errorf("Error estimating batch verification to add to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
continue
}
monitoredTxID := buildMonitoredTxID(proof.BatchNumber, proof.BatchNumberFinal)
err = a.EthTxManager.Add(ctx, ethTxManagerOwner, monitoredTxID, sender, to, nil, data, a.cfg.GasOffset, nil)
if err != nil {
mTxLogger := ethtxmanager.CreateLogger(ethTxManagerOwner, monitoredTxID, sender, to)
mTxLogger.Errorf("Error to add batch verification tx to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)
continue
switch a.cfg.SettlementBackend {
case AggLayer:
if success := a.settleWithAggLayer(ctx, proof, inputs); !success {
continue
}
default:
if success := a.settleDirect(ctx, proof, inputs); !success {
continue
}
}

// process monitored batch verifications before starting a next cycle
a.EthTxManager.ProcessPendingMonitoredTxs(ctx, ethTxManagerOwner, func(result ethtxmanager.MonitoredTxResult, dbTx pgx.Tx) {
a.handleMonitoredTxResult(result)
}, nil)

a.resetVerifyProofTime()
a.endProofVerification()
}
Expand Down
10 changes: 5 additions & 5 deletions aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestSendFinalProof(t *testing.T) {
stateMock := mocks.NewStateMock(t)
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nil, nil)
require.NoError(err)
a.ctx, a.exit = context.WithCancel(context.Background())
m := mox{
Expand Down Expand Up @@ -686,7 +686,7 @@ func TestTryAggregateProofs(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nil, nil)
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down Expand Up @@ -1029,7 +1029,7 @@ func TestTryGenerateBatchProof(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nil, nil)
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down Expand Up @@ -1306,7 +1306,7 @@ func TestTryBuildFinalProof(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nil, nil)
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down Expand Up @@ -1436,7 +1436,7 @@ func TestIsSynced(t *testing.T) {
ethTxManager := mocks.NewEthTxManager(t)
etherman := mocks.NewEtherman(t)
proverMock := mocks.NewProverMock(t)
a, err := New(cfg, stateMock, ethTxManager, etherman)
a, err := New(cfg, stateMock, ethTxManager, etherman, nil, nil)
require.NoError(err)
aggregatorCtx := context.WithValue(context.Background(), "owner", "aggregator") //nolint:staticcheck
a.ctx, a.exit = context.WithCancel(aggregatorCtx)
Expand Down
133 changes: 133 additions & 0 deletions aggregator/aggregator_xlayer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package aggregator

import (
"context"
"fmt"
"strings"
"time"

agglayerTypes "github.com/0xPolygon/agglayer/rpc/types"
"github.com/0xPolygon/agglayer/tx"
ethmanTypes "github.com/0xPolygonHermez/zkevm-node/etherman/types"
"github.com/0xPolygonHermez/zkevm-node/ethtxmanager"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/ethereum/go-ethereum/common"
"github.com/jackc/pgx/v4"
)

func (a *Aggregator) settleDirect(
ctx context.Context,
proof *state.Proof,
inputs ethmanTypes.FinalProofInputs,
) (success bool) {
// add batch verification to be monitored
sender := common.HexToAddress(a.cfg.SenderAddress)

to, data, err := a.Ethman.BuildTrustedVerifyBatchesTxData(
proof.BatchNumber-1,
proof.BatchNumberFinal,
&inputs,
sender,
)
if err != nil {
log.Errorf("Error estimating batch verification to add to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)

return false
}

monitoredTxID := buildMonitoredTxID(proof.BatchNumber, proof.BatchNumberFinal)
err = a.EthTxManager.Add(
ctx,
ethTxManagerOwner,
monitoredTxID,
sender,
to,
nil,
data,
a.cfg.GasOffset,
nil,
)
if err != nil {
mTxLogger := ethtxmanager.CreateLogger(ethTxManagerOwner, monitoredTxID, sender, to)
mTxLogger.Errorf("Error to add batch verification tx to eth tx manager: %v", err)
a.handleFailureToAddVerifyBatchToBeMonitored(ctx, proof)

return false
}

// process monitored batch verifications before starting a next cycle
a.EthTxManager.ProcessPendingMonitoredTxs(
ctx,
ethTxManagerOwner,
func(result ethtxmanager.MonitoredTxResult, dbTx pgx.Tx) {
a.handleMonitoredTxResult(result)
},
nil,
)

return true
}

func (a *Aggregator) settleWithAggLayer(
ctx context.Context,
proof *state.Proof,
inputs ethmanTypes.FinalProofInputs,
) (success bool) {
proofStrNo0x := strings.TrimPrefix(inputs.FinalProof.Proof, "0x")
proofBytes := common.Hex2Bytes(proofStrNo0x)
tx := tx.Tx{
LastVerifiedBatch: agglayerTypes.ArgUint64(proof.BatchNumber - 1),
NewVerifiedBatch: agglayerTypes.ArgUint64(proof.BatchNumberFinal),
ZKP: tx.ZKP{
NewStateRoot: common.BytesToHash(inputs.NewStateRoot),
NewLocalExitRoot: common.BytesToHash(inputs.NewLocalExitRoot),
Proof: agglayerTypes.ArgBytes(proofBytes),
},
RollupID: a.Ethman.GetRollupId(),
}
signedTx, err := tx.Sign(a.sequencerPrivateKey)

if err != nil {
log.Errorf("failed to sign tx: %v", err)
a.handleFailureToSendToAggLayer(ctx, proof)

return false
}

log.Debug("final proof signedTx: ", signedTx.Tx.ZKP.Proof.Hex())
txHash, err := a.AggLayerClient.SendTx(*signedTx)
if err != nil {
log.Errorf("failed to send tx to the interop: %v", err)
a.handleFailureToSendToAggLayer(ctx, proof)

return false
}

log.Infof("tx %s sent to agglayer, waiting to be mined", txHash.Hex())
log.Debugf("Timeout set to %f seconds", a.cfg.AggLayerTxTimeout.Duration.Seconds())
waitCtx, cancelFunc := context.WithDeadline(ctx, time.Now().Add(a.cfg.AggLayerTxTimeout.Duration))
defer cancelFunc()
if err := a.AggLayerClient.WaitTxToBeMined(txHash, waitCtx); err != nil {
log.Errorf("interop didn't mine the tx: %v", err)
a.handleFailureToSendToAggLayer(ctx, proof)

return false
}

// TODO: wait for synchronizer to catch up
return true
}

func (a *Aggregator) handleFailureToSendToAggLayer(ctx context.Context, proof *state.Proof) {
log := log.WithFields("proofId", proof.ProofID, "batches", fmt.Sprintf("%d-%d", proof.BatchNumber, proof.BatchNumberFinal))
proof.GeneratingSince = nil

err := a.State.UpdateGeneratedProof(ctx, proof, nil)
if err != nil {
log.Errorf("Failed updating proof state (false): %v", err)
}

a.endProofVerification()
}
12 changes: 12 additions & 0 deletions aggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,16 @@ type Config struct {

// BatchProofL1BlockConfirmations is number of L1 blocks to consider we can generate the proof for a virtual batch
BatchProofL1BlockConfirmations uint64 `mapstructure:"BatchProofL1BlockConfirmations"`

// SettlementBackend configuration defines how a final ZKP should be settled. Directly to L1 or over the Beethoven service.
SettlementBackend SettlementBackend `mapstructure:"SettlementBackend"`

// AggLayerTxTimeout is the interval time to wait for a tx to be mined from the agglayer
AggLayerTxTimeout types.Duration `mapstructure:"AggLayerTxTimeout"`

// AggLayerURL url of the agglayer service
AggLayerURL string `mapstructure:"AggLayerURL"`

// SequencerPrivateKey Private key of the trusted sequencer
SequencerPrivateKey types.KeystoreFileConfig `mapstructure:"SequencerPrivateKey"`
}
12 changes: 12 additions & 0 deletions aggregator/config_xlayer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package aggregator

// SettlementBackend is the type of the settlement backend
type SettlementBackend string

const (
// AggLayer settlement backend
AggLayer SettlementBackend = "agglayer"

// L1 settlement backend
L1 SettlementBackend = "l1"
)
1 change: 1 addition & 0 deletions aggregator/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type ethTxManager interface {

// etherman contains the methods required to interact with ethereum
type etherman interface {
GetRollupId() uint32
GetLatestVerifiedBatchNum() (uint64, error)
BuildTrustedVerifyBatchesTxData(lastVerifiedBatch, newVerifiedBatch uint64, inputs *ethmanTypes.FinalProofInputs, beneficiary common.Address) (to *common.Address, data []byte, err error)
GetLatestBlockHeader(ctx context.Context) (*types.Header, error)
Expand Down
21 changes: 21 additions & 0 deletions aggregator/mocks/mock_etherman_xlayer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 20 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"net"
Expand All @@ -12,6 +13,7 @@ import (
"runtime"
"time"

agglayerClient "github.com/0xPolygon/agglayer/client"
datastreamerlog "github.com/0xPolygonHermez/zkevm-data-streamer/log"
"github.com/0xPolygonHermez/zkevm-node"
"github.com/0xPolygonHermez/zkevm-node/aggregator"
Expand Down Expand Up @@ -441,7 +443,23 @@ func createSequenceSender(cfg config.Config, pool *pool.Pool, etmStorage *ethtxm
}

func runAggregator(ctx context.Context, c aggregator.Config, etherman *etherman.Client, ethTxManager *ethtxmanager.Client, st *state.State) {
agg, err := aggregator.New(c, st, ethTxManager, etherman)
var (
aggCli *agglayerClient.Client
pk *ecdsa.PrivateKey
err error
)

if c.SettlementBackend == aggregator.AggLayer {
aggCli = agglayerClient.New(c.AggLayerURL)

// Load private key
pk, err = config.NewKeyFromKeystore(c.SequencerPrivateKey)
if err != nil {
log.Fatal(err)
}
}

agg, err := aggregator.New(c, st, ethTxManager, etherman, aggCli, pk)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -511,7 +529,7 @@ func newState(ctx context.Context, c *config.Config, etherman *etherman.Client,
}
log.Infof("Starting L1InfoRoot: %v", l1inforoot.String())

forkIDIntervals, err := forkIDIntervals(ctx, st, etherman, c.NetworkConfig.Genesis.BlockNumber)
forkIDIntervals, err := forkIDIntervals(ctx, st, etherman, c.NetworkConfig.Genesis.RollupBlockNumber)
if err != nil {
log.Fatal("error getting forkIDs. Error: ", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/run_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func newDataAvailability(c config.Config, st *state.State, etherman *etherman.Cl
c.Etherman.URL,
dacAddr,
pk,
&dataCommitteeClient.Factory{},
dataCommitteeClient.NewFactory(),
)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit ba4e89c

Please sign in to comment.