diff --git a/.gitignore b/.gitignore index e6e63d66..f0c4a045 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ /tx-monitor /block-fetch /peer-sharing +/tx-submission # Test binary, built with `go test -c` *.test diff --git a/cmd/tx-submission/main.go b/cmd/tx-submission/main.go new file mode 100644 index 00000000..8c8db01e --- /dev/null +++ b/cmd/tx-submission/main.go @@ -0,0 +1,149 @@ +// Copyright 2023 Blink Labs, LLC. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "os" + + ouroboros "github.com/blinklabs-io/gouroboros" + "github.com/blinklabs-io/gouroboros/cmd/common" + "github.com/blinklabs-io/gouroboros/protocol/txsubmission" +) + +type txSubmissionFlags struct { + *common.GlobalFlags + /* + slot uint64 + hash string + all bool + */ +} + +func main() { + // Parse commandline + f := txSubmissionFlags{ + GlobalFlags: common.NewGlobalFlags(), + } + /* + f.Flagset.Uint64Var(&f.slot, "slot", 0, "slot for single block to fetch") + f.Flagset.StringVar(&f.hash, "hash", "", "hash for single block to fetch") + f.Flagset.BoolVar(&f.all, "all", false, "show all available detail for block") + */ + f.Parse() + // Create connection + conn := common.CreateClientConnection(f.GlobalFlags) + errorChan := make(chan error) + go func() { + for { + err := <-errorChan + fmt.Printf("ERROR(async): %s\n", err) + os.Exit(1) + } + }() + o, err := ouroboros.New( + ouroboros.WithConnection(conn), + ouroboros.WithNetworkMagic(uint32(f.NetworkMagic)), + ouroboros.WithErrorChan(errorChan), + ouroboros.WithNodeToNode(f.NtnProto), + ouroboros.WithKeepAlive(true), + ouroboros.WithTxSubmissionConfig( + txsubmission.NewConfig( + txsubmission.WithRequestTxIdsFunc( + // TODO + func(blocking bool, ack uint16, req uint16) ([]txsubmission.TxIdAndSize, error) { + return []txsubmission.TxIdAndSize{}, nil + }, + ), + ), + ), + ) + if err != nil { + fmt.Printf("ERROR: %s\n", err) + os.Exit(1) + } + + // Start the TxSubmission activity loop + o.TxSubmission().Client.Init() + + // Wait forever + select {} + + /* + blockHash, err := hex.DecodeString(f.hash) + if err != nil { + fmt.Printf("ERROR: failed to decode block hash: %s\n", err) + os.Exit(1) + } + block, err := o.BlockFetch().Client.GetBlock(ocommon.NewPoint(f.slot, blockHash)) + if err != nil { + fmt.Printf("ERROR: failed to fetch block: %s\n", err) + os.Exit(1) + } + + // Display block info + switch v := block.(type) { + case *ledger.ByronEpochBoundaryBlock: + fmt.Printf("era = Byron (EBB), epoch = %d, id = %s\n", v.Header.ConsensusData.Epoch, v.Hash()) + case *ledger.ByronMainBlock: + fmt.Printf("era = Byron, epoch = %d, slot = %d, id = %s\n", v.Header.ConsensusData.SlotId.Epoch, v.SlotNumber(), v.Hash()) + case ledger.Block: + fmt.Printf("era = %s, slot = %d, block_no = %d, id = %s\n", v.Era().Name, v.SlotNumber(), v.BlockNumber(), v.Hash()) + } + if f.all { + // Display transaction info + fmt.Printf("\nTransactions:\n") + for _, tx := range block.Transactions() { + fmt.Printf(" Hash: %s\n", tx.Hash()) + if tx.Metadata().Value() != nil { + fmt.Printf(" Metadata:\n %#v (%x)\n", tx.Metadata().Value(), tx.Metadata().Cbor()) + } + fmt.Printf(" Inputs:\n") + for _, input := range tx.Inputs() { + fmt.Printf(" Id: %s\n", input.Id()) + fmt.Printf(" Index: %d\n", input.Index()) + fmt.Println("") + } + fmt.Printf(" Outputs:\n") + for _, output := range tx.Outputs() { + fmt.Printf(" Address: %s\n", output.Address()) + fmt.Printf(" Amount: %d\n", output.Amount()) + assets := output.Assets() + if assets != nil { + fmt.Printf(" Assets:\n") + for _, policyId := range assets.Policies() { + fmt.Printf(" Policy Id: %s\n", policyId) + fmt.Printf(" Policy assets:\n") + for _, assetName := range assets.Assets(policyId) { + fmt.Printf(" Asset name: %s\n", assetName) + fmt.Printf(" Amount: %d\n", assets.Asset(policyId, assetName)) + fmt.Println("") + } + } + } + if output.Datum() != nil { + jsonData, err := json.Marshal(output.Datum()) + if err != nil { + fmt.Printf(" Datum (hex): %x\n", output.Datum().Cbor()) + } else { + fmt.Printf(" Datum: %s\n", jsonData) + } + } + fmt.Println("") + } + } + } + */ +} diff --git a/protocol/txsubmission/client.go b/protocol/txsubmission/client.go index 3c217b4d..42a9f863 100644 --- a/protocol/txsubmission/client.go +++ b/protocol/txsubmission/client.go @@ -16,12 +16,15 @@ package txsubmission import ( "fmt" + "sync" + "github.com/blinklabs-io/gouroboros/protocol" ) type Client struct { *protocol.Protocol - config *Config + config *Config + onceInit sync.Once } func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { @@ -34,14 +37,14 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { } // Update state map with timeout stateMap := StateMap.Copy() - if entry, ok := stateMap[STATE_IDLE]; ok { + if entry, ok := stateMap[stateIdle]; ok { entry.Timeout = c.config.IdleTimeout - stateMap[STATE_IDLE] = entry + stateMap[stateIdle] = entry } // Configure underlying Protocol protoConfig := protocol.ProtocolConfig{ - Name: PROTOCOL_NAME, - ProtocolId: PROTOCOL_ID, + Name: ProtocolName, + ProtocolId: ProtocolId, Muxer: protoOptions.Muxer, ErrorChan: protoOptions.ErrorChan, Mode: protoOptions.Mode, @@ -49,35 +52,54 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client { MessageHandlerFunc: c.messageHandler, MessageFromCborFunc: NewMsgFromCbor, StateMap: stateMap, - InitialState: STATE_INIT, + InitialState: stateInit, } c.Protocol = protocol.New(protoConfig) return c } +// Init tells the server to begin asking us for transactions +func (c *Client) Init() { + c.onceInit.Do(func() { + // Send our Init message + msg := NewMsgInit() + _ = c.SendMessage(msg) + }) +} + func (c *Client) messageHandler(msg protocol.Message, isResponse bool) error { var err error switch msg.Type() { - case MESSAGE_TYPE_REQUEST_TX_IDS: + case MessageTypeRequestTxIds: err = c.handleRequestTxIds(msg) - case MESSAGE_TYPE_REQUEST_TXS: + case MessageTypeRequestTxs: err = c.handleRequestTxs(msg) default: - err = fmt.Errorf("%s: received unexpected message type %d", PROTOCOL_NAME, msg.Type()) + err = fmt.Errorf("%s: received unexpected message type %d", ProtocolName, msg.Type()) } return err } func (c *Client) handleRequestTxIds(msg protocol.Message) error { + fmt.Printf("msg = %#v\n", msg) if c.config.RequestTxIdsFunc == nil { return fmt.Errorf("received tx-submission RequestTxIds message but no callback function is defined") } msgRequestTxIds := msg.(*MsgRequestTxIds) // Call the user callback function - return c.config.RequestTxIdsFunc(msgRequestTxIds.Blocking, msgRequestTxIds.Ack, msgRequestTxIds.Req) + txIds, err := c.config.RequestTxIdsFunc(msgRequestTxIds.Blocking, msgRequestTxIds.Ack, msgRequestTxIds.Req) + if err != nil { + return err + } + resp := NewMsgReplyTxIds(txIds) + if err := c.SendMessage(resp); err != nil { + return err + } + return nil } func (c *Client) handleRequestTxs(msg protocol.Message) error { + fmt.Printf("msg = %#v\n", msg) if c.config.RequestTxsFunc == nil { return fmt.Errorf("received tx-submission RequestTxs message but no callback function is defined") } diff --git a/protocol/txsubmission/messages.go b/protocol/txsubmission/messages.go index 7d9291a7..17cb29f9 100644 --- a/protocol/txsubmission/messages.go +++ b/protocol/txsubmission/messages.go @@ -16,37 +16,38 @@ package txsubmission import ( "fmt" + "github.com/blinklabs-io/gouroboros/cbor" "github.com/blinklabs-io/gouroboros/protocol" ) const ( - MESSAGE_TYPE_REQUEST_TX_IDS = 0 - MESSAGE_TYPE_REPLY_TX_IDS = 1 - MESSAGE_TYPE_REQUEST_TXS = 2 - MESSAGE_TYPE_REPLY_TXS = 3 - MESSAGE_TYPE_DONE = 4 - MESSAGE_TYPE_INIT = 6 + MessageTypeRequestTxIds = 0 + MessageTypeReplyTxIds = 1 + MessageTypeRequestTxs = 2 + MessageTypeReplyTxs = 3 + MessageTypeDone = 4 + MessageTypeInit = 6 ) func NewMsgFromCbor(msgType uint, data []byte) (protocol.Message, error) { var ret protocol.Message switch msgType { - case MESSAGE_TYPE_REQUEST_TX_IDS: + case MessageTypeRequestTxIds: ret = &MsgRequestTxIds{} - case MESSAGE_TYPE_REPLY_TX_IDS: + case MessageTypeReplyTxIds: ret = &MsgReplyTxIds{} - case MESSAGE_TYPE_REQUEST_TXS: + case MessageTypeRequestTxs: ret = &MsgRequestTxs{} - case MESSAGE_TYPE_REPLY_TXS: + case MessageTypeReplyTxs: ret = &MsgReplyTxs{} - case MESSAGE_TYPE_DONE: + case MessageTypeDone: ret = &MsgDone{} - case MESSAGE_TYPE_INIT: + case MessageTypeInit: ret = &MsgInit{} } if _, err := cbor.Decode(data, ret); err != nil { - return nil, fmt.Errorf("%s: decode error: %s", PROTOCOL_NAME, err) + return nil, fmt.Errorf("%s: decode error: %s", ProtocolName, err) } if ret != nil { // Store the raw message CBOR @@ -65,7 +66,7 @@ type MsgRequestTxIds struct { func NewMsgRequestTxIds(blocking bool, ack uint16, req uint16) *MsgRequestTxIds { m := &MsgRequestTxIds{ MessageBase: protocol.MessageBase{ - MessageType: MESSAGE_TYPE_REQUEST_TX_IDS, + MessageType: MessageTypeRequestTxIds, }, Blocking: blocking, Ack: ack, @@ -82,7 +83,7 @@ type MsgReplyTxIds struct { func NewMsgReplyTxIds(txIds []TxIdAndSize) *MsgReplyTxIds { m := &MsgReplyTxIds{ MessageBase: protocol.MessageBase{ - MessageType: MESSAGE_TYPE_REPLY_TX_IDS, + MessageType: MessageTypeReplyTxIds, }, TxIds: txIds, } @@ -97,7 +98,7 @@ type MsgRequestTxs struct { func NewMsgRequestTxs(txIds []TxId) *MsgRequestTxs { m := &MsgRequestTxs{ MessageBase: protocol.MessageBase{ - MessageType: MESSAGE_TYPE_REQUEST_TXS, + MessageType: MessageTypeRequestTxs, }, TxIds: txIds, } @@ -112,7 +113,7 @@ type MsgReplyTxs struct { func NewMsgReplyTxs(txs []TxBody) *MsgReplyTxs { m := &MsgReplyTxs{ MessageBase: protocol.MessageBase{ - MessageType: MESSAGE_TYPE_REPLY_TXS, + MessageType: MessageTypeReplyTxs, }, Txs: txs, } @@ -126,7 +127,7 @@ type MsgDone struct { func NewMsgDone() *MsgDone { m := &MsgDone{ MessageBase: protocol.MessageBase{ - MessageType: MESSAGE_TYPE_DONE, + MessageType: MessageTypeDone, }, } return m @@ -139,7 +140,7 @@ type MsgInit struct { func NewMsgInit() *MsgInit { m := &MsgInit{ MessageBase: protocol.MessageBase{ - MessageType: MESSAGE_TYPE_INIT, + MessageType: MessageTypeInit, }, } return m diff --git a/protocol/txsubmission/messages_test.go b/protocol/txsubmission/messages_test.go index 77d3515f..940fa8d3 100644 --- a/protocol/txsubmission/messages_test.go +++ b/protocol/txsubmission/messages_test.go @@ -16,10 +16,11 @@ package txsubmission import ( "encoding/hex" - "github.com/blinklabs-io/gouroboros/cbor" - "github.com/blinklabs-io/gouroboros/protocol" "reflect" "testing" + + "github.com/blinklabs-io/gouroboros/cbor" + "github.com/blinklabs-io/gouroboros/protocol" ) type testDefinition struct { @@ -33,12 +34,12 @@ var tests = []testDefinition{ { CborHex: "8104", Message: NewMsgDone(), - MessageType: MESSAGE_TYPE_DONE, + MessageType: MessageTypeDONE, }, { CborHex: "8106", Message: NewMsgInit(), - MessageType: MESSAGE_TYPE_INIT, + MessageType: MessageTypeINIT, }, } diff --git a/protocol/txsubmission/server.go b/protocol/txsubmission/server.go index c44426fb..363526d2 100644 --- a/protocol/txsubmission/server.go +++ b/protocol/txsubmission/server.go @@ -16,6 +16,7 @@ package txsubmission import ( "fmt" + "github.com/blinklabs-io/gouroboros/protocol" ) @@ -29,8 +30,8 @@ func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server { config: cfg, } protoConfig := protocol.ProtocolConfig{ - Name: PROTOCOL_NAME, - ProtocolId: PROTOCOL_ID, + Name: ProtocolName, + ProtocolId: ProtocolId, Muxer: protoOptions.Muxer, ErrorChan: protoOptions.ErrorChan, Mode: protoOptions.Mode, @@ -38,7 +39,7 @@ func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server { MessageHandlerFunc: s.messageHandler, MessageFromCborFunc: NewMsgFromCbor, StateMap: StateMap, - InitialState: STATE_INIT, + InitialState: stateInit, } s.Protocol = protocol.New(protoConfig) return s @@ -47,16 +48,16 @@ func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server { func (s *Server) messageHandler(msg protocol.Message, isResponse bool) error { var err error switch msg.Type() { - case MESSAGE_TYPE_REPLY_TX_IDS: + case MessageTypeReplyTxIds: err = s.handleReplyTxIds(msg) - case MESSAGE_TYPE_REPLY_TXS: + case MessageTypeReplyTxs: err = s.handleReplyTxs(msg) - case MESSAGE_TYPE_DONE: + case MessageTypeDone: err = s.handleDone() - case MESSAGE_TYPE_INIT: + case MessageTypeInit: err = s.handleInit() default: - err = fmt.Errorf("%s: received unexpected message type %d", PROTOCOL_NAME, msg.Type()) + err = fmt.Errorf("%s: received unexpected message type %d", ProtocolName, msg.Type()) } return err } diff --git a/protocol/txsubmission/txsubmission.go b/protocol/txsubmission/txsubmission.go index 92900476..dd2bb7d1 100644 --- a/protocol/txsubmission/txsubmission.go +++ b/protocol/txsubmission/txsubmission.go @@ -21,35 +21,35 @@ import ( ) const ( - PROTOCOL_NAME = "tx-submission" - PROTOCOL_ID uint16 = 4 + ProtocolName = "tx-submission" + ProtocolId uint16 = 4 ) var ( - STATE_INIT = protocol.NewState(1, "Init") - STATE_IDLE = protocol.NewState(2, "Idle") - STATE_TX_IDS_BLOCKING = protocol.NewState(3, "TxIdsBlocking") - STATE_TX_IDS_NONBLOCKING = protocol.NewState(4, "TxIdsNonBlocking") - STATE_TXS = protocol.NewState(5, "Txs") - STATE_DONE = protocol.NewState(6, "Done") + stateInit = protocol.NewState(1, "Init") + stateIdle = protocol.NewState(2, "Idle") + stateTxIdsBlocking = protocol.NewState(3, "TxIdsBlocking") + stateTxIdsNonblocking = protocol.NewState(4, "TxIdsNonBlocking") + stateTxs = protocol.NewState(5, "Txs") + stateDone = protocol.NewState(6, "Done") ) var StateMap = protocol.StateMap{ - STATE_INIT: protocol.StateMapEntry{ + stateInit: protocol.StateMapEntry{ Agency: protocol.AgencyClient, Transitions: []protocol.StateTransition{ { - MsgType: MESSAGE_TYPE_INIT, - NewState: STATE_IDLE, + MsgType: MessageTypeInit, + NewState: stateIdle, }, }, }, - STATE_IDLE: protocol.StateMapEntry{ + stateIdle: protocol.StateMapEntry{ Agency: protocol.AgencyServer, Transitions: []protocol.StateTransition{ { - MsgType: MESSAGE_TYPE_REQUEST_TX_IDS, - NewState: STATE_TX_IDS_BLOCKING, + MsgType: MessageTypeRequestTxIds, + NewState: stateTxIdsBlocking, // Match if blocking MatchFunc: func(msg protocol.Message) bool { msgRequestTxIds := msg.(*MsgRequestTxIds) @@ -57,8 +57,8 @@ var StateMap = protocol.StateMap{ }, }, { - MsgType: MESSAGE_TYPE_REQUEST_TX_IDS, - NewState: STATE_TX_IDS_NONBLOCKING, + MsgType: MessageTypeRequestTxIds, + NewState: stateTxIdsNonblocking, // Metch if non-blocking MatchFunc: func(msg protocol.Message) bool { msgRequestTxIds := msg.(*MsgRequestTxIds) @@ -66,43 +66,43 @@ var StateMap = protocol.StateMap{ }, }, { - MsgType: MESSAGE_TYPE_REQUEST_TXS, - NewState: STATE_TXS, + MsgType: MessageTypeRequestTxs, + NewState: stateTxs, }, }, }, - STATE_TX_IDS_BLOCKING: protocol.StateMapEntry{ + stateTxIdsBlocking: protocol.StateMapEntry{ Agency: protocol.AgencyClient, Transitions: []protocol.StateTransition{ { - MsgType: MESSAGE_TYPE_REPLY_TX_IDS, - NewState: STATE_IDLE, + MsgType: MessageTypeReplyTxIds, + NewState: stateIdle, }, { - MsgType: MESSAGE_TYPE_DONE, - NewState: STATE_DONE, + MsgType: MessageTypeDone, + NewState: stateDone, }, }, }, - STATE_TX_IDS_NONBLOCKING: protocol.StateMapEntry{ + stateTxIdsNonblocking: protocol.StateMapEntry{ Agency: protocol.AgencyClient, Transitions: []protocol.StateTransition{ { - MsgType: MESSAGE_TYPE_REPLY_TX_IDS, - NewState: STATE_IDLE, + MsgType: MessageTypeReplyTxIds, + NewState: stateIdle, }, }, }, - STATE_TXS: protocol.StateMapEntry{ + stateTxs: protocol.StateMapEntry{ Agency: protocol.AgencyClient, Transitions: []protocol.StateTransition{ { - MsgType: MESSAGE_TYPE_REPLY_TXS, - NewState: STATE_IDLE, + MsgType: MessageTypeReplyTxs, + NewState: stateIdle, }, }, }, - STATE_DONE: protocol.StateMapEntry{ + stateDone: protocol.StateMapEntry{ Agency: protocol.AgencyNone, }, } @@ -123,7 +123,7 @@ type Config struct { } // Callback function types -type RequestTxIdsFunc func(bool, uint16, uint16) error +type RequestTxIdsFunc func(bool, uint16, uint16) ([]TxIdAndSize, error) type ReplyTxIdsFunc func(interface{}) error type RequestTxsFunc func(interface{}) error type ReplyTxsFunc func(interface{}) error