Skip to content

Commit

Permalink
Merge pull request #433 from ethereum-optimism/p2p-api
Browse files Browse the repository at this point in the history
opnode: p2p RPC, fix static-peers
  • Loading branch information
protolambda committed May 13, 2022
2 parents fb044a6 + a8243cb commit 4170fcb
Show file tree
Hide file tree
Showing 15 changed files with 918 additions and 83 deletions.
2 changes: 1 addition & 1 deletion opnode/flags/p2p_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ var (
Usage: "User-agent string to share via LibP2P identify. If empty it defaults to 'optimism-VERSIONHERE'.",
Hidden: true,
Required: false,
Value: "",
Value: "optimism",
EnvVar: p2pEnv("AGENT"),
}
TimeoutNegotiation = cli.DurationFlag{
Expand Down
76 changes: 23 additions & 53 deletions opnode/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ import (

"github.com/ethereum-optimism/optimistic-specs/opnode/p2p"

"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p-core/host"
pubsub "github.com/libp2p/go-libp2p-pubsub"

multierror "github.com/hashicorp/go-multierror"

"github.com/ethereum-optimism/optimistic-specs/opnode/backoff"
Expand All @@ -39,11 +34,7 @@ type OpNode struct {
l2Engines []*driver.Driver // engines to keep synced
l2Nodes []*rpc.Client // L2 Execution Engines to close at shutdown
server *rpcServer // RPC server hosting the rollup-node API
host host.Host // p2p host (optional, may be nil)
dv5Local *enode.LocalNode // p2p discovery identity (optional, may be nil)
dv5Udp *discover.UDPv5 // p2p discovery service (optional, may be nil)
gs *pubsub.PubSub // p2p gossip router (optional, may be nil)
gsOut p2p.GossipOut // p2p gossip application interface for publishing (optional, may be nil)
p2pNode p2p.Node // P2P node functionality
p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer
tracer Tracer // tracer to get events for testing/debugging

Expand Down Expand Up @@ -109,15 +100,16 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
if err := n.initL2(ctx, cfg, snapshotLog); err != nil {
return err
}
if err := n.initRPCServer(ctx, cfg); err != nil {
return err
}
if err := n.initP2PSigner(ctx, cfg); err != nil {
return err
}
if err := n.initP2P(ctx, cfg); err != nil {
return err
}
// Only expose the server at the end, ensuring all RPC backend components are initialized.
if err := n.initRPCServer(ctx, cfg); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -212,6 +204,9 @@ func (n *OpNode) initRPCServer(ctx context.Context, cfg *Config) error {
if err != nil {
return err
}
if n.p2pNode != nil {
n.server.EnableP2P(p2p.NewP2PAPIBackend(n.p2pNode, n.log))
}
n.log.Info("Starting JSON-RPC server")
if err := n.server.Start(); err != nil {
return fmt.Errorf("unable to start RPC server: %w", err)
Expand All @@ -220,34 +215,12 @@ func (n *OpNode) initRPCServer(ctx context.Context, cfg *Config) error {
}

func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error {
// the p2p setup is optional
if cfg.P2P == nil {
return nil
}
var err error
// All nil if disabled.
n.dv5Local, n.dv5Udp, err = cfg.P2P.Discovery(n.log.New("p2p", "discv5"))
if err != nil {
return fmt.Errorf("failed to start discv5: %v", err)
}

// nil if disabled.
n.host, err = cfg.P2P.Host()
if err != nil {
return fmt.Errorf("failed to start p2p host: %v", err)
}

if n.host != nil {
n.gs, err = p2p.NewGossipSub(n.resourcesCtx, n.host, &cfg.Rollup)
if cfg.P2P != nil {
p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n)
if err != nil {
return fmt.Errorf("failed to start gossipsub router: %v", err)
}

n.gsOut, err = p2p.JoinGossip(n.resourcesCtx, n.host.ID(), n.gs, n.log, &cfg.Rollup, n)
if err != nil {
return fmt.Errorf("failed to join blocks gossip topic: %v", err)
return err
}
n.log.Info("started p2p host", "addrs", n.host.Addrs(), "peerID", n.host.ID().Pretty())
n.p2pNode = p2pNode
}
return nil
}
Expand Down Expand Up @@ -302,12 +275,12 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, payload *l2.ExecutionPayl
n.tracer.OnPublishL2Payload(ctx, payload)

// publish to p2p, if we are running p2p at all
if n.gsOut != nil {
if n.p2pNode != nil {
if n.p2pSigner == nil {
return fmt.Errorf("node has no p2p signer, payload %s cannot be published", payload.ID())
}
n.log.Info("Publishing signed execution payload on p2p", "id", payload.ID())
return n.gsOut.PublishL2Payload(ctx, payload, n.p2pSigner)
return n.p2pNode.GossipOut().PublishL2Payload(ctx, payload, n.p2pSigner)
}
// if p2p is not enabled then we just don't publish the payload
return nil
Expand All @@ -318,7 +291,7 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *l
defer n.l2Lock.Unlock()

// ignore if it's from ourselves
if from == n.host.ID() {
if n.p2pNode != nil && from == n.p2pNode.Host().ID() {
return nil
}

Expand All @@ -339,31 +312,28 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *l
return nil
}

func (n *OpNode) P2P() p2p.Node {
return n.p2pNode
}

// Close closes all resources.
func (n *OpNode) Close() error {
var result *multierror.Error

if n.server != nil {
n.server.Stop()
}
if n.dv5Udp != nil {
n.dv5Udp.Close()
}
if n.gsOut != nil {
if err := n.gsOut.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close gossip cleanly: %v", err))
if n.p2pNode != nil {
if err := n.p2pNode.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close p2p node: %v", err))
}
}
if n.p2pSigner != nil {
if err := n.p2pSigner.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close p2p signer: %v", err))
}
}
if n.host != nil {
if err := n.host.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close p2p host cleanly: %v", err))
}
}

if n.resourcesClose != nil {
n.resourcesClose()
}
Expand Down
29 changes: 20 additions & 9 deletions opnode/node/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"net"
"net/http"

"github.com/ethereum-optimism/optimistic-specs/opnode/p2p"

"github.com/ethereum-optimism/optimistic-specs/opnode/l2"
"github.com/ethereum-optimism/optimistic-specs/opnode/rollup"

Expand All @@ -19,7 +21,7 @@ import (

type rpcServer struct {
endpoint string
api *nodeAPI
apis []rpc.API
httpServer *http.Server
appVersion string
listenAddr net.Addr
Expand All @@ -32,23 +34,32 @@ func newRPCServer(ctx context.Context, rpcCfg *RPCConfig, rollupCfg *rollup.Conf
// TODO: extend RPC config with options for WS, IPC and HTTP RPC connections
endpoint := fmt.Sprintf("%s:%d", rpcCfg.ListenAddr, rpcCfg.ListenPort)
r := &rpcServer{
endpoint: endpoint,
api: api,
endpoint: endpoint,
apis: []rpc.API{{
Namespace: "optimism",
Service: api,
Public: true,
Authenticated: false,
}},
appVersion: appVersion,
log: log,
}
return r, nil
}

func (s *rpcServer) Start() error {
apis := []rpc.API{{
Namespace: "optimism",
Service: s.api,
func (s *rpcServer) EnableP2P(backend *p2p.APIBackend) {
s.apis = append(s.apis, rpc.API{
Namespace: p2p.NamespaceRPC,
Version: "",
Service: backend,
Public: true,
Authenticated: false,
}}
})
}

func (s *rpcServer) Start() error {
srv := rpc.NewServer()
if err := node.RegisterApis(apis, nil, srv, true); err != nil {
if err := node.RegisterApis(s.apis, nil, srv, true); err != nil {
return err
}

Expand Down
26 changes: 25 additions & 1 deletion opnode/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"strings"
"time"

"github.com/libp2p/go-libp2p-core/peer"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/libp2p/go-libp2p-core/host"
Expand Down Expand Up @@ -40,7 +42,7 @@ import (
type SetupP2P interface {
Check() error
// Host creates a libp2p host service. Returns nil, nil if p2p is disabled.
Host() (host.Host, error)
Host(log log.Logger) (host.Host, error)
// Discovery creates a disc-v5 service. Returns nil, nil, nil if discovery is disabled.
Discovery(log log.Logger) (*enode.LocalNode, *discover.UDPv5, error)
}
Expand Down Expand Up @@ -93,6 +95,28 @@ type Config struct {
BandwidthMetrics metrics.Reporter
}

type ConnectionGater interface {
connmgr.ConnectionGater

// BlockPeer adds a peer to the set of blocked peers.
// Note: active connections to the peer are not automatically closed.
BlockPeer(p peer.ID) error
UnblockPeer(p peer.ID) error
ListBlockedPeers() []peer.ID

// BlockAddr adds an IP address to the set of blocked addresses.
// Note: active connections to the IP address are not automatically closed.
BlockAddr(ip net.IP) error
UnblockAddr(ip net.IP) error
ListBlockedAddrs() []net.IP

// BlockSubnet adds an IP subnet to the set of blocked addresses.
// Note: active connections to the IP subnet are not automatically closed.
BlockSubnet(ipnet *net.IPNet) error
UnblockSubnet(ipnet *net.IPNet) error
ListBlockedSubnets() []*net.IPNet
}

func DefaultConnGater(conf *Config) (connmgr.ConnectionGater, error) {
return conngater.NewBasicConnectionGater(conf.Store)
}
Expand Down
6 changes: 5 additions & 1 deletion opnode/p2p/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,13 @@ type GossipIn interface {
OnUnsafeL2Payload(ctx context.Context, from peer.ID, msg *l2.ExecutionPayload) error
}

type GossipTopicInfo interface {
BlocksTopicPeers() []peer.ID
}

type GossipOut interface {
GossipTopicInfo
PublishL2Payload(ctx context.Context, msg *l2.ExecutionPayload, signer Signer) error
BlocksTopicPeers() []peer.ID
Close() error
}

Expand Down
57 changes: 55 additions & 2 deletions opnode/p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"net"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/libp2p/go-libp2p-core/connmgr"

"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -17,7 +20,29 @@ import (
madns "github.com/multiformats/go-multiaddr-dns"
)

func (conf *Config) Host() (host.Host, error) {
type ExtraHostFeatures interface {
host.Host
ConnectionGater() ConnectionGater
ConnectionManager() connmgr.ConnManager
}

type extraHost struct {
host.Host
gater ConnectionGater
connMgr connmgr.ConnManager
}

func (e *extraHost) ConnectionGater() ConnectionGater {
return e.gater
}

func (e *extraHost) ConnectionManager() connmgr.ConnManager {
return e.connMgr
}

var _ ExtraHostFeatures = (*extraHost)(nil)

func (conf *Config) Host(log log.Logger) (host.Host, error) {
if conf.DisableP2P {
return nil, nil
}
Expand Down Expand Up @@ -114,7 +139,35 @@ func (conf *Config) Host() (host.Host, error) {
EnableHolePunching: false,
HolePunchingOptions: nil,
}
return p2pConf.NewNode()
h, err := p2pConf.NewNode()
if err != nil {
return nil, err
}
for _, peerAddr := range conf.StaticPeers {
addr, err := peer.AddrInfoFromP2pAddr(peerAddr)
if err != nil {
return nil, fmt.Errorf("bad peer address: %v", err)
}
h.Peerstore().AddAddrs(addr.ID, addr.Addrs, time.Hour*24*7)
// We protect the peer, so the connection manager doesn't decide to prune it.
// We tag it with "static" so other protects/unprotects with different tags don't affect this protection.
connMngr.Protect(addr.ID, "static")
// Try to dial the node in the background
go func() {
log.Info("Dialing static peer", "peer", addr.ID, "addrs", addr.Addrs)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
if _, err := h.Network().DialPeer(ctx, addr.ID); err != nil {
log.Warn("Failed to dial static peer", "peer", addr.ID, "addrs", addr.Addrs)
}
}()
}
out := &extraHost{Host: h, connMgr: connMngr}
// Only add the connection gater if it offers the full interface we're looking for.
if g, ok := connGtr.(ConnectionGater); ok {
out.gater = g
}
return out, nil
}

// Creates a multi-addr to bind to. Does not contain a PeerID component (required for usage by external peers)
Expand Down
Loading

0 comments on commit 4170fcb

Please sign in to comment.