Skip to content

Commit

Permalink
feat: finish basic tx-submission client implementation
Browse files Browse the repository at this point in the history
Fixes #54
  • Loading branch information
agaffney committed Aug 9, 2023
1 parent f46e609 commit 1bc1cd9
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 74 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
/tx-monitor
/block-fetch
/peer-sharing
/tx-submission

# Test binary, built with `go test -c`
*.test
Expand Down
79 changes: 79 additions & 0 deletions cmd/tx-submission/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2023 Blink Labs, LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"fmt"
"os"

ouroboros "github.com/blinklabs-io/gouroboros"
"github.com/blinklabs-io/gouroboros/cmd/common"
"github.com/blinklabs-io/gouroboros/protocol/txsubmission"
)

type txSubmissionFlags struct {
*common.GlobalFlags
}

func main() {
// Parse commandline
f := txSubmissionFlags{
GlobalFlags: common.NewGlobalFlags(),
}
f.Parse()
// Create connection
conn := common.CreateClientConnection(f.GlobalFlags)
errorChan := make(chan error)
go func() {
for {
err := <-errorChan
fmt.Printf("ERROR(async): %s\n", err)
os.Exit(1)
}
}()
o, err := ouroboros.New(
ouroboros.WithConnection(conn),
ouroboros.WithNetworkMagic(uint32(f.NetworkMagic)),
ouroboros.WithErrorChan(errorChan),
ouroboros.WithNodeToNode(f.NtnProto),
ouroboros.WithKeepAlive(true),
ouroboros.WithTxSubmissionConfig(
txsubmission.NewConfig(
txsubmission.WithRequestTxIdsFunc(
// TODO: do something more useful
func(blocking bool, ack uint16, req uint16) ([]txsubmission.TxIdAndSize, error) {
return []txsubmission.TxIdAndSize{}, nil
},
),
txsubmission.WithRequestTxsFunc(
// TODO: do something more useful
func(txIds []txsubmission.TxId) ([]txsubmission.TxBody, error) {
return []txsubmission.TxBody{}, nil
},
),
),
),
)
if err != nil {
fmt.Printf("ERROR: %s\n", err)
os.Exit(1)
}

// Start the TxSubmission activity loop
o.TxSubmission().Client.Init()

// Wait forever
select {}
}
50 changes: 39 additions & 11 deletions protocol/txsubmission/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ package txsubmission

import (
"fmt"
"sync"

"github.com/blinklabs-io/gouroboros/protocol"
)

type Client struct {
*protocol.Protocol
config *Config
config *Config
onceInit sync.Once
}

func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
Expand All @@ -34,36 +37,45 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
}
// Update state map with timeout
stateMap := StateMap.Copy()
if entry, ok := stateMap[STATE_IDLE]; ok {
if entry, ok := stateMap[stateIdle]; ok {
entry.Timeout = c.config.IdleTimeout
stateMap[STATE_IDLE] = entry
stateMap[stateIdle] = entry
}
// Configure underlying Protocol
protoConfig := protocol.ProtocolConfig{
Name: PROTOCOL_NAME,
ProtocolId: PROTOCOL_ID,
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: protoOptions.Muxer,
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleClient,
MessageHandlerFunc: c.messageHandler,
MessageFromCborFunc: NewMsgFromCbor,
StateMap: stateMap,
InitialState: STATE_INIT,
InitialState: stateInit,
}
c.Protocol = protocol.New(protoConfig)
return c
}

// Init tells the server to begin asking us for transactions
func (c *Client) Init() {
c.onceInit.Do(func() {
// Send our Init message
msg := NewMsgInit()
_ = c.SendMessage(msg)
})
}

