Skip to content

Commit

Permalink
feat: finish tx-submission client implementation
Browse files Browse the repository at this point in the history
Fixes #54
  • Loading branch information
agaffney committed Aug 9, 2023
1 parent f46e609 commit ce8dcec
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 72 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
/tx-monitor
/block-fetch
/peer-sharing
/tx-submission

# Test binary, built with `go test -c`
*.test
Expand Down
149 changes: 149 additions & 0 deletions cmd/tx-submission/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2023 Blink Labs, LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"fmt"
"os"

ouroboros "github.com/blinklabs-io/gouroboros"
"github.com/blinklabs-io/gouroboros/cmd/common"
"github.com/blinklabs-io/gouroboros/protocol/txsubmission"
)

type txSubmissionFlags struct {
*common.GlobalFlags
/*
slot uint64
hash string
all bool
*/
}

func main() {
// Parse commandline
f := txSubmissionFlags{
GlobalFlags: common.NewGlobalFlags(),
}
/*
f.Flagset.Uint64Var(&f.slot, "slot", 0, "slot for single block to fetch")
f.Flagset.StringVar(&f.hash, "hash", "", "hash for single block to fetch")
f.Flagset.BoolVar(&f.all, "all", false, "show all available detail for block")
*/
f.Parse()
// Create connection
conn := common.CreateClientConnection(f.GlobalFlags)
errorChan := make(chan error)
go func() {
for {
err := <-errorChan
fmt.Printf("ERROR(async): %s\n", err)
os.Exit(1)
}
}()
o, err := ouroboros.New(
ouroboros.WithConnection(conn),
ouroboros.WithNetworkMagic(uint32(f.NetworkMagic)),
ouroboros.WithErrorChan(errorChan),
ouroboros.WithNodeToNode(f.NtnProto),
ouroboros.WithKeepAlive(true),
ouroboros.WithTxSubmissionConfig(
txsubmission.NewConfig(
txsubmission.WithRequestTxIdsFunc(
// TODO
func(blocking bool, ack uint16, req uint16) ([]txsubmission.TxIdAndSize, error) {
return []txsubmission.TxIdAndSize{}, nil
},
),
),
),
)
if err != nil {
fmt.Printf("ERROR: %s\n", err)
os.Exit(1)
}

// Start the TxSubmission activity loop
o.TxSubmission().Client.Init()

// Wait forever
select {}

/*
blockHash, err := hex.DecodeString(f.hash)
if err != nil {
fmt.Printf("ERROR: failed to decode block hash: %s\n", err)
os.Exit(1)
}
block, err := o.BlockFetch().Client.GetBlock(ocommon.NewPoint(f.slot, blockHash))
if err != nil {
fmt.Printf("ERROR: failed to fetch block: %s\n", err)
os.Exit(1)
}
// Display block info
switch v := block.(type) {
case *ledger.ByronEpochBoundaryBlock:
fmt.Printf("era = Byron (EBB), epoch = %d, id = %s\n", v.Header.ConsensusData.Epoch, v.Hash())
case *ledger.ByronMainBlock:
fmt.Printf("era = Byron, epoch = %d, slot = %d, id = %s\n", v.Header.ConsensusData.SlotId.Epoch, v.SlotNumber(), v.Hash())
case ledger.Block:
fmt.Printf("era = %s, slot = %d, block_no = %d, id = %s\n", v.Era().Name, v.SlotNumber(), v.BlockNumber(), v.Hash())
}
if f.all {
// Display transaction info
fmt.Printf("\nTransactions:\n")
for _, tx := range block.Transactions() {
fmt.Printf(" Hash: %s\n", tx.Hash())
if tx.Metadata().Value() != nil {
fmt.Printf(" Metadata:\n %#v (%x)\n", tx.Metadata().Value(), tx.Metadata().Cbor())
}
fmt.Printf(" Inputs:\n")
for _, input := range tx.Inputs() {
fmt.Printf(" Id: %s\n", input.Id())
fmt.Printf(" Index: %d\n", input.Index())
fmt.Println("")
}
fmt.Printf(" Outputs:\n")
for _, output := range tx.Outputs() {
fmt.Printf(" Address: %s\n", output.Address())
fmt.Printf(" Amount: %d\n", output.Amount())
assets := output.Assets()
if assets != nil {
fmt.Printf(" Assets:\n")
for _, policyId := range assets.Policies() {
fmt.Printf(" Policy Id: %s\n", policyId)
fmt.Printf(" Policy assets:\n")
for _, assetName := range assets.Assets(policyId) {
fmt.Printf(" Asset name: %s\n", assetName)
fmt.Printf(" Amount: %d\n", assets.Asset(policyId, assetName))
fmt.Println("")
}
}
}
if output.Datum() != nil {
jsonData, err := json.Marshal(output.Datum())
if err != nil {
fmt.Printf(" Datum (hex): %x\n", output.Datum().Cbor())
} else {
fmt.Printf(" Datum: %s\n", jsonData)
}
}
fmt.Println("")
}
}
}
*/
}
42 changes: 32 additions & 10 deletions protocol/txsubmission/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ package txsubmission

