diff --git a/.gitignore b/.gitignore index e6e63d66..f0c4a045 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ /tx-monitor /block-fetch /peer-sharing +/tx-submission # Test binary, built with `go test -c` *.test diff --git a/cmd/tx-submission/main.go b/cmd/tx-submission/main.go new file mode 100644 index 00000000..cd04a753 --- /dev/null +++ b/cmd/tx-submission/main.go @@ -0,0 +1,79 @@ +// Copyright 2023 Blink Labs, LLC. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "os" + + ouroboros "github.com/blinklabs-io/gouroboros" + "github.com/blinklabs-io/gouroboros/cmd/common" + "github.com/blinklabs-io/gouroboros/protocol/txsubmission" +) + +type txSubmissionFlags struct { + *common.GlobalFlags +} + +func main() { + // Parse commandline + f := txSubmissionFlags{ + GlobalFlags: common.NewGlobalFlags(), + } + f.Parse() + // Create connection + conn := common.CreateClientConnection(f.GlobalFlags) + errorChan := make(chan error) + go func() { + for { + err := <-errorChan + fmt.Printf("ERROR(async): %s\n", err) + os.Exit(1) + } + }() + o, err := ouroboros.New( + ouroboros.WithConnection(conn), + ouroboros.WithNetworkMagic(uint32(f.NetworkMagic)), + ouroboros.WithErrorChan(errorChan), + ouroboros.WithNodeToNode(f.NtnProto), + ouroboros.WithKeepAlive(true), + ouroboros.WithTxSubmissionConfig( + txsubmission.NewConfig( + txsubmission.WithRequestTxIdsFunc( + // TODO: do something more useful + func(blocking bool, ack uint16, req uint16) ([]txsubmission.TxIdAndSize, error) { + return []txsubmission.TxIdAndSize{}, nil + }, + ), + txsubmission.WithRequestTxsFunc( + // TODO: do something more useful + func(txIds []txsubmission.TxId) ([]txsubmission.TxBody, error) { + return []txsubmission.TxBody{}, nil + }, + ), + ), + ), + ) + if err != nil { + fmt.Printf("ERROR: %s\n", err) + os.Exit(1) + } + + // Start the TxSubmission activity loop + o.TxSubmission().Client.Init() + + // Wait forever + select {} +} diff --git a/protocol/txsubmission/client.go b/protocol/txsubmission/client.go index 3c217b4d..87550276 100644 --- a/protocol/txsubmission/client.go +++ b/protocol/txsubmission/client.go @@ -16,12 +16,15 @@ package txsubmission import ( "fmt" + "sync" + "github.com/blinklabs-io/gouroboros/protocol" ) type Client struct { *protocol.Protocol - config *Config + config *Config + onceInit sync.Once } func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { @@ -34,14 +37,14 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { } // Update state map with timeout stateMap := StateMap.Copy() - if entry, ok := stateMap[STATE_IDLE]; ok { + if entry, ok := stateMap[stateIdle]; ok { entry.Timeout = c.config.IdleTimeout - stateMap[STATE_IDLE] = entry + stateMap[stateIdle] = entry } // Configure underlying Protocol protoConfig := protocol.ProtocolConfig{ - Name: PROTOCOL_NAME, - ProtocolId: PROTOCOL_ID, + Name: ProtocolName, + ProtocolId: ProtocolId, Muxer: protoOptions.Muxer, ErrorChan: protoOptions.ErrorChan, Mode: protoOptions.Mode, @@ -49,21 +52,30 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { MessageHandlerFunc: c.messageHandler, MessageFromCborFunc: NewMsgFromCbor, StateMap: stateMap, - InitialState: STATE_INIT, + InitialState: stateInit, } c.Protocol = protocol.New(protoConfig) return c } +// Init tells the server to begin asking us for transactions +func (c *Client) Init() { + c.onceInit.Do(func() { + // Send our Init message + msg := NewMsgInit() + _ = c.SendMessage(msg) + }) +} + func (c *Client) messageHandler(msg protocol.Message, isResponse bool) error { var err error switch msg.Type() { - case MESSAGE_TYPE_REQUEST_TX_IDS: + case MessageTypeRequestTxIds: err = c.handleRequestTxIds(msg) - case MESSAGE_TYPE_REQUEST_TXS: + case MessageTypeRequestTxs: err = c.handleRequestTxs(msg) default: - err = fmt.Errorf("%s: received unexpected message type %d", PROTOCOL_NAME, msg.Type()) + err = fmt.Errorf("%s: received unexpected message type %d", ProtocolName, msg.Type()) } return err } @@ -74,7 +86,15 @@ func (c *Client) handleRequestTxIds(msg protocol.Message) error { } msgRequestTxIds := msg.(*MsgRequestTxIds) // Call the user callback function - return c.config.RequestTxIdsFunc(msgRequestTxIds.Blocking, msgRequestTxIds.Ack, msgRequestTxIds.Req) + txIds, err := c.config.RequestTxIdsFunc(msgRequestTxIds.Blocking, msgRequestTxIds.Ack, msgRequestTxIds.Req) + if err != nil { + return err + } + resp := NewMsgReplyTxIds(txIds) + if err := c.SendMessage(resp); err != nil { + return err + } + return nil } func (c *Client) handleRequestTxs(msg protocol.Message) error { @@ -83,5 +103,13 @@ func (c *Client) handleRequestTxs(msg protocol.Message) error { } msgRequestTxs := msg.(*MsgRequestTxs) // Call the user callback function - return c.config.RequestTxsFunc(msgRequestTxs.TxIds) + txs, err := c.config.RequestTxsFunc(msgRequestTxs.TxIds) + if err != nil { + return err + } + resp := NewMsgReplyTxs(txs) + if err := c.SendMessage(resp); err != nil { + return err + } + return nil } diff --git a/protocol/txsubmission/messages.go b/protocol/txsubmission/messages.go index 7d9291a7..17cb29f9 100644 --- a/protocol/txsubmission/messages.go +++ b/protocol/txsubmission/messages.go @@ -16,37 +16,38 @@ package txsubmission import ( "fmt" + "github.com/blinklabs-io/gouroboros/cbor" "github.com/blinklabs-io/gouroboros/protocol" ) const ( - MESSAGE_TYPE_REQUEST_TX_IDS = 0 - MESSAGE_TYPE_REPLY_TX_IDS = 1 - MESSAGE_TYPE_REQUEST_TXS = 2 - MESSAGE_TYPE_REPLY_TXS = 3 - MESSAGE_TYPE_DONE = 4 - MESSAGE_TYPE_INIT = 6 + MessageTypeRequestTxIds = 0 + MessageTypeReplyTxIds = 1 + MessageTypeRequestTxs = 2 + MessageTypeReplyTxs = 3 + MessageTypeDone = 4 + MessageTypeInit = 6 ) func NewMsgFromCbor(msgType uint, data []byte) (protocol.Message, error) { var ret protocol.Message switch msgType { - case MESSAGE_TYPE_REQUEST_TX_IDS: + case MessageTypeRequestTxIds: ret = &MsgRequestTxIds{} - case MESSAGE_TYPE_REPLY_TX_IDS: + case MessageTypeReplyTxIds: ret = &MsgReplyTxIds{} - case MESSAGE_TYPE_REQUEST_TXS: + case MessageTypeRequestTxs: ret = &MsgRequestTxs{} - case MESSAGE_TYPE_REPLY_TXS: + case MessageTypeReplyTxs: ret = &MsgReplyTxs{} - case MESSAGE_TYPE_DONE: + case MessageTypeDone: ret = &MsgDone{} - case MESSAGE_TYPE_INIT: + case MessageTypeInit: ret = &MsgInit{} } if _, err := cbor.Decode(data, ret); err != nil { - return nil, fmt.Errorf("%s: decode error: %s", PROTOCOL_NAME, err) + return nil, fmt.Errorf("%s: decode error: %s", ProtocolName, err) } if ret != nil { // Store the raw message CBOR @@ -65,7 +66,7 @@ type MsgRequestTxIds struct { func NewMsgRequestTxIds(blocking bool, ack uint16, req uint16) *MsgRequestTxIds { m := &MsgRequestTxIds{ MessageBase: protocol.MessageBase{ - MessageType: MESSAGE_TYPE_REQUEST_TX_IDS, + MessageType: MessageTypeRequestTxIds, }, Blocking: blocking, Ack: ack, @@ -82,7 +83,7 @@ type MsgReplyTxIds struct { func NewMsgReplyTxIds(txIds []TxIdAndSize) *MsgReplyTxIds { m := &MsgReplyTxIds{ MessageBase: protocol.MessageBase{ - MessageType: MESSAGE_TYPE_REPLY_TX_IDS, + MessageType: MessageTypeReplyTxIds, }, TxIds: txIds, } @@ -97,7 +98,7 @@ type MsgRequestTxs struct { func NewMsgRequestTxs(txIds []TxId) *MsgRequestTxs { m := &MsgRequestTxs{ MessageBase: protocol.MessageBase{ - MessageType: MESSAGE_TYPE_REQUEST_TXS, + MessageType: MessageTypeRequestTxs, }, TxIds: txIds, } @@ -112,7 +113,7 @@ type MsgReplyTxs struct { func NewMsgReplyTxs(txs []TxBody) *MsgReplyTxs { m := &MsgReplyTxs{ MessageBase: protocol.MessageBase{ - MessageType: MESSAGE_TYPE_REPLY_TXS, + MessageType: MessageTypeReplyTxs, }, Txs: txs, } @@ -126,7 +127,7 @@ type MsgDone struct { func NewMsgDone() *MsgDone { m := &MsgDone{ MessageBase: protocol.MessageBase{ - MessageType: MESSAGE_TYPE_DONE, + MessageType: MessageTypeDone, }, } return m @@ -139,7 +140,7 @@ type MsgInit struct { func NewMsgInit() *MsgInit { m := &MsgInit{ MessageBase: protocol.MessageBase{ - MessageType: MESSAGE_TYPE_INIT, + MessageType: MessageTypeInit, }, } return m diff --git a/protocol/txsubmission/messages_test.go b/protocol/txsubmission/messages_test.go index 77d3515f..eadd81f0 100644 --- a/protocol/txsubmission/messages_test.go +++ b/protocol/txsubmission/messages_test.go @@ -16,10 +16,11 @@ package txsubmission import ( "encoding/hex" - "github.com/blinklabs-io/gouroboros/cbor" - "github.com/blinklabs-io/gouroboros/protocol" "reflect" "testing" + + "github.com/blinklabs-io/gouroboros/cbor" + "github.com/blinklabs-io/gouroboros/protocol" ) type testDefinition struct { @@ -33,12 +34,12 @@ var tests = []testDefinition{ { CborHex: "8104", Message: NewMsgDone(), - MessageType: MESSAGE_TYPE_DONE, + MessageType: MessageTypeDone, }, { CborHex: "8106", Message: NewMsgInit(), - MessageType: MESSAGE_TYPE_INIT, + MessageType: MessageTypeInit, }, } diff --git a/protocol/txsubmission/server.go b/protocol/txsubmission/server.go index c44426fb..363526d2 100644 --- a/protocol/txsubmission/server.go +++ b/protocol/txsubmission/server.go @@ -16,6 +16,7 @@ package txsubmission import ( "fmt" + "github.com/blinklabs-io/gouroboros/protocol" ) @@ -29,8 +30,8 @@ func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server { config: cfg, } protoConfig := protocol.ProtocolConfig{ - Name: PROTOCOL_NAME, - ProtocolId: PROTOCOL_ID, + Name: ProtocolName, + ProtocolId: ProtocolId, Muxer: protoOptions.Muxer, ErrorChan: protoOptions.ErrorChan, Mode: protoOptions.Mode, @@ -38,7 +39,7 @@ func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server { MessageHandlerFunc: s.messageHandler, MessageFromCborFunc: NewMsgFromCbor, StateMap: StateMap, - InitialState: STATE_INIT, + InitialState: stateInit, } s.Protocol = protocol.New(protoConfig) return s @@ -47,16 +48,16 @@ func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server { func (s *Server) messageHandler(msg protocol.Message, isResponse bool) error { var err error switch msg.Type() { - case MESSAGE_TYPE_REPLY_TX_IDS: + case MessageTypeReplyTxIds: err = s.handleReplyTxIds(msg) - case MESSAGE_TYPE_REPLY_TXS: + case MessageTypeReplyTxs: err = s.handleReplyTxs(msg) - case MESSAGE_TYPE_DONE: + case MessageTypeDone: err = s.handleDone() - case MESSAGE_TYPE_INIT: + case MessageTypeInit: err = s.handleInit() default: - err = fmt.Errorf("%s: received unexpected message type %d", PROTOCOL_NAME, msg.Type()) + err = fmt.Errorf("%s: received unexpected message type %d", ProtocolName, msg.Type()) } return err } diff --git a/protocol/txsubmission/txsubmission.go b/protocol/txsubmission/txsubmission.go index 92900476..3967ba19 100644 --- a/protocol/txsubmission/txsubmission.go +++ b/protocol/txsubmission/txsubmission.go @@ -21,35 +21,35 @@ import ( ) const ( - PROTOCOL_NAME = "tx-submission" - PROTOCOL_ID uint16 = 4 + ProtocolName = "tx-submission" + ProtocolId uint16 = 4 ) var ( - STATE_INIT = protocol.NewState(1, "Init") - STATE_IDLE = protocol.NewState(2, "Idle") - STATE_TX_IDS_BLOCKING = protocol.NewState(3, "TxIdsBlocking") - STATE_TX_IDS_NONBLOCKING = protocol.NewState(4, "TxIdsNonBlocking") - STATE_TXS = protocol.NewState(5, "Txs") - STATE_DONE = protocol.NewState(6, "Done") + stateInit = protocol.NewState(1, "Init") + stateIdle = protocol.NewState(2, "Idle") + stateTxIdsBlocking = protocol.NewState(3, "TxIdsBlocking") + stateTxIdsNonblocking = protocol.NewState(4, "TxIdsNonBlocking") + stateTxs = protocol.NewState(5, "Txs") + stateDone = protocol.NewState(6, "Done") ) var StateMap = protocol.StateMap{ - STATE_INIT: protocol.StateMapEntry{ + stateInit: protocol.StateMapEntry{ Agency: protocol.AgencyClient, Transitions: []protocol.StateTransition{ { - MsgType: MESSAGE_TYPE_INIT, - NewState: STATE_IDLE, + MsgType: MessageTypeInit, + NewState: stateIdle, }, }, }, - STATE_IDLE: protocol.StateMapEntry{ + stateIdle: protocol.StateMapEntry{ Agency: protocol.AgencyServer, Transitions: []protocol.StateTransition{ { - MsgType: MESSAGE_TYPE_REQUEST_TX_IDS, - NewState: STATE_TX_IDS_BLOCKING, + MsgType: MessageTypeRequestTxIds, + NewState: stateTxIdsBlocking, // Match if blocking MatchFunc: func(msg protocol.Message) bool { msgRequestTxIds := msg.(*MsgRequestTxIds) @@ -57,8 +57,8 @@ var StateMap = protocol.StateMap{ }, }, { - MsgType: MESSAGE_TYPE_REQUEST_TX_IDS, - NewState: STATE_TX_IDS_NONBLOCKING, + MsgType: MessageTypeRequestTxIds, + NewState: stateTxIdsNonblocking, // Metch if non-blocking MatchFunc: func(msg protocol.Message) bool { msgRequestTxIds := msg.(*MsgRequestTxIds) @@ -66,43 +66,43 @@ var StateMap = protocol.StateMap{ }, }, { - MsgType: MESSAGE_TYPE_REQUEST_TXS, - NewState: STATE_TXS, + MsgType: MessageTypeRequestTxs, + NewState: stateTxs, }, }, }, - STATE_TX_IDS_BLOCKING: protocol.StateMapEntry{ + stateTxIdsBlocking: protocol.StateMapEntry{ Agency: protocol.AgencyClient, Transitions: []protocol.StateTransition{ { - MsgType: MESSAGE_TYPE_REPLY_TX_IDS, - NewState: STATE_IDLE, + MsgType: MessageTypeReplyTxIds, + NewState: stateIdle, }, { - MsgType: MESSAGE_TYPE_DONE, - NewState: STATE_DONE, + MsgType: MessageTypeDone, + NewState: stateDone, }, }, }, - STATE_TX_IDS_NONBLOCKING: protocol.StateMapEntry{ + stateTxIdsNonblocking: protocol.StateMapEntry{ Agency: protocol.AgencyClient, Transitions: []protocol.StateTransition{ { - MsgType: MESSAGE_TYPE_REPLY_TX_IDS, - NewState: STATE_IDLE, + MsgType: MessageTypeReplyTxIds, + NewState: stateIdle, }, }, }, - STATE_TXS: protocol.StateMapEntry{ + stateTxs: protocol.StateMapEntry{ Agency: protocol.AgencyClient, Transitions: []protocol.StateTransition{ { - MsgType: MESSAGE_TYPE_REPLY_TXS, - NewState: STATE_IDLE, + MsgType: MessageTypeReplyTxs, + NewState: stateIdle, }, }, }, - STATE_DONE: protocol.StateMapEntry{ + stateDone: protocol.StateMapEntry{ Agency: protocol.AgencyNone, }, } @@ -123,9 +123,9 @@ type Config struct { } // Callback function types -type RequestTxIdsFunc func(bool, uint16, uint16) error +type RequestTxIdsFunc func(bool, uint16, uint16) ([]TxIdAndSize, error) type ReplyTxIdsFunc func(interface{}) error -type RequestTxsFunc func(interface{}) error +type RequestTxsFunc func([]TxId) ([]TxBody, error) type ReplyTxsFunc func(interface{}) error type DoneFunc func() error type InitFunc func() error