Skip to content

Commit

Permalink
Workaround for container registry push/pull errors (#21862) (#22069)
Browse files Browse the repository at this point in the history
Backport of #21862
  • Loading branch information
KN4CK3R committed Dec 10, 2022
1 parent e93a4a0 commit e23ad87
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 4 deletions.
28 changes: 28 additions & 0 deletions integrations/api_packages_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ package integrations

import (
"bytes"
"crypto/sha256"
"encoding/base64"
"fmt"
"net/http"
"strings"
"sync"
"testing"

"code.gitea.io/gitea/models/db"
Expand Down Expand Up @@ -549,6 +551,32 @@ func TestPackageContainer(t *testing.T) {
})
}

// https://github.com/go-gitea/gitea/issues/19586
t.Run("ParallelUpload", func(t *testing.T) {
defer PrintCurrentTest(t)()

url := fmt.Sprintf("%sv2/%s/parallel", setting.AppURL, user.Name)

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)

content := []byte{byte(i)}
digest := fmt.Sprintf("sha256:%x", sha256.Sum256(content))

go func() {
defer wg.Done()

req := NewRequestWithBody(t, "POST", fmt.Sprintf("%s/blobs/uploads?digest=%s", url, digest), bytes.NewReader(content))
addTokenAuthHeader(req, userToken)
resp := MakeRequest(t, req, http.StatusCreated)

assert.Equal(t, digest, resp.Header().Get("Docker-Content-Digest"))
}()
}
wg.Wait()
})

t.Run("OwnerNameChange", func(t *testing.T) {
defer PrintCurrentTest(t)()

Expand Down
7 changes: 7 additions & 0 deletions modules/packages/content_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ func (s *ContentStore) Get(key BlobHash256Key) (storage.Object, error) {
return s.store.Open(KeyToRelativePath(key))
}

// FIXME: Workaround to be removed in v1.20
// https://github.com/go-gitea/gitea/issues/19586
func (s *ContentStore) Has(key BlobHash256Key) error {
_, err := s.store.Stat(KeyToRelativePath(key))
return err
}

// Save stores a package blob
func (s *ContentStore) Save(key BlobHash256Key, r io.Reader, size int64) error {
_, err := s.store.Save(KeyToRelativePath(key), r, size)
Expand Down
31 changes: 30 additions & 1 deletion routers/api/packages/container/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ package container
import (
"context"
"encoding/hex"
"errors"
"fmt"
"os"
"strings"
"sync"

"code.gitea.io/gitea/models/db"
packages_model "code.gitea.io/gitea/models/packages"
Expand All @@ -19,6 +22,8 @@ import (
packages_service "code.gitea.io/gitea/services/packages"
)

var uploadVersionMutex sync.Mutex

// saveAsPackageBlob creates a package blob from an upload
// The uploaded blob gets stored in a special upload version to link them to the package/image
func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_service.PackageInfo) (*packages_model.PackageBlob, error) {
Expand All @@ -28,6 +33,11 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic

contentStore := packages_module.NewContentStore()

var uploadVersion *packages_model.PackageVersion

// FIXME: Replace usage of mutex with database transaction
// https://github.com/go-gitea/gitea/pull/21862
uploadVersionMutex.Lock()
err := db.WithTx(func(ctx context.Context) error {
created := true
p := &packages_model.Package{
Expand Down Expand Up @@ -68,11 +78,30 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic
}
}

uploadVersion = pv

return nil
})
uploadVersionMutex.Unlock()
if err != nil {
return nil, err
}

err = db.WithTx(func(ctx context.Context) error {
pb, exists, err = packages_model.GetOrInsertBlob(ctx, pb)
if err != nil {
log.Error("Error inserting package blob: %v", err)
return err
}
// FIXME: Workaround to be removed in v1.20
// https://github.com/go-gitea/gitea/issues/19586
if exists {
err = contentStore.Has(packages_module.BlobHash256Key(pb.HashSHA256))
if err != nil && errors.Is(err, os.ErrNotExist) {
log.Debug("Package registry inconsistent: blob %s does not exist on file system", pb.HashSHA256)
exists = false
}
}
if !exists {
if err := contentStore.Save(packages_module.BlobHash256Key(pb.HashSHA256), hsr, hsr.Size()); err != nil {
log.Error("Error saving package blob in content store: %v", err)
Expand All @@ -83,7 +112,7 @@ func saveAsPackageBlob(hsr packages_module.HashedSizeReader, pi *packages_servic
filename := strings.ToLower(fmt.Sprintf("sha256_%s", pb.HashSHA256))

pf := &packages_model.PackageFile{
VersionID: pv.ID,
VersionID: uploadVersion.ID,
BlobID: pb.ID,
Name: filename,
LowerName: filename,
Expand Down
27 changes: 24 additions & 3 deletions routers/api/packages/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"net/http"
"net/url"
"os"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -193,7 +194,7 @@ func InitiateUploadBlob(ctx *context.Context) {
mount := ctx.FormTrim("mount")
from := ctx.FormTrim("from")
if mount != "" {
blob, _ := container_model.GetContainerBlob(ctx, &container_model.BlobSearchOptions{
blob, _ := workaroundGetContainerBlob(ctx, &container_model.BlobSearchOptions{
Image: from,
Digest: mount,
})
Expand Down Expand Up @@ -361,7 +362,7 @@ func getBlobFromContext(ctx *context.Context) (*packages_model.PackageFileDescri
return nil, container_model.ErrContainerBlobNotExist
}

return container_model.GetContainerBlob(ctx, &container_model.BlobSearchOptions{
return workaroundGetContainerBlob(ctx, &container_model.BlobSearchOptions{
OwnerID: ctx.Package.Owner.ID,
Image: ctx.Params("image"),
Digest: digest,
Expand Down Expand Up @@ -503,7 +504,7 @@ func getManifestFromContext(ctx *context.Context) (*packages_model.PackageFileDe
return nil, container_model.ErrContainerBlobNotExist
}

return container_model.GetContainerBlob(ctx, opts)
return workaroundGetContainerBlob(ctx, opts)
}

// https://github.com/opencontainers/distribution-spec/blob/main/spec.md#checking-if-content-exists-in-the-registry
Expand Down Expand Up @@ -643,3 +644,23 @@ func GetTagList(ctx *context.Context) {
Tags: tags,
})
}

// FIXME: Workaround to be removed in v1.20
// https://github.com/go-gitea/gitea/issues/19586
func workaroundGetContainerBlob(ctx *context.Context, opts *container_model.BlobSearchOptions) (*packages_model.PackageFileDescriptor, error) {
blob, err := container_model.GetContainerBlob(ctx, opts)
if err != nil {
return nil, err
}

err = packages_module.NewContentStore().Has(packages_module.BlobHash256Key(blob.Blob.HashSHA256))
if err != nil {
if errors.Is(err, os.ErrNotExist) {
log.Debug("Package registry inconsistent: blob %s does not exist on file system", blob.Blob.HashSHA256)
return nil, container_model.ErrContainerBlobNotExist
}
return nil, err
}

return blob, nil
}
11 changes: 11 additions & 0 deletions routers/api/packages/container/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ package container

import (
"context"
"errors"
"fmt"
"io"
"os"
"strings"

"code.gitea.io/gitea/models/db"
Expand Down Expand Up @@ -403,6 +405,15 @@ func createManifestBlob(ctx context.Context, mci *manifestCreationInfo, pv *pack
log.Error("Error inserting package blob: %v", err)
return nil, false, "", err
}
// FIXME: Workaround to be removed in v1.20
// https://github.com/go-gitea/gitea/issues/19586
if exists {
err = packages_module.NewContentStore().Has(packages_module.BlobHash256Key(pb.HashSHA256))
if err != nil && errors.Is(err, os.ErrNotExist) {
log.Debug("Package registry inconsistent: blob %s does not exist on file system", pb.HashSHA256)
exists = false
}
}
if !exists {
contentStore := packages_module.NewContentStore()
if err := contentStore.Save(packages_module.BlobHash256Key(pb.HashSHA256), buf, buf.Size()); err != nil {
Expand Down

0 comments on commit e23ad87

Please sign in to comment.