import (
"fmt"
"sync"

"github.com/blinklabs-io/gouroboros/protocol"
)

type Client struct {
*protocol.Protocol
config *Config
config *Config
onceInit sync.Once
}

func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
Expand All @@ -34,50 +37,69 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
}
// Update state map with timeout
stateMap := StateMap.Copy()
if entry, ok := stateMap[STATE_IDLE]; ok {
if entry, ok := stateMap[stateIdle]; ok {
entry.Timeout = c.config.IdleTimeout
stateMap[STATE_IDLE] = entry
stateMap[stateIdle] = entry
}
// Configure underlying Protocol
protoConfig := protocol.ProtocolConfig{
Name: PROTOCOL_NAME,
ProtocolId: PROTOCOL_ID,
Name: ProtocolName,
ProtocolId: ProtocolId,
Muxer: protoOptions.Muxer,
ErrorChan: protoOptions.ErrorChan,
Mode: protoOptions.Mode,
Role: protocol.ProtocolRoleClient,
MessageHandlerFunc: c.messageHandler,
MessageFromCborFunc: NewMsgFromCbor,
StateMap: stateMap,
InitialState: STATE_INIT,
InitialState: stateInit,
}
c.Protocol = protocol.New(protoConfig)
return c
}

// Init tells the server to begin asking us for transactions
func (c *Client) Init() {
c.onceInit.Do(func() {
// Send our Init message
msg := NewMsgInit()
_ = c.SendMessage(msg)
})
}

func (c *Client) messageHandler(msg protocol.Message, isResponse bool) error {
var err error
switch msg.Type() {
case MESSAGE_TYPE_REQUEST_TX_IDS:
case MessageTypeRequestTxIds:
err = c.handleRequestTxIds(msg)
case MESSAGE_TYPE_REQUEST_TXS:
case MessageTypeRequestTxs:
err = c.handleRequestTxs(msg)
default:
err = fmt.Errorf("%s: received unexpected message type %d", PROTOCOL_NAME, msg.Type())
err = fmt.Errorf("%s: received unexpected message type %d", ProtocolName, msg.Type())
}
return err
}

func (c *Client) handleRequestTxIds(msg protocol.Message) error {
fmt.Printf("msg = %#v\n", msg)
if c.config.RequestTxIdsFunc == nil {
return fmt.Errorf("received tx-submission RequestTxIds message but no callback function is defined")
}
msgRequestTxIds := msg.(*MsgRequestTxIds)
// Call the user callback function
return c.config.RequestTxIdsFunc(msgRequestTxIds.Blocking, msgRequestTxIds.Ack, msgRequestTxIds.Req)
txIds, err := c.config.RequestTxIdsFunc(msgRequestTxIds.Blocking, msgRequestTxIds.Ack, msgRequestTxIds.Req)
if err != nil {
return err
}
resp := NewMsgReplyTxIds(txIds)
if err := c.SendMessage(resp); err != nil {
return err
}
return nil
}

func (c *Client) handleRequestTxs(msg protocol.Message) error {
fmt.Printf("msg = %#v\n", msg)
if c.config.RequestTxsFunc == nil {
return fmt.Errorf("received tx-submission RequestTxs message but no callback function is defined")
}
Expand Down
39 changes: 20 additions & 19 deletions protocol/txsubmission/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,38 @@ package txsubmission

import (
"fmt"

"github.com/blinklabs-io/gouroboros/cbor"
"github.com/blinklabs-io/gouroboros/protocol"
)

const (
MESSAGE_TYPE_REQUEST_TX_IDS = 0
MESSAGE_TYPE_REPLY_TX_IDS = 1
MESSAGE_TYPE_REQUEST_TXS = 2
MESSAGE_TYPE_REPLY_TXS = 3
MESSAGE_TYPE_DONE = 4
MESSAGE_TYPE_INIT = 6
MessageTypeRequestTxIds = 0
MessageTypeReplyTxIds = 1
MessageTypeRequestTxs = 2
MessageTypeReplyTxs = 3
MessageTypeDone = 4
MessageTypeInit = 6
)

func NewMsgFromCbor(msgType uint, data []byte) (protocol.Message, error) {
var ret protocol.Message
switch msgType {
case MESSAGE_TYPE_REQUEST_TX_IDS:
case MessageTypeRequestTxIds:
ret = &MsgRequestTxIds{}
case MESSAGE_TYPE_REPLY_TX_IDS:
case MessageTypeReplyTxIds:
ret = &MsgReplyTxIds{}
case MESSAGE_TYPE_REQUEST_TXS:
case MessageTypeRequestTxs:
ret = &MsgRequestTxs{}
case MESSAGE_TYPE_REPLY_TXS:
case MessageTypeReplyTxs:
ret = &MsgReplyTxs{}
case MESSAGE_TYPE_DONE:
case MessageTypeDone:
ret = &MsgDone{}
case MESSAGE_TYPE_INIT:
case MessageTypeInit:
ret = &MsgInit{}
}
if _, err := cbor.Decode(data, ret); err != nil {
return nil, fmt.Errorf("%s: decode error: %s", PROTOCOL_NAME, err)
return nil, fmt.Errorf("%s: decode error: %s", ProtocolName, err)
}
if ret != nil {
// Store the raw message CBOR
Expand All @@ -65,7 +66,7 @@ type MsgRequestTxIds struct {
func NewMsgRequestTxIds(blocking bool, ack uint16, req uint16) *MsgRequestTxIds {
m := &MsgRequestTxIds{
MessageBase: protocol.MessageBase{
MessageType: MESSAGE_TYPE_REQUEST_TX_IDS,
MessageType: MessageTypeRequestTxIds,
},
Blocking: blocking,
Ack: ack,
Expand All @@ -82,7 +83,7 @@ type MsgReplyTxIds struct {
func NewMsgReplyTxIds(txIds []TxIdAndSize) *MsgReplyTxIds {
m := &MsgReplyTxIds{
MessageBase: protocol.MessageBase{
MessageType: MESSAGE_TYPE_REPLY_TX_IDS,
MessageType: MessageTypeReplyTxIds,
},
TxIds: txIds,
}
Expand All @@ -97,7 +98,7 @@ type MsgRequestTxs struct {
func NewMsgRequestTxs(txIds []TxId) *MsgRequestTxs {
m := &MsgRequestTxs{
MessageBase: protocol.MessageBase{
MessageType: MESSAGE_TYPE_REQUEST_TXS,
MessageType: MessageTypeRequestTxs,
},
TxIds: txIds,
}
Expand All @@ -112,7 +113,7 @@ type MsgReplyTxs struct {
func NewMsgReplyTxs(txs []TxBody) *MsgReplyTxs {
m := &MsgReplyTxs{
MessageBase: protocol.MessageBase{
MessageType: MESSAGE_TYPE_REPLY_TXS,
MessageType: MessageTypeReplyTxs,
},
Txs: txs,
}
Expand All @@ -126,7 +127,7 @@ type MsgDone struct {
func NewMsgDone() *MsgDone {
m := &MsgDone{
MessageBase: protocol.MessageBase{
MessageType: MESSAGE_TYPE_DONE,
MessageType: MessageTypeDone,
},
}
return m
Expand All @@ -139,7 +140,7 @@ type MsgInit struct {
func NewMsgInit() *MsgInit {
m := &MsgInit{
MessageBase: protocol.MessageBase{
MessageType: MESSAGE_TYPE_INIT,
MessageType: MessageTypeInit,
},
}
return m
Expand Down
9 changes: 5 additions & 4 deletions protocol/txsubmission/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ package txsubmission

import (
"encoding/hex"
"github.com/blinklabs-io/gouroboros/cbor"
"github.com/blinklabs-io/gouroboros/protocol"
"reflect"
"testing"

"github.com/blinklabs-io/gouroboros/cbor"
"github.com/blinklabs-io/gouroboros/protocol"
)

type testDefinition struct {
Expand All @@ -33,12 +34,12 @@ var tests = []testDefinition{
{
CborHex: "8104",
Message: NewMsgDone(),
MessageType: MESSAGE_TYPE_DONE,
MessageType: MessageTypeDONE,
},
{
CborHex: "8106",
Message: NewMsgInit(),
MessageType: MESSAGE_TYPE_INIT,
MessageType: MessageTypeINIT,
},
}

Expand Down
Loading

0 comments on commit ce8dcec

Please sign in to comment.