func (c *Client) messageHandler(msg protocol.Message, isResponse bool) error {
var err error
switch msg.Type() {
case MESSAGE_TYPE_REQUEST_TX_IDS:
case MessageTypeRequestTxIds:
err = c.handleRequestTxIds(msg)
case MESSAGE_TYPE_REQUEST_TXS:
case MessageTypeRequestTxs:
err = c.handleRequestTxs(msg)
default:
err = fmt.Errorf("%s: received unexpected message type %d", PROTOCOL_NAME, msg.Type())
err = fmt.Errorf("%s: received unexpected message type %d", ProtocolName, msg.Type())
}
return err
}
Expand All @@ -74,7 +86,15 @@ func (c *Client) handleRequestTxIds(msg protocol.Message) error {
}
msgRequestTxIds := msg.(*MsgRequestTxIds)
// Call the user callback function
return c.config.RequestTxIdsFunc(msgRequestTxIds.Blocking, msgRequestTxIds.Ack, msgRequestTxIds.Req)
txIds, err := c.config.RequestTxIdsFunc(msgRequestTxIds.Blocking, msgRequestTxIds.Ack, msgRequestTxIds.Req)
if err != nil {
return err
}
resp := NewMsgReplyTxIds(txIds)
if err := c.SendMessage(resp); err != nil {
return err
}
return nil
}

func (c *Client) handleRequestTxs(msg protocol.Message) error {
Expand All @@ -83,5 +103,13 @@ func (c *Client) handleRequestTxs(msg protocol.Message) error {
}
msgRequestTxs := msg.(*MsgRequestTxs)
// Call the user callback function
return c.config.RequestTxsFunc(msgRequestTxs.TxIds)
txs, err := c.config.RequestTxsFunc(msgRequestTxs.TxIds)
if err != nil {
return err
}
resp := NewMsgReplyTxs(txs)
if err := c.SendMessage(resp); err != nil {
return err
}
return nil
}
39 changes: 20 additions & 19 deletions protocol/txsubmission/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,38 @@ package txsubmission

import (
"fmt"

"github.com/blinklabs-io/gouroboros/cbor"
"github.com/blinklabs-io/gouroboros/protocol"
)

const (
MESSAGE_TYPE_REQUEST_TX_IDS = 0
MESSAGE_TYPE_REPLY_TX_IDS = 1
MESSAGE_TYPE_REQUEST_TXS = 2
MESSAGE_TYPE_REPLY_TXS = 3
MESSAGE_TYPE_DONE = 4
MESSAGE_TYPE_INIT = 6
MessageTypeRequestTxIds = 0
MessageTypeReplyTxIds = 1
MessageTypeRequestTxs = 2
MessageTypeReplyTxs = 3
MessageTypeDone = 4
MessageTypeInit = 6
)

func NewMsgFromCbor(msgType uint, data []byte) (protocol.Message, error) {
var ret protocol.Message
switch msgType {
case MESSAGE_TYPE_REQUEST_TX_IDS:
case MessageTypeRequestTxIds:
ret = &MsgRequestTxIds{}
case MESSAGE_TYPE_REPLY_TX_IDS:
case MessageTypeReplyTxIds:
ret = &MsgReplyTxIds{}
case MESSAGE_TYPE_REQUEST_TXS:
case MessageTypeRequestTxs:
ret = &MsgRequestTxs{}
case MESSAGE_TYPE_REPLY_TXS:
case MessageTypeReplyTxs:
ret = &MsgReplyTxs{}
case MESSAGE_TYPE_DONE:
case MessageTypeDone:
ret = &MsgDone{}
case MESSAGE_TYPE_INIT:
case MessageTypeInit:
ret = &MsgInit{}
}
if _, err := cbor.Decode(data, ret); err != nil {
return nil, fmt.Errorf("%s: decode error: %s", PROTOCOL_NAME, err)
return nil, fmt.Errorf("%s: decode error: %s", ProtocolName, err)
}
if ret != nil {
// Store the raw message CBOR
Expand All @@ -65,7 +66,7 @@ type MsgRequestTxIds struct {
func NewMsgRequestTxIds(blocking bool, ack uint16, req uint16) *MsgRequestTxIds {
m := &MsgRequestTxIds{
MessageBase: protocol.MessageBase{
MessageType: MESSAGE_TYPE_REQUEST_TX_IDS,
MessageType: MessageTypeRequestTxIds,
},
Blocking: blocking,
Ack: ack,
Expand All @@ -82,7 +83,7 @@ type MsgReplyTxIds struct {
func NewMsgReplyTxIds(txIds []TxIdAndSize) *MsgReplyTxIds {
m := &MsgReplyTxIds{
MessageBase: protocol.MessageBase{
MessageType: MESSAGE_TYPE_REPLY_TX_IDS,
MessageType: MessageTypeReplyTxIds,
},
TxIds: txIds,
}
Expand All @@ -97,7 +98,7 @@ type MsgRequestTxs struct {
func NewMsgRequestTxs(txIds []TxId) *MsgRequestTxs {
m := &MsgRequestTxs{
MessageBase: protocol.MessageBase{
MessageType: MESSAGE_TYPE_REQUEST_TXS,
MessageType: MessageTypeRequestTxs,
},
TxIds: txIds,
}
Expand All @@ -112,7 +113,7 @@ type MsgReplyTxs struct {
func NewMsgReplyTxs(txs []TxBody) *MsgReplyTxs {
m := &MsgReplyTxs{
MessageBase: protocol.MessageBase{
MessageType: MESSAGE_TYPE_REPLY_TXS,
MessageType: MessageTypeReplyTxs,
},
Txs: txs,
}
Expand All @@ -126,7 +127,7 @@ type MsgDone struct {
func NewMsgDone() *MsgDone {
m := &MsgDone{
MessageBase: protocol.MessageBase{
MessageType: MESSAGE_TYPE_DONE,
MessageType: MessageTypeDone,
},
}
return m
Expand All @@ -139,7 +140,7 @@ type MsgInit struct {
func NewMsgInit() *MsgInit {
m := &MsgInit{
MessageBase: protocol.MessageBase{
MessageType: MESSAGE_TYPE_INIT,
MessageType: MessageTypeInit,
},
}
return m
Expand Down
9 changes: 5 additions & 4 deletions protocol/txsubmission/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ package txsubmission

import (
"encoding/hex"
"github.com/blinklabs-io/gouroboros/cbor"
"github.com/blinklabs-io/gouroboros/protocol"
"reflect"
"testing"

"github.com/blinklabs-io/gouroboros/cbor"
"github.com/blinklabs-io/gouroboros/protocol"
)

type testDefinition struct {
Expand All @@ -33,12 +34,12 @@ var tests = []testDefinition{
{
CborHex: "8104",
Message: NewMsgDone(),
MessageType: MESSAGE_TYPE_DONE,
MessageType: MessageTypeDone,
},
{
CborHex: "8106",
Message: NewMsgInit(),
MessageType: MESSAGE_TYPE_INIT,
MessageType: MessageTypeInit,
},
}

Expand Down
17 changes: 9 additions & 8 deletions protocol/txsubmission/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package txsubmission

import (
"fmt"

"github.com/blinklabs-io/gouroboros/protocol"
)

Expand All @@ -29,16 +30,16 @@ func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server {
config: cfg,
}
protoConfig := protocol.ProtocolConfig{
Name: PROTOCOL_NAME,
ProtocolId: PROTOCOL_ID,
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: protoOptions.Muxer,
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleServer,
MessageHandlerFunc: s.messageHandler,
MessageFromCborFunc: NewMsgFromCbor,
StateMap: StateMap,
InitialState: STATE_INIT,
InitialState: stateInit,
}
s.Protocol = protocol.New(protoConfig)
return s
Expand All @@ -47,16 +48,16 @@ func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server {
func (s *Server) messageHandler(msg protocol.Message, isResponse bool) error {
var err error
switch msg.Type() {
case MESSAGE_TYPE_REPLY_TX_IDS:
case MessageTypeReplyTxIds:
err = s.handleReplyTxIds(msg)
case MESSAGE_TYPE_REPLY_TXS:
case MessageTypeReplyTxs:
err = s.handleReplyTxs(msg)
case MESSAGE_TYPE_DONE:
case MessageTypeDone:
err = s.handleDone()
case MESSAGE_TYPE_INIT:
case MessageTypeInit:
err = s.handleInit()
default:
err = fmt.Errorf("%s: received unexpected message type %d", PROTOCOL_NAME, msg.Type())
err = fmt.Errorf("%s: received unexpected message type %d", ProtocolName, msg.Type())
}
return err
}
Expand Down
Loading

0 comments on commit 1bc1cd9

Please sign in to comment.