Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

share a pool of (un)marshallers #38

Merged
merged 6 commits into from
Sep 7, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions encoding/cloner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package encoding

import (
"sync"

refmt "github.com/polydawn/refmt"
"github.com/polydawn/refmt/obj/atlas"
)

// PooledCloner is a thread-safe pooled object cloner.
type PooledCloner struct {
pool sync.Pool
}

// NewPooledCloner returns a PooledCloner with the given atlas. Do not copy
// after use.
func NewPooledCloner(atl atlas.Atlas) PooledCloner {
return PooledCloner{
pool: sync.Pool{
New: func() interface{} {
return refmt.NewCloner(atl)
},
},
}
}

// Clone clones a into b using a cloner from the pool.
func (p *PooledCloner) Clone(a, b interface{}) error {
c := p.pool.Get().(refmt.Cloner)
err := c.Clone(a, b)
p.pool.Put(c)
return err
}
82 changes: 82 additions & 0 deletions encoding/marshaller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package encoding

import (
"bytes"
"io"
"sync"

cbor "github.com/polydawn/refmt/cbor"
"github.com/polydawn/refmt/obj/atlas"
)

type proxyWriter struct {
w io.Writer
}

func (w *proxyWriter) Write(b []byte) (int, error) {
return w.w.Write(b)
}

// Marshaller is a reusbale CBOR marshaller.
type Marshaller struct {
marshal *cbor.Marshaller
writer proxyWriter
warpfork marked this conversation as resolved.
Show resolved Hide resolved
}

// NewMarshallerAtlased constructs a new cbor Marshaller using the given atlas.
func NewMarshallerAtlased(atl atlas.Atlas) *Marshaller {
m := new(Marshaller)
m.marshal = cbor.NewMarshallerAtlased(&m.writer, atl)
return m
}

// Encode encodes the given object to the given writer.
func (m *Marshaller) Encode(obj interface{}, w io.Writer) error {
m.writer.w = w
err := m.marshal.Marshal(obj)
m.writer.w = nil
return err
}

// Marshal marshels the given object to a byte slice.
func (m *Marshaller) Marshal(obj interface{}) ([]byte, error) {
var buf bytes.Buffer
if err := m.Encode(obj, &buf); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

// PooledMarshaller is a thread-safe pooled CBOR marshaller.
type PooledMarshaller struct {
pool sync.Pool
}

// NewPooledMarshaller returns a PooledMarshaller with the given atlas. Do not
// copy after use.
func NewPooledMarshaller(atl atlas.Atlas) PooledMarshaller {
return PooledMarshaller{
pool: sync.Pool{
New: func() interface{} {
return NewMarshallerAtlased(atl)
},
},
}
}

// Marshal marshals the passed object using the pool of marshallers.
func (p *PooledMarshaller) Marshal(obj interface{}) ([]byte, error) {
m := p.pool.Get().(*Marshaller)
bts, err := m.Marshal(obj)
p.pool.Put(m)
return bts, err
}

// Encode encodes the passed object to the given writer using the pool of
// marshallers.
func (p *PooledMarshaller) Encode(obj interface{}, w io.Writer) error {
m := p.pool.Get().(*Marshaller)
err := m.Encode(obj, w)
p.pool.Put(m)
return err
}
79 changes: 79 additions & 0 deletions encoding/unmarshaller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package encoding

import (
"bytes"
"io"
"sync"

cbor "github.com/polydawn/refmt/cbor"
"github.com/polydawn/refmt/obj/atlas"
)

type proxyReader struct {
r io.Reader
}

func (r *proxyReader) Read(b []byte) (int, error) {
return r.r.Read(b)
}

// Unmarshaller is a reusable CBOR unmarshaller.
type Unmarshaller struct {
unmarshal *cbor.Unmarshaller
reader proxyReader
}

// NewUnmarshallerAtlased creates a new reusable unmarshaller.
func NewUnmarshallerAtlased(atl atlas.Atlas) *Unmarshaller {
m := new(Unmarshaller)
m.unmarshal = cbor.NewUnmarshallerAtlased(&m.reader, atl)
return m
}

// Decode reads a CBOR object from the given reader and decodes it into the
// given object.
func (m *Unmarshaller) Decode(r io.Reader, obj interface{}) error {
m.reader.r = r
err := m.unmarshal.Unmarshal(obj)
m.reader.r = nil
return err
}

// Unmarshal unmarshals the given CBOR byte slice into the given object.
func (m *Unmarshaller) Unmarshal(b []byte, obj interface{}) error {
return m.Decode(bytes.NewReader(b), obj)
}

// PooledUnmarshaller is a thread-safe pooled CBOR unmarshaller.
type PooledUnmarshaller struct {
pool sync.Pool
}

// NewPooledUnmarshaller returns a PooledUnmarshaller with the given atlas. Do
// not copy after use.
func NewPooledUnmarshaller(atl atlas.Atlas) PooledUnmarshaller {
return PooledUnmarshaller{
pool: sync.Pool{
New: func() interface{} {
return NewUnmarshallerAtlased(atl)
},
},
}
}

// Decode decodes an object from the passed reader into the given object using
// the pool of unmarshallers.
func (p *PooledUnmarshaller) Decode(r io.Reader, obj interface{}) error {
u := p.pool.Get().(*Unmarshaller)
err := u.Decode(r, obj)
p.pool.Put(u)
return err
}

// Unmarshal unmarshals the passed object using the pool of unmarshallers.
func (p *PooledUnmarshaller) Unmarshal(b []byte, obj interface{}) error {
u := p.pool.Get().(*Unmarshaller)
err := u.Unmarshal(b, obj)
p.pool.Put(u)
return err
}
69 changes: 15 additions & 54 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,13 @@ import (
"fmt"
"io"
"math"
"math/big"
"strconv"
"strings"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
node "github.com/ipfs/go-ipld-format"
mh "github.com/multiformats/go-multihash"

cbor "github.com/polydawn/refmt/cbor"
"github.com/polydawn/refmt/obj/atlas"
)

// CBORTagLink is the integer used to represent tags in CBOR.
Expand Down Expand Up @@ -47,49 +43,6 @@ var (
ErrNonStringLink = errors.New("link should have been a string")
)

// This atlas describes the CBOR Tag (42) for IPLD links, such that refmt can marshal and unmarshal them
var cidAtlasEntry = atlas.BuildEntry(cid.Cid{}).
UseTag(CBORTagLink).
Transform().
TransformMarshal(atlas.MakeMarshalTransformFunc(
castCidToBytes,
)).
TransformUnmarshal(atlas.MakeUnmarshalTransformFunc(
castBytesToCid,
)).
Complete()

var bigIntAtlasEntry = atlas.BuildEntry(big.Int{}).Transform().
TransformMarshal(atlas.MakeMarshalTransformFunc(
func(i big.Int) ([]byte, error) {
return i.Bytes(), nil
})).
TransformUnmarshal(atlas.MakeUnmarshalTransformFunc(
func(x []byte) (big.Int, error) {
return *big.NewInt(0).SetBytes(x), nil
})).
Complete()

var cborAtlas atlas.Atlas
var cborSortingMode = atlas.KeySortMode_RFC7049
var atlasEntries = []*atlas.AtlasEntry{cidAtlasEntry, bigIntAtlasEntry}

func init() {
cborAtlas = atlas.MustBuild(cidAtlasEntry, bigIntAtlasEntry).WithMapMorphism(atlas.MapMorphism{atlas.KeySortMode_RFC7049})
}

// RegisterCborType allows to register a custom cbor type
func RegisterCborType(i interface{}) {
var entry *atlas.AtlasEntry
if ae, ok := i.(*atlas.AtlasEntry); ok {
entry = ae
} else {
entry = atlas.BuildEntry(i).StructMap().AutogenerateWithSortingScheme(atlas.KeySortMode_RFC7049).Complete()
}
atlasEntries = append(atlasEntries, entry)
cborAtlas = atlas.MustBuild(atlasEntries...).WithMapMorphism(atlas.MapMorphism{atlas.KeySortMode_RFC7049})
}

// DecodeBlock decodes a CBOR encoded Block into an IPLD Node.
//
// This method *does not* canonicalize and *will* preserve the CID. As a matter
Expand All @@ -111,7 +64,10 @@ func decodeBlock(block blocks.Block) (*Node, error) {
if err := DecodeInto(block.RawData(), &m); err != nil {
return nil, err
}
return newObject(block, m)
}

func newObject(block blocks.Block, m interface{}) (*Node, error) {
tree, err := compTree(m)
if err != nil {
return nil, err
Expand Down Expand Up @@ -154,16 +110,22 @@ func Decode(b []byte, mhType uint64, mhLen int) (*Node, error) {

// DecodeInto decodes a serialized IPLD cbor object into the given object.
func DecodeInto(b []byte, v interface{}) error {
// The cbor library really doesnt make this sort of operation easy on us
return cbor.UnmarshalAtlased(b, v, cborAtlas)
return unmarshaller.Unmarshal(b, v)
}

// WrapObject converts an arbitrary object into a Node.
func WrapObject(m interface{}, mhType uint64, mhLen int) (*Node, error) {
data, err := cbor.MarshalAtlased(m, cborAtlas)
data, err := marshaller.Marshal(m)
if err != nil {
return nil, err
}

var obj interface{}
err = cloner.Clone(m, &obj)
if err != nil {
return nil, err
}

if mhType == math.MaxUint64 {
mhType = mh.SHA2_256
}
Expand All @@ -179,9 +141,8 @@ func WrapObject(m interface{}, mhType uint64, mhLen int) (*Node, error) {
// TODO: Shouldn't this just panic?
return nil, err
}
// Do not reuse `m`. We need to re-decode it to put it in the right
// form.
return decodeBlock(block)
// No need to deserialize. We can just deep copy.
return newObject(block, obj)
}

// Resolve resolves a given path, and returns the object found at the end, as well
Expand Down Expand Up @@ -457,7 +418,7 @@ func (n *Node) MarshalJSON() ([]byte, error) {
// DumpObject marshals any object into its CBOR serialized byte representation
// TODO: rename
func DumpObject(obj interface{}) (out []byte, err error) {
return cbor.MarshalAtlased(obj, cborAtlas)
return marshaller.Marshal(obj)
}

func toSaneMap(n map[interface{}]interface{}) (interface{}, error) {
Expand Down
69 changes: 69 additions & 0 deletions refmt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package cbornode

import (
"math/big"

cid "github.com/ipfs/go-cid"

encoding "github.com/ipfs/go-ipld-cbor/encoding"

"github.com/polydawn/refmt/obj/atlas"
)

// This atlas describes the CBOR Tag (42) for IPLD links, such that refmt can marshal and unmarshal them
var cidAtlasEntry = atlas.BuildEntry(cid.Cid{}).
UseTag(CBORTagLink).
Transform().
TransformMarshal(atlas.MakeMarshalTransformFunc(
castCidToBytes,
)).
TransformUnmarshal(atlas.MakeUnmarshalTransformFunc(
castBytesToCid,
)).
Complete()

var bigIntAtlasEntry = atlas.BuildEntry(big.Int{}).Transform().
TransformMarshal(atlas.MakeMarshalTransformFunc(
func(i big.Int) ([]byte, error) {
return i.Bytes(), nil
})).
TransformUnmarshal(atlas.MakeUnmarshalTransformFunc(
func(x []byte) (big.Int, error) {
return *big.NewInt(0).SetBytes(x), nil
})).
Complete()

var cborAtlas atlas.Atlas
var cborSortingMode = atlas.KeySortMode_RFC7049
var atlasEntries = []*atlas.AtlasEntry{cidAtlasEntry, bigIntAtlasEntry}

var (
cloner encoding.PooledCloner
unmarshaller encoding.PooledUnmarshaller
marshaller encoding.PooledMarshaller
)

func init() {
rebuildAtlas()
}

func rebuildAtlas() {
cborAtlas = atlas.MustBuild(atlasEntries...).
WithMapMorphism(atlas.MapMorphism{atlas.KeySortMode_RFC7049})

marshaller = encoding.NewPooledMarshaller(cborAtlas)
unmarshaller = encoding.NewPooledUnmarshaller(cborAtlas)
cloner = encoding.NewPooledCloner(cborAtlas)
}

// RegisterCborType allows to register a custom cbor type
func RegisterCborType(i interface{}) {
var entry *atlas.AtlasEntry
if ae, ok := i.(*atlas.AtlasEntry); ok {
entry = ae
} else {
entry = atlas.BuildEntry(i).StructMap().AutogenerateWithSortingScheme(atlas.KeySortMode_RFC7049).Complete()
}
atlasEntries = append(atlasEntries, entry)
rebuildAtlas()
}
Loading