Skip to content

Commit

Permalink
fix(file.Store): fix race condition on restoring the same named conte…
Browse files Browse the repository at this point in the history
…nt (#731)

Fix #730

Signed-off-by: Shiwei Zhang <shizh@microsoft.com>
  • Loading branch information
shizhMSFT committed Mar 21, 2024
1 parent 8f9f505 commit 9b6f321
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 3 deletions.
15 changes: 12 additions & 3 deletions content/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ func (s *Store) push(ctx context.Context, expected ocispec.Descriptor, content i
return nil
}

// restoreDuplicates restores successor files with same content but different names.
// restoreDuplicates restores successor files with same content but different
// names.
// See Store.ForceCAS for more info.
func (s *Store) restoreDuplicates(ctx context.Context, desc ocispec.Descriptor) error {
successors, err := content.Successors(ctx, s, desc)
Expand All @@ -310,8 +311,16 @@ func (s *Store) restoreDuplicates(ctx context.Context, desc ocispec.Descriptor)
return fmt.Errorf("%q: %s: %w", name, desc.MediaType, err)
}
return nil
}(); err != nil && !errors.Is(err, errdef.ErrNotFound) {
return err
}(); err != nil {
switch {
case errors.Is(err, errdef.ErrNotFound):
// allow pushing manifests before blobs
case errors.Is(err, ErrDuplicateName):
// in case multiple goroutines are pushing or restoring the same
// named content, the error is ignored
default:
return err
}
}
}
return nil
Expand Down
142 changes: 142 additions & 0 deletions content/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/content/memory"
"oras.land/oras-go/v2/errdef"
"oras.land/oras-go/v2/internal/cas"
"oras.land/oras-go/v2/internal/descriptor"
"oras.land/oras-go/v2/internal/spec"
)
Expand Down Expand Up @@ -1613,6 +1614,147 @@ func TestStore_File_Push_RestoreDuplicates_NotFound(t *testing.T) {
}
}

type storageMock struct {
content.Storage

OnFetch func(ctx context.Context, desc ocispec.Descriptor) error
}

func (m *storageMock) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
if m.OnFetch != nil {
if err := m.OnFetch(ctx, desc); err != nil {
return nil, err
}
}
return m.Storage.Fetch(ctx, desc)
}

func TestStore_File_Push_RestoreDuplicates_DuplicateName(t *testing.T) {
mediaType := "test"
content := []byte("hello world")
blobDesc := ocispec.Descriptor{
MediaType: mediaType,
Digest: digest.FromBytes(content),
Size: int64(len(content)),
Annotations: map[string]string{
ocispec.AnnotationTitle: "blob",
},
}
config := []byte("{}")
configDesc := ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageConfig,
Digest: digest.FromBytes(config),
Size: int64(len(config)),
}
manifest := ocispec.Manifest{
MediaType: ocispec.MediaTypeImageManifest,
Config: configDesc,
Layers: []ocispec.Descriptor{blobDesc},
}
manifestJSON, err := json.Marshal(manifest)
if err != nil {
t.Fatal("json.Marshal() error =", err)
}
manifestDesc := ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageManifest,
Digest: digest.FromBytes(manifestJSON),
Size: int64(len(manifestJSON)),
}
tempDir := t.TempDir()
fallbackMock := &storageMock{
Storage: cas.NewMemory(),
}
s, err := NewWithFallbackStorage(tempDir, fallbackMock)
if err != nil {
t.Fatal("NewWithFallbackStorage() error =", err)
}
defer s.Close()
ctx := context.Background()

// push blob as unnamed
if err := fallbackMock.Push(ctx, blobDesc, bytes.NewReader(content)); err != nil {
t.Fatal("Store.Push() error =", err)
}
// push manifest
fallbackMock.OnFetch = func(ctx context.Context, desc ocispec.Descriptor) error {
if desc.Digest == blobDesc.Digest {
// push blob before being restored by manifest put to simulate
// concurrent pushing for race condition
if err := s.Push(ctx, blobDesc, bytes.NewReader(content)); err != nil {
t.Fatal("Store.Push() error =", err)
}
}
return nil
}
if err := s.Push(ctx, manifestDesc, bytes.NewReader(manifestJSON)); err != nil {
t.Fatal("Store.Push() error =", err)
}

// verify blob is restored
got, err := os.ReadFile(filepath.Join(tempDir, "blob"))
if err != nil {
t.Fatal("os.ReadFile() error =", err)
}
if !bytes.Equal(got, content) {
t.Errorf("os.ReadFile() = %v, want %v", got, content)
}
}

func TestStore_File_Push_RestoreDuplicates_Failure(t *testing.T) {
mediaType := "test"
content := []byte("hello world")
blobDesc := ocispec.Descriptor{
MediaType: mediaType,
Digest: digest.FromBytes(content),
Size: int64(len(content)),
Annotations: map[string]string{
ocispec.AnnotationTitle: "blob",
},
}
config := []byte("{}")
configDesc := ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageConfig,
Digest: digest.FromBytes(config),
Size: int64(len(config)),
}
manifest := ocispec.Manifest{
MediaType: ocispec.MediaTypeImageManifest,
Config: configDesc,
Layers: []ocispec.Descriptor{blobDesc},
}
manifestJSON, err := json.Marshal(manifest)
if err != nil {
t.Fatal("json.Marshal() error =", err)
}
manifestDesc := ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageManifest,
Digest: digest.FromBytes(manifestJSON),
Size: int64(len(manifestJSON)),
}
tempDir := t.TempDir()
fallbackMock := &storageMock{
Storage: cas.NewMemory(),
}
s, err := NewWithFallbackStorage(tempDir, fallbackMock)
if err != nil {
t.Fatal("NewWithFallbackStorage() error =", err)
}
defer s.Close()
ctx := context.Background()

// push manifest
wantErr := errors.New("restoreDuplicates: fetch error")
fallbackMock.OnFetch = func(ctx context.Context, desc ocispec.Descriptor) error {
if desc.Digest == blobDesc.Digest {
return wantErr
}
return nil
}
if err := s.Push(ctx, manifestDesc, bytes.NewReader(manifestJSON)); !errors.Is(err, wantErr) {
t.Fatalf("Store.Push() error = %v, wantErr %v", err, wantErr)
}
}

func TestStore_File_Fetch_SameDigest_NoName(t *testing.T) {
mediaType := "test"
content := []byte("hello world")
Expand Down

0 comments on commit 9b6f321

Please sign in to comment.