Skip to content

Commit

Permalink
feat: object streaming
Browse files Browse the repository at this point in the history
Stream attestations to and from the collection service.  grpc has a
default message size of 4MB and (from my limited research) isn't suited
well for large single messages.  Instead opting to stream many messages
is preferred.

Signed-off-by: Mikhail Swift <mikhail@testifysec.com>
  • Loading branch information
mikhailswift committed Jul 18, 2022
1 parent ff0d1e9 commit 3fbac57
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 214 deletions.
10 changes: 5 additions & 5 deletions cmd/archivist/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/testifysec/archivist-api/pkg/api/archivist"
"github.com/testifysec/archivist/internal/config"
"github.com/testifysec/archivist/internal/metadatastorage/mysqlstore"
"github.com/testifysec/archivist/internal/objectstorage/blobstore"
blob "github.com/testifysec/archivist/internal/objectstorage/blobstore"
"github.com/testifysec/archivist/internal/objectstorage/filestore"
"github.com/testifysec/archivist/internal/server"

Expand Down Expand Up @@ -105,7 +105,7 @@ func main() {
logrus.Fatalf("error initializing storage clients: %+v", err)
}

mysqlStore, mysqlStoreCh, err := mysqlstore.NewServer(ctx, cfg.SQLStoreConnectionString)
mysqlStore, mysqlStoreCh, err := mysqlstore.New(ctx, cfg.SQLStoreConnectionString)

log.FromContext(ctx).WithField("duration", time.Since(now)).Infof("completed phase 3: initializing storage clients")
// ********************************************************************************
Expand Down Expand Up @@ -166,13 +166,13 @@ func initSpiffeConnection(ctx context.Context, cfg *config.Config) []grpc.Server
return opts
}

