Skip to content

Commit

Permalink
feat(ipld): vouchers as plain ipld.Node (#325)
Browse files Browse the repository at this point in the history
* feat(ipld): vouchers as plain ipld.Node

* feat: add ValidationResult#Equals() utility

* feat(ipld): introduce TypedVoucher tuple type

* chore(ipld): ipld.Node -> datamodel.Node

* chore: remove RegisterVoucherResultType

* fix: minor staticcheck fixes
  • Loading branch information
rvagg authored and hannahhoward committed Oct 7, 2022
1 parent 1b3f897 commit f54bd1a
Show file tree
Hide file tree
Showing 48 changed files with 938 additions and 1,214 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (vl *myValidator) ValidatePush(
sender peer.ID,
voucher datatransfer.Voucher,
baseCid cid.Cid,
selector ipld.Node) error {
selector datamodel.Node) error {
v := voucher.(*myVoucher)
if v.data == "" || v.data != "validpush" {
Expand All @@ -99,7 +99,7 @@ func (vl *myValidator) ValidatePull(
receiver peer.ID,
voucher datatransfer.Voucher,
baseCid cid.Cid,
selector ipld.Node) error {
selector datamodel.Node) error {
v := voucher.(*myVoucher)
if v.data == "" || v.data != "validpull" {
Expand Down Expand Up @@ -135,7 +135,7 @@ must be sent with the request. Using the trivial examples above:
For more detail, please see the [unit tests](https://github.com/filecoin-project/go-data-transfer/blob/master/impl/impl_test.go).

### Open a Push or Pull Request
For a push or pull request, provide a context, a `datatransfer.Voucher`, a host recipient `peer.ID`, a baseCID `cid.CID` and a selector `ipld.Node`. These
For a push or pull request, provide a context, a `datatransfer.Voucher`, a host recipient `peer.ID`, a baseCID `cid.CID` and a selector `datamodel.Node`. These
calls return a `datatransfer.ChannelID` and any error:
```go
channelID, err := dtm.OpenPullDataChannel(ctx, recipient, voucher, baseCid, selector)
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc,
timer := time.NewTimer(30 * time.Second)
start := time.Now()
for j := 0; j < numfiles; j++ {
_, err := pusher.Manager.OpenPushDataChannel(ctx, receiver.Peer, testutil.NewFakeDTType(), allCids[j], allSelector)
_, err := pusher.Manager.OpenPushDataChannel(ctx, receiver.Peer, testutil.NewTestTypedVoucher(), allCids[j], allSelector)
if err != nil {
b.Fatalf("received error on request: %s", err.Error())
}
Expand Down
3 changes: 1 addition & 2 deletions benchmarks/testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,7 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD
sv := testutil.NewStubbedValidator()
sv.StubSuccessPull()
sv.StubSuccessPush()
dt.RegisterVoucherType(testutil.NewFakeDTType(), sv)
dt.RegisterVoucherResultType(testutil.NewFakeDTType())
dt.RegisterVoucherType(testutil.TestVoucherType, sv)
return Instance{
Adapter: dtNet,
Peer: p,
Expand Down
79 changes: 42 additions & 37 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,19 @@ import (
"bytes"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/datamodel"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
peer "github.com/libp2p/go-libp2p-core/peer"

datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/go-data-transfer/v2/channels/internal"
ipldutils "github.com/filecoin-project/go-data-transfer/v2/ipldutils"
)

// channelState is immutable channel data plus mutable state
type channelState struct {
ic internal.ChannelState

// additional voucherResults
voucherResultDecoder DecoderByTypeFunc
voucherDecoder DecoderByTypeFunc
}

// EmptyChannelState is the zero value for channel state, meaning not present
Expand All @@ -45,7 +42,7 @@ func (c channelState) BaseCID() cid.Cid { return c.ic.BaseCid }

// Selector returns the IPLD selector for this data transfer (represented as
// an IPLD node)
func (c channelState) Selector() ipld.Node {
func (c channelState) Selector() datamodel.Node {
builder := basicnode.Prototype.Any.NewBuilder()
reader := bytes.NewReader(c.ic.Selector.Raw)
err := dagcbor.Decode(builder, reader)
Expand All @@ -56,13 +53,15 @@ func (c channelState) Selector() ipld.Node {
}

// Voucher returns the voucher for this data transfer
func (c channelState) Voucher() datatransfer.Voucher {
func (c channelState) Voucher() (datatransfer.TypedVoucher, error) {
if len(c.ic.Vouchers) == 0 {
return nil
return datatransfer.TypedVoucher{}, nil
}
node, err := ipldutils.DeferredToNode(c.ic.Vouchers[0].Voucher)
if err != nil {
return datatransfer.TypedVoucher{}, err
}
decoder, _ := c.voucherDecoder(c.ic.Vouchers[0].Type)
encodable, _ := decoder.DecodeFromCbor(c.ic.Vouchers[0].Voucher.Raw)
return encodable.(datatransfer.Voucher)
return datatransfer.TypedVoucher{Voucher: node, Type: c.ic.Vouchers[0].Type}, nil
}

// ReceivedCidsTotal returns the number of (non-unique) cids received so far
Expand Down Expand Up @@ -108,36 +107,46 @@ func (c channelState) Message() string {
return c.ic.Message
}

func (c channelState) Vouchers() []datatransfer.Voucher {
vouchers := make([]datatransfer.Voucher, 0, len(c.ic.Vouchers))
func (c channelState) Vouchers() ([]datatransfer.TypedVoucher, error) {
vouchers := make([]datatransfer.TypedVoucher, 0, len(c.ic.Vouchers))
for _, encoded := range c.ic.Vouchers {
decoder, _ := c.voucherDecoder(encoded.Type)
encodable, _ := decoder.DecodeFromCbor(encoded.Voucher.Raw)
vouchers = append(vouchers, encodable.(datatransfer.Voucher))
node, err := ipldutils.DeferredToNode(encoded.Voucher)
if err != nil {
return nil, err
}
vouchers = append(vouchers, datatransfer.TypedVoucher{Voucher: node, Type: encoded.Type})
}
return vouchers
return vouchers, nil
}

func (c channelState) LastVoucher() datatransfer.Voucher {
decoder, _ := c.voucherDecoder(c.ic.Vouchers[len(c.ic.Vouchers)-1].Type)
encodable, _ := decoder.DecodeFromCbor(c.ic.Vouchers[len(c.ic.Vouchers)-1].Voucher.Raw)
return encodable.(datatransfer.Voucher)
func (c channelState) LastVoucher() (datatransfer.TypedVoucher, error) {
ev := c.ic.Vouchers[len(c.ic.Vouchers)-1]
node, err := ipldutils.DeferredToNode(ev.Voucher)
if err != nil {
return datatransfer.TypedVoucher{}, err
}
return datatransfer.TypedVoucher{Voucher: node, Type: ev.Type}, nil
}

func (c channelState) LastVoucherResult() datatransfer.VoucherResult {
decoder, _ := c.voucherResultDecoder(c.ic.VoucherResults[len(c.ic.VoucherResults)-1].Type)
encodable, _ := decoder.DecodeFromCbor(c.ic.VoucherResults[len(c.ic.VoucherResults)-1].VoucherResult.Raw)
return encodable.(datatransfer.VoucherResult)
func (c channelState) LastVoucherResult() (datatransfer.TypedVoucher, error) {
evr := c.ic.VoucherResults[len(c.ic.VoucherResults)-1]
node, err := ipldutils.DeferredToNode(evr.VoucherResult)
if err != nil {
return datatransfer.TypedVoucher{}, err
}
return datatransfer.TypedVoucher{Voucher: node, Type: evr.Type}, nil
}

func (c channelState) VoucherResults() []datatransfer.VoucherResult {
voucherResults := make([]datatransfer.VoucherResult, 0, len(c.ic.VoucherResults))
func (c channelState) VoucherResults() ([]datatransfer.TypedVoucher, error) {
voucherResults := make([]datatransfer.TypedVoucher, 0, len(c.ic.VoucherResults))
for _, encoded := range c.ic.VoucherResults {
decoder, _ := c.voucherResultDecoder(encoded.Type)
encodable, _ := decoder.DecodeFromCbor(encoded.VoucherResult.Raw)
voucherResults = append(voucherResults, encodable.(datatransfer.VoucherResult))
node, err := ipldutils.DeferredToNode(encoded.VoucherResult)
if err != nil {
return nil, err
}
voucherResults = append(voucherResults, datatransfer.TypedVoucher{Voucher: node, Type: encoded.Type})
}
return voucherResults
return voucherResults, nil
}

func (c channelState) SelfPeer() peer.ID {
Expand Down Expand Up @@ -174,12 +183,8 @@ func (c channelState) Stages() *datatransfer.ChannelStages {
return c.ic.Stages
}

func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc) datatransfer.ChannelState {
return channelState{
ic: c,
voucherResultDecoder: voucherResultDecoder,
voucherDecoder: voucherDecoder,
}
func fromInternalChannelState(c internal.ChannelState) datatransfer.ChannelState {
return channelState{ic: c}
}

var _ datatransfer.ChannelState = channelState{}
42 changes: 15 additions & 27 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
Expand All @@ -20,11 +20,9 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/go-data-transfer/v2/channels/internal"
"github.com/filecoin-project/go-data-transfer/v2/channels/internal/migrations"
"github.com/filecoin-project/go-data-transfer/v2/encoding"
ipldutils "github.com/filecoin-project/go-data-transfer/v2/ipldutils"
)

type DecoderByTypeFunc func(identifier datatransfer.TypeIdentifier) (encoding.Decoder, bool)

type Notifier func(datatransfer.Event, datatransfer.ChannelState)

// ErrNotFound is returned when a channel cannot be found with a given channel ID
Expand All @@ -46,8 +44,6 @@ var ErrWrongType = errors.New("Cannot change type of implementation specific dat
// Channels is a thread safe list of channels
type Channels struct {
notifier Notifier
voucherDecoder DecoderByTypeFunc
voucherResultDecoder DecoderByTypeFunc
blockIndexCache *blockIndexCache
progressCache *progressCache
stateMachines fsm.Group
Expand All @@ -65,16 +61,10 @@ type ChannelEnvironment interface {
// New returns a new thread safe list of channels
func New(ds datastore.Batching,
notifier Notifier,
voucherDecoder DecoderByTypeFunc,
voucherResultDecoder DecoderByTypeFunc,
env ChannelEnvironment,
selfPeer peer.ID) (*Channels, error) {

c := &Channels{
notifier: notifier,
voucherDecoder: voucherDecoder,
voucherResultDecoder: voucherResultDecoder,
}
c := &Channels{notifier: notifier}
c.blockIndexCache = newBlockIndexCache()
c.progressCache = newProgressCache()
channelMigrations, err := migrations.GetChannelStateMigrations(selfPeer)
Expand Down Expand Up @@ -121,19 +111,19 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {

// CreateNew creates a new channel id and channel state and saves to channels.
// returns error if the channel exists already.
func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, baseCid cid.Cid, selector ipld.Node, voucher datatransfer.Voucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, error) {
func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, baseCid cid.Cid, selector datamodel.Node, voucher datatransfer.TypedVoucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, error) {
var responder peer.ID
if dataSender == initiator {
responder = dataReceiver
} else {
responder = dataSender
}
chid := datatransfer.ChannelID{Initiator: initiator, Responder: responder, ID: tid}
voucherBytes, err := encoding.Encode(voucher)
initialVoucher, err := ipldutils.NodeToDeferred(voucher.Voucher)
if err != nil {
return datatransfer.ChannelID{}, err
}
selBytes, err := encoding.Encode(selector)
selBytes, err := ipldutils.NodeToBytes(selector)
if err != nil {
return datatransfer.ChannelID{}, err
}
Expand All @@ -149,10 +139,8 @@ func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, base
Stages: &datatransfer.ChannelStages{},
Vouchers: []internal.EncodedVoucher{
{
Type: voucher.Type(),
Voucher: &cbg.Deferred{
Raw: voucherBytes,
},
Type: voucher.Type,
Voucher: initialVoucher,
},
},
Status: datatransfer.Requested,
Expand Down Expand Up @@ -289,21 +277,21 @@ func (c *Channels) ResumeResponder(chid datatransfer.ChannelID) error {
}

// NewVoucher records a new voucher for this channel
func (c *Channels) NewVoucher(chid datatransfer.ChannelID, voucher datatransfer.Voucher) error {
voucherBytes, err := encoding.Encode(voucher)
func (c *Channels) NewVoucher(chid datatransfer.ChannelID, voucher datatransfer.TypedVoucher) error {
voucherBytes, err := ipldutils.NodeToBytes(voucher.Voucher)
if err != nil {
return err
}
return c.send(chid, datatransfer.NewVoucher, voucher.Type(), voucherBytes)
return c.send(chid, datatransfer.NewVoucher, voucher.Type, voucherBytes)
}

// NewVoucherResult records a new voucher result for this channel
func (c *Channels) NewVoucherResult(chid datatransfer.ChannelID, voucherResult datatransfer.VoucherResult) error {
voucherResultBytes, err := encoding.Encode(voucherResult)
func (c *Channels) NewVoucherResult(chid datatransfer.ChannelID, voucherResult datatransfer.TypedVoucher) error {
voucherResultBytes, err := ipldutils.NodeToBytes(voucherResult.Voucher)
if err != nil {
return err
}
return c.send(chid, datatransfer.NewVoucherResult, voucherResult.Type(), voucherResultBytes)
return c.send(chid, datatransfer.NewVoucherResult, voucherResult.Type, voucherResultBytes)
}

// Complete indicates responder has completed sending/receiving data
Expand Down Expand Up @@ -485,5 +473,5 @@ func (c *Channels) checkChannelExists(chid datatransfer.ChannelID, code datatran

// Convert from the internally used channel state format to the externally exposed ChannelState
func (c *Channels) fromInternalChannelState(ch internal.ChannelState) datatransfer.ChannelState {
return fromInternalChannelState(ch, c.voucherDecoder, c.voucherResultDecoder)
return fromInternalChannelState(ch)
}
Loading

0 comments on commit f54bd1a

Please sign in to comment.