Skip to content

Commit

Permalink
Verify manifest with remote layers
Browse files Browse the repository at this point in the history
Signed-off-by: Gladkov Alexey <agladkov@redhat.com>
  • Loading branch information
legionus committed Feb 22, 2017
1 parent 678ff3d commit ecc925b
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 191 deletions.
28 changes: 21 additions & 7 deletions pkg/dockerregistry/server/blobdescriptorservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
imageapi "github.com/openshift/origin/pkg/image/api"
)

// digestSHA256GzippedEmptyTar is the canonical sha256 digest of gzippedEmptyTar
const digestSHA256GzippedEmptyTar = digest.Digest("sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4")

// ByGeneration allows for sorting tag events from latest to oldest.
type ByGeneration []*imageapi.TagEvent

Expand Down Expand Up @@ -65,18 +68,29 @@ func (bs *blobDescriptorService) Stat(ctx context.Context, dgst digest.Digest) (

context.GetLogger(ctx).Debugf("could not stat layer link %q in repository %q: %v", dgst.String(), repo.Named().Name(), err)

// verify the blob is stored locally
// First attempt: looking for the blob locally
desc, err = dockerRegistry.BlobStatter().Stat(ctx, dgst)
if err != nil {
return desc, err
if err == nil {
// only non-empty layers is wise to check for existence in the image stream.
// schema v2 has no empty layers.
if dgst != digestSHA256GzippedEmptyTar {
// ensure it's referenced inside of corresponding image stream
if !imageStreamHasBlob(repo, dgst) {
return distribution.Descriptor{}, distribution.ErrBlobUnknown
}
}
return desc, nil
}

// ensure it's referenced inside of corresponding image stream
if imageStreamHasBlob(repo, dgst) {
return desc, nil
if err == distribution.ErrBlobUnknown {
// Second attempt: looking for the blob on a remote server
remoteGetter, found := RemoteBlobGetterFrom(ctx)
if found {
desc, err = remoteGetter.Stat(ctx, dgst)
}
}

return distribution.Descriptor{}, distribution.ErrBlobUnknown
return desc, err
}

func (bs *blobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
Expand Down
205 changes: 35 additions & 170 deletions pkg/dockerregistry/server/pullthroughblobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,15 @@ import (
"github.com/docker/distribution"
"github.com/docker/distribution/context"
"github.com/docker/distribution/digest"

"k8s.io/kubernetes/pkg/api/errors"

imageapi "github.com/openshift/origin/pkg/image/api"
"github.com/openshift/origin/pkg/image/importer"
)

// pullthroughBlobStore wraps a distribution.BlobStore and allows remote repositories to serve blobs from remote
// repositories.
type pullthroughBlobStore struct {
distribution.BlobStore

repo *repository
digestToStore map[string]distribution.BlobStore
pullFromInsecureRegistries bool
mirror bool
repo *repository
mirror bool
}

var _ distribution.BlobStore = &pullthroughBlobStore{}
Expand All @@ -45,76 +38,13 @@ func (r *pullthroughBlobStore) Stat(ctx context.Context, dgst digest.Digest) (di
return desc, err
}

return r.remoteStat(ctx, dgst)
}

// remoteStat attempts to find requested blob in candidate remote repositories and if found, it updates
// digestToRepository store. ErrBlobUnknown will be returned if not found.
func (r *pullthroughBlobStore) remoteStat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
// look up the potential remote repositories that this blob could be part of (at this time,
// we don't know which image in the image stream surfaced the content).
is, err := r.repo.getImageStream()
if err != nil {
if errors.IsNotFound(err) || errors.IsForbidden(err) {
return distribution.Descriptor{}, distribution.ErrBlobUnknown
}
context.GetLogger(ctx).Errorf("Error retrieving image stream for blob: %v", err)
return distribution.Descriptor{}, err
}

r.pullFromInsecureRegistries = false

if insecure, ok := is.Annotations[imageapi.InsecureRepositoryAnnotation]; ok {
r.pullFromInsecureRegistries = insecure == "true"
}

var localRegistry string
if local, err := imageapi.ParseDockerImageReference(is.Status.DockerImageRepository); err == nil {
// TODO: normalize further?
localRegistry = local.Registry
}

retriever := r.repo.importContext()
cached := r.repo.cachedLayers.RepositoriesForDigest(dgst)

// look at the first level of tagged repositories first
search := identifyCandidateRepositories(is, localRegistry, true)
if desc, err := r.findCandidateRepository(ctx, search, cached, dgst, retriever); err == nil {
return desc, nil
}

// look at all other repositories tagged by the server
secondary := identifyCandidateRepositories(is, localRegistry, false)
for k := range search {
delete(secondary, k)
}
if desc, err := r.findCandidateRepository(ctx, secondary, cached, dgst, retriever); err == nil {
return desc, nil
}

return distribution.Descriptor{}, distribution.ErrBlobUnknown
}

// proxyStat attempts to locate the digest in the provided remote repository or returns an error. If the digest is found,
// r.digestToStore saves the store.
func (r *pullthroughBlobStore) proxyStat(ctx context.Context, retriever importer.RepositoryRetriever, ref imageapi.DockerImageReference, dgst digest.Digest) (distribution.Descriptor, error) {
context.GetLogger(ctx).Infof("Trying to stat %q from %q", dgst, ref.Exact())
repo, err := retriever.Repository(ctx, ref.RegistryURL(), ref.RepositoryName(), r.pullFromInsecureRegistries)
if err != nil {
context.GetLogger(ctx).Errorf("Error getting remote repository for image %q: %v", ref.Exact(), err)
return distribution.Descriptor{}, err
}
pullthroughBlobStore := repo.Blobs(ctx)
desc, err := pullthroughBlobStore.Stat(ctx, dgst)
if err != nil {
if err != distribution.ErrBlobUnknown {
context.GetLogger(ctx).Errorf("Error getting pullthroughBlobStore for image %q: %v", ref.Exact(), err)
}
return distribution.Descriptor{}, err
remoteGetter, found := RemoteBlobGetterFrom(r.repo.ctx)
if !found {
context.GetLogger(ctx).Errorf("pullthroughBlobStore.Stat: failed to retrieve remote getter from context")
return distribution.Descriptor{}, distribution.ErrBlobUnknown
}

r.digestToStore[dgst.String()] = pullthroughBlobStore
return desc, nil
return remoteGetter.Stat(ctx, dgst)
}

// ServeBlob attempts to serve the requested digest onto w, using a remote proxy store if necessary.
Expand All @@ -123,130 +53,65 @@ func (r *pullthroughBlobStore) proxyStat(ctx context.Context, retriever importer
// success response with no actual body content.
// [1] https://docs.docker.com/registry/spec/api/#existing-layers
func (pbs *pullthroughBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, req *http.Request, dgst digest.Digest) error {
store, ok := pbs.digestToStore[dgst.String()]
if !ok {
return pbs.BlobStore.ServeBlob(ctx, w, req, dgst)
// This call should be done without BlobGetterService in the context.
err := pbs.BlobStore.ServeBlob(ctx, w, req, dgst)
switch {
case err == distribution.ErrBlobUnknown:
// continue on to the code below and look up the blob in a remote store since it is not in
// the local store
case err != nil:
context.GetLogger(ctx).Errorf("Failed to find blob %q: %#v", dgst.String(), err)
fallthrough
default:
return err
}

remoteGetter, found := RemoteBlobGetterFrom(pbs.repo.ctx)
if !found {
context.GetLogger(ctx).Errorf("pullthroughBlobStore.ServeBlob: failed to retrieve remote getter from context")
return distribution.ErrBlobUnknown
}

// store the content locally if requested, but ensure only one instance at a time
// is storing to avoid excessive local writes
if pbs.mirror {
mu.Lock()
if _, ok = inflight[dgst]; ok {
if _, ok := inflight[dgst]; ok {
mu.Unlock()
context.GetLogger(ctx).Infof("Serving %q while mirroring in background", dgst)
_, err := pbs.copyContent(store, ctx, dgst, w, req)
_, err := pbs.copyContent(remoteGetter, ctx, dgst, w, req)
return err
}
inflight[dgst] = struct{}{}
mu.Unlock()

go func(dgst digest.Digest) {
context.GetLogger(ctx).Infof("Start background mirroring of %q", dgst)
if err := pbs.storeLocal(store, ctx, dgst); err != nil {
if err := pbs.storeLocal(remoteGetter, ctx, dgst); err != nil {
context.GetLogger(ctx).Errorf("Error committing to storage: %s", err.Error())
}
context.GetLogger(ctx).Infof("Completed mirroring of %q", dgst)
}(dgst)
}

_, err := pbs.copyContent(store, ctx, dgst, w, req)
_, err = pbs.copyContent(remoteGetter, ctx, dgst, w, req)
return err
}

// Get attempts to fetch the requested blob by digest using a remote proxy store if necessary.
func (r *pullthroughBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
store, ok := r.digestToStore[dgst.String()]
if ok {
return store.Get(ctx, dgst)
}

data, originalErr := r.BlobStore.Get(ctx, dgst)
if originalErr == nil {
return data, nil
}

desc, err := r.remoteStat(ctx, dgst)
if err != nil {
context.GetLogger(ctx).Errorf("failed to stat blob %q in remote repositories: %v", dgst.String(), err)
return nil, originalErr
}
store, ok = r.digestToStore[desc.Digest.String()]
if !ok {
remoteGetter, found := RemoteBlobGetterFrom(r.repo.ctx)
if !found {
context.GetLogger(ctx).Errorf("pullthroughBlobStore.Get: failed to retrieve remote getter from context")
return nil, originalErr
}
return store.Get(ctx, desc.Digest)
}

// findCandidateRepository looks in search for a particular blob, referring to previously cached items
func (r *pullthroughBlobStore) findCandidateRepository(ctx context.Context, search map[string]*imageapi.DockerImageReference, cachedLayers []string, dgst digest.Digest, retriever importer.RepositoryRetriever) (distribution.Descriptor, error) {
// no possible remote locations to search, exit early
if len(search) == 0 {
return distribution.Descriptor{}, distribution.ErrBlobUnknown
}

// see if any of the previously located repositories containing this digest are in this
// image stream
for _, repo := range cachedLayers {
ref, ok := search[repo]
if !ok {
continue
}
desc, err := r.proxyStat(ctx, retriever, *ref, dgst)
if err != nil {
delete(search, repo)
continue
}
context.GetLogger(ctx).Infof("Found digest location from cache %q in %q", dgst, repo)
return desc, nil
}

// search the remaining registries for this digest
for repo, ref := range search {
desc, err := r.proxyStat(ctx, retriever, *ref, dgst)
if err != nil {
continue
}
r.repo.cachedLayers.RememberDigest(dgst, r.repo.blobrepositorycachettl, repo)
context.GetLogger(ctx).Infof("Found digest location by search %q in %q", dgst, repo)
return desc, nil
}

return distribution.Descriptor{}, distribution.ErrBlobUnknown
}

// identifyCandidateRepositories returns a map of remote repositories referenced by this image stream.
func identifyCandidateRepositories(is *imageapi.ImageStream, localRegistry string, primary bool) map[string]*imageapi.DockerImageReference {
// identify the canonical location of referenced registries to search
search := make(map[string]*imageapi.DockerImageReference)
for _, tagEvent := range is.Status.Tags {
var candidates []imageapi.TagEvent
if primary {
if len(tagEvent.Items) == 0 {
continue
}
candidates = tagEvent.Items[:1]
} else {
if len(tagEvent.Items) <= 1 {
continue
}
candidates = tagEvent.Items[1:]
}
for _, event := range candidates {
ref, err := imageapi.ParseDockerImageReference(event.DockerImageReference)
if err != nil {
continue
}
// skip anything that matches the innate registry
// TODO: there may be a better way to make this determination
if len(localRegistry) != 0 && localRegistry == ref.Registry {
continue
}
ref = ref.DockerClientDefaults()
search[ref.AsRepository().Exact()] = &ref
}
}
return search
return remoteGetter.Get(ctx, dgst)
}

// setResponseHeaders sets the appropriate content serving headers
Expand All @@ -264,7 +129,7 @@ var mu sync.Mutex

// copyContent attempts to load and serve the provided blob. If req != nil and writer is an instance of http.ResponseWriter,
// response headers will be set and range requests honored.
func (pbs *pullthroughBlobStore) copyContent(store distribution.BlobStore, ctx context.Context, dgst digest.Digest, writer io.Writer, req *http.Request) (distribution.Descriptor, error) {
func (pbs *pullthroughBlobStore) copyContent(store BlobGetterService, ctx context.Context, dgst digest.Digest, writer io.Writer, req *http.Request) (distribution.Descriptor, error) {
desc, err := store.Stat(ctx, dgst)
if err != nil {
return distribution.Descriptor{}, err
Expand Down Expand Up @@ -292,7 +157,7 @@ func (pbs *pullthroughBlobStore) copyContent(store distribution.BlobStore, ctx c
}

// storeLocal retrieves the named blob from the provided store and writes it into the local store.
func (pbs *pullthroughBlobStore) storeLocal(store distribution.BlobStore, ctx context.Context, dgst digest.Digest) error {
func (pbs *pullthroughBlobStore) storeLocal(remoteGetter BlobGetterService, ctx context.Context, dgst digest.Digest) error {
defer func() {
mu.Lock()
delete(inflight, dgst)
Expand All @@ -308,7 +173,7 @@ func (pbs *pullthroughBlobStore) storeLocal(store distribution.BlobStore, ctx co
return err
}

desc, err = pbs.copyContent(store, ctx, dgst, bw, nil)
desc, err = pbs.copyContent(remoteGetter, ctx, dgst, bw, nil)
if err != nil {
return err
}
Expand Down
36 changes: 25 additions & 11 deletions pkg/dockerregistry/server/pullthroughblobstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ func TestPullthroughServeBlob(t *testing.T) {
blobDigest: blob1Desc.Digest,
localBlobs: blob2Storage,
expectedContentLength: int64(len(blob1Content)),
expectedLocalCalls: map[string]int{"Stat": 1},
expectedLocalCalls: map[string]int{
"Stat": 1,
"ServeBlob": 1,
},
},

{
Expand All @@ -152,7 +155,10 @@ func TestPullthroughServeBlob(t *testing.T) {
blobDigest: blob1Desc.Digest,
expectedContentLength: int64(len(blob1Content)),
expectedBytesServed: int64(len(blob1Content)),
expectedLocalCalls: map[string]int{"Stat": 1},
expectedLocalCalls: map[string]int{
"Stat": 1,
"ServeBlob": 1,
},
},

{
Expand All @@ -169,17 +175,25 @@ func TestPullthroughServeBlob(t *testing.T) {
if err != nil {
t.Fatal(err)
}
repo := &repository{
ctx: ctx,
namespace: "user",
name: "app",
pullthrough: true,
cachedLayers: cachedLayers,
registryOSClient: client,
}

rbs := &remoteBlobGetterService{
repo: repo,
digestToStore: make(map[string]distribution.BlobStore),
}

ctx = WithRemoteBlobGetter(ctx, rbs)

ptbs := &pullthroughBlobStore{
BlobStore: localBlobStore,
repo: &repository{
ctx: ctx,
namespace: "user",
name: "app",
pullthrough: true,
cachedLayers: cachedLayers,
registryOSClient: client,
},
digestToStore: make(map[string]distribution.BlobStore),
repo: repo,
}

req, err := http.NewRequest(tc.method, fmt.Sprintf("http://example.org/v2/user/app/blobs/%s", tc.blobDigest), nil)
Expand Down
Loading

0 comments on commit ecc925b

Please sign in to comment.