func initObjectStore(ctx context.Context, cfg *config.Config) (archivist.CollectorServer, <-chan error, error) {
func initObjectStore(ctx context.Context, cfg *config.Config) (server.ObjectStorer, <-chan error, error) {
switch strings.ToUpper(cfg.StorageBackend) {
case "FILE":
return filestore.NewServer(ctx, cfg.FileDir, cfg.FileServeOn)
return filestore.New(ctx, cfg.FileDir, cfg.FileServeOn)

case "BLOB":
return blob.NewMinioClient(
return blob.New(
ctx,
cfg.BlobStoreEndpoint,
cfg.BlobStoreAccessKeyId,
Expand Down
19 changes: 15 additions & 4 deletions cmd/archivistctl/cmd/retrieve.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package cmd

import (
"bytes"
"context"
"io"
"os"
"strings"

"github.com/spf13/cobra"
"github.com/testifysec/archivist-api/pkg/api/archivist"
Expand Down Expand Up @@ -46,13 +46,24 @@ func init() {
}

func retrieveEnvelope(ctx context.Context, client archivist.CollectorClient, gitoid string, out io.Writer) error {
resp, err := client.Get(ctx, &archivist.GetRequest{Gitoid: gitoid})
stream, err := client.Get(ctx, &archivist.GetRequest{Gitoid: gitoid})
if err != nil {
return err
}

if _, err := io.Copy(out, strings.NewReader(resp.Object)); err != nil {
return err
for {
chunk, err := stream.Recv()
if err == io.EOF {
break
}

if err != nil {
return err
}

if _, err := io.Copy(out, bytes.NewReader(chunk.GetChunk())); err != nil {
return err
}
}

return nil
Expand Down
58 changes: 45 additions & 13 deletions cmd/archivistctl/cmd/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/spf13/cobra"
"github.com/testifysec/archivist-api/pkg/api/archivist"
"github.com/testifysec/archivist/internal/server"
"github.com/testifysec/go-witness/dsse"
)

Expand All @@ -17,21 +18,23 @@ var (
Use: "store",
Short: "stores an attestation on the archivist server",
SilenceUsage: true,
Args: cobra.ExactArgs(1),
Args: cobra.MinimumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
conn, err := newConn(archivistUrl)
defer conn.Close()
if err != nil {
return err
}

file, err := os.Open(args[0])
defer file.Close()
if err != nil {
return err
for _, filePath := range args {
if gitoid, err := storeAttestationByPath(cmd.Context(), archivist.NewCollectorClient(conn), filePath); err != nil {
return fmt.Errorf("failed to store %s: %w", filePath, err)
} else {
fmt.Printf("%s stored with gitoid %s\n", filePath, gitoid)
}
}

return storeAttestation(cmd.Context(), archivist.NewCollectorClient(conn), file)
return nil
},
}
)
Expand All @@ -40,24 +43,53 @@ func init() {
rootCmd.AddCommand(storeCmd)
}

func storeAttestation(ctx context.Context, client archivist.CollectorClient, envelope io.Reader) error {
func storeAttestationByPath(ctx context.Context, client archivist.CollectorClient, path string) (string, error) {
file, err := os.Open(path)
defer file.Close()
if err != nil {
return "", err
}

return storeAttestation(ctx, client, file)
}

func storeAttestation(ctx context.Context, client archivist.CollectorClient, envelope io.Reader) (string, error) {
objBytes, err := io.ReadAll(envelope)
if err != nil {
return err
return "", err
}

obj := &dsse.Envelope{}
if err := json.Unmarshal(objBytes, &obj); err != nil {
return err
return "", err
}

if len(obj.Payload) == 0 || obj.PayloadType == "" || len(obj.Signatures) == 0 {
return fmt.Errorf("obj is not DSSE %d %d %d", len(obj.Payload), len(obj.PayloadType), len(obj.Signatures))
return "", fmt.Errorf("obj is not DSSE %d %d %d", len(obj.Payload), len(obj.PayloadType), len(obj.Signatures))
}

if _, err := client.Store(ctx, &archivist.StoreRequest{Object: string(objBytes)}); err != nil {
return err
stream, err := client.Store(ctx)
if err != nil {
return "", err
}

chunk := &archivist.Chunk{}
for curr := 0; curr < len(objBytes); curr += server.ChunkSize {
if curr+server.ChunkSize > len(objBytes) {
chunk.Chunk = objBytes[curr:]
} else {
chunk.Chunk = objBytes[curr : curr+server.ChunkSize]
}

if err := stream.Send(chunk); err != nil {
return "", err
}
}

resp, err := stream.CloseAndRecv()
if err != nil {
return "", err
}

return nil
return resp.GetGitoid(), nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ require (
github.com/testifysec/archivist-api v0.0.0-20220707182002-b803369e93a4
github.com/testifysec/go-witness v0.1.11
google.golang.org/grpc v1.46.0
google.golang.org/protobuf v1.28.0
)

require (
Expand Down Expand Up @@ -65,5 +64,6 @@ require (
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf // indirect
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
)
83 changes: 33 additions & 50 deletions internal/metadatastorage/mysqlstore/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"ariga.io/sqlcomment"
"entgo.io/ent/dialect/sql"
"github.com/git-bom/gitbom-go"
"github.com/sirupsen/logrus"
"github.com/testifysec/archivist-api/pkg/api/archivist"
"github.com/testifysec/archivist/ent"
Expand All @@ -38,25 +37,15 @@ import (
"github.com/testifysec/go-witness/cryptoutil"
"github.com/testifysec/go-witness/dsse"
"github.com/testifysec/go-witness/intoto"
"google.golang.org/protobuf/types/known/emptypb"

_ "github.com/go-sql-driver/mysql"
)

type UnifiedStorage interface {
archivist.ArchivistServer
archivist.CollectorServer
type Store struct {
client *ent.Client
}

type store struct {
archivist.UnimplementedArchivistServer
archivist.UnimplementedCollectorServer

client *ent.Client
objectStorage archivist.CollectorServer
}

func NewServer(ctx context.Context, connectionstring string) (UnifiedStorage, <-chan error, error) {
func New(ctx context.Context, connectionstring string) (*Store, <-chan error, error) {
drv, err := sql.Open("mysql", connectionstring)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -92,13 +81,12 @@ func NewServer(ctx context.Context, connectionstring string) (UnifiedStorage, <-
logrus.WithContext(ctx).Fatalf("failed creating schema resources: %v", err)
}

return &store{
return &Store{
client: client,
}, errCh, nil
}

func (s *store) GetBySubjectDigest(request *archivist.GetBySubjectDigestRequest, server archivist.Archivist_GetBySubjectDigestServer) error {
ctx := server.Context()
func (s *Store) GetBySubjectDigest(ctx context.Context, request *archivist.GetBySubjectDigestRequest) (<-chan *archivist.GetBySubjectDigestResponse, error) {
statementPredicates := []predicate.Statement{statement.HasSubjectsWith(
subject.HasSubjectDigestsWith(
subjectdigest.And(
Expand All @@ -122,26 +110,32 @@ func (s *store) GetBySubjectDigest(request *archivist.GetBySubjectDigestRequest,
}).All(ctx)

if err != nil {
return err
return nil, err
}

for _, curDsse := range res {
response := &archivist.GetBySubjectDigestResponse{}
response.Gitoid = curDsse.GitbomSha256
response.CollectionName = curDsse.Edges.Statement.Edges.AttestationCollections.Name
for _, curAttestation := range curDsse.Edges.Statement.Edges.AttestationCollections.Edges.Attestations {
response.Attestations = append(response.Attestations, curAttestation.Type)
}
out := make(chan *archivist.GetBySubjectDigestResponse, 1)
go func() {
defer close(out)
for _, curDsse := range res {
response := &archivist.GetBySubjectDigestResponse{}
response.Gitoid = curDsse.GitbomSha256
response.CollectionName = curDsse.Edges.Statement.Edges.AttestationCollections.Name
for _, curAttestation := range curDsse.Edges.Statement.Edges.AttestationCollections.Edges.Attestations {
response.Attestations = append(response.Attestations, curAttestation.Type)
}

if err := server.Send(response); err != nil {
return err
select {
case <-ctx.Done():
return
case out <- response:
}
}
}
}()

return nil
return out, nil
}

func (s *store) withTx(ctx context.Context, fn func(tx *ent.Tx) error) error {
func (s *Store) withTx(ctx context.Context, fn func(tx *ent.Tx) error) error {
tx, err := s.client.Tx(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -173,41 +167,31 @@ type parsedCollection struct {
} `json:"attestations"`
}

func (s *store) Store(ctx context.Context, request *archivist.StoreRequest) (*emptypb.Empty, error) {
fmt.Println("STORING")

obj := request.GetObject()
func (s *Store) Store(ctx context.Context, gitoid string, obj []byte) error {
envelope := &dsse.Envelope{}
if err := json.Unmarshal([]byte(obj), envelope); err != nil {
return nil, err
if err := json.Unmarshal(obj, envelope); err != nil {
return err
}

payloadDigestSet, err := cryptoutil.CalculateDigestSetFromBytes(envelope.Payload, []crypto.Hash{crypto.SHA256})
if err != nil {
return nil, err
return err
}

payload := &intoto.Statement{}
if err := json.Unmarshal(envelope.Payload, payload); err != nil {
return nil, err
return err
}

parsedCollection := &parsedCollection{}
if err := json.Unmarshal(payload.Predicate, parsedCollection); err != nil {
return nil, err
}

// generate gitbom
gb := gitbom.NewSha256GitBom()
if err := gb.AddReference([]byte(obj), nil); err != nil {
logrus.WithContext(ctx).Errorf("gitbom tag generation failed: %+v", err)
return nil, err
return err
}

err = s.withTx(ctx, func(tx *ent.Tx) error {
dsse, err := tx.Dsse.Create().
SetPayloadType(envelope.PayloadType).
SetGitbomSha256(gb.Identity()).
SetGitbomSha256(gitoid).
Save(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -288,9 +272,8 @@ func (s *store) Store(ctx context.Context, request *archivist.StoreRequest) (*em

if err != nil {
logrus.Errorf("unable to store metadata: %+v", err)
return nil, err
return err
}

fmt.Println("metadata stored")
return &emptypb.Empty{}, nil
return nil
}
Loading

0 comments on commit 3fbac57

Please sign in to comment.