From 709384605a0a8f0bbdf5ef99bcf6108e6579f940 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 19 Apr 2016 12:32:14 -0400 Subject: [PATCH] Retry 401 unauthorized responses from registries within a window DockerHub generates JWT tokens that are valid from "now", which means fast actions can get a 401 unauthorized if the second boundary on the hub servers is not aligned. We retry 401 unauthorized requests immediately (one time), then wait a small window of time before retrying. If our operation is too slow (longer than the window) we do not retry. Probably an issue with Docker Trusted Registries as well. --- pkg/image/importer/client.go | 373 ++++++++++++++++++++++++++++ pkg/image/importer/client_test.go | 296 ++++++++++++++++++++++ pkg/image/importer/importer.go | 215 ---------------- pkg/image/importer/importer_test.go | 136 ---------- 4 files changed, 669 insertions(+), 351 deletions(-) create mode 100644 pkg/image/importer/client.go create mode 100644 pkg/image/importer/client_test.go diff --git a/pkg/image/importer/client.go b/pkg/image/importer/client.go new file mode 100644 index 000000000000..c4919f13556e --- /dev/null +++ b/pkg/image/importer/client.go @@ -0,0 +1,373 @@ +package importer + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "path" + "time" + + "github.com/golang/glog" + gocontext "golang.org/x/net/context" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/registry/api/errcode" + registryclient "github.com/docker/distribution/registry/client" + "github.com/docker/distribution/registry/client/auth" + "github.com/docker/distribution/registry/client/transport" + + kapi "k8s.io/kubernetes/pkg/api" + + "github.com/openshift/origin/pkg/dockerregistry" + "github.com/openshift/origin/pkg/image/api" + "github.com/openshift/origin/pkg/image/api/dockerpre012" +) + +// ErrNotV2Registry is returned when the server does not report itself as a V2 Docker registry +type ErrNotV2Registry struct { + Registry string +} + +func (e *ErrNotV2Registry) Error() string { + return fmt.Sprintf("endpoint %q does not support v2 API", e.Registry) +} + +// NewContext is capable of creating RepositoryRetrievers. +func NewContext(transport, insecureTransport http.RoundTripper) Context { + return Context{ + Transport: transport, + InsecureTransport: insecureTransport, + Challenges: auth.NewSimpleChallengeManager(), + } +} + +type Context struct { + Transport http.RoundTripper + InsecureTransport http.RoundTripper + Challenges auth.ChallengeManager +} + +func (c Context) WithCredentials(credentials auth.CredentialStore) RepositoryRetriever { + return &repositoryRetriever{ + context: c, + credentials: credentials, + + pings: make(map[url.URL]error), + redirect: make(map[url.URL]*url.URL), + } +} + +type repositoryRetriever struct { + context Context + credentials auth.CredentialStore + + pings map[url.URL]error + redirect map[url.URL]*url.URL +} + +func (r *repositoryRetriever) Repository(ctx gocontext.Context, registry *url.URL, repoName string, insecure bool) (distribution.Repository, error) { + t := r.context.Transport + if insecure && r.context.InsecureTransport != nil { + t = r.context.InsecureTransport + } + src := *registry + // ping the registry to get challenge headers + if err, ok := r.pings[src]; ok { + if err != nil { + return nil, err + } + if redirect, ok := r.redirect[src]; ok { + src = *redirect + } + } else { + redirect, err := r.ping(src, insecure, t) + r.pings[src] = err + if err != nil { + return nil, err + } + if redirect != nil { + r.redirect[src] = redirect + src = *redirect + } + } + + rt := transport.NewTransport( + t, + // TODO: slightly smarter authorizer that retries unauthenticated requests + // TODO: make multiple attempts if the first credential fails + auth.NewAuthorizer( + r.context.Challenges, + auth.NewTokenHandler(t, r.credentials, repoName, "pull"), + auth.NewBasicHandler(r.credentials), + ), + ) + + repo, err := registryclient.NewRepository(context.Context(ctx), repoName, src.String(), rt) + if err != nil { + return nil, err + } + return NewRetryRepository(repo, 2, 3/2*time.Second), nil +} + +func (r *repositoryRetriever) ping(registry url.URL, insecure bool, transport http.RoundTripper) (*url.URL, error) { + pingClient := &http.Client{ + Transport: transport, + Timeout: 15 * time.Second, + } + target := registry + target.Path = path.Join(target.Path, "v2") + "/" + req, err := http.NewRequest("GET", target.String(), nil) + if err != nil { + return nil, err + } + resp, err := pingClient.Do(req) + if err != nil { + if insecure && registry.Scheme == "https" { + glog.V(5).Infof("Falling back to an HTTP check for an insecure registry %s: %v", registry, err) + registry.Scheme = "http" + _, nErr := r.ping(registry, true, transport) + if nErr != nil { + return nil, nErr + } + return ®istry, nil + } + return nil, err + } + defer resp.Body.Close() + + versions := auth.APIVersions(resp, "Docker-Distribution-API-Version") + if len(versions) == 0 { + glog.V(5).Infof("Registry responded to v2 Docker endpoint, but has no header for Docker Distribution %s: %d, %#v", req.URL, resp.StatusCode, resp.Header) + return nil, &ErrNotV2Registry{Registry: registry.String()} + } + + r.context.Challenges.AddResponse(resp) + + return nil, nil +} + +func schema1ToImage(manifest *schema1.SignedManifest, d digest.Digest) (*api.Image, error) { + if len(manifest.History) == 0 { + return nil, fmt.Errorf("image has no v1Compatibility history and cannot be used") + } + dockerImage, err := unmarshalDockerImage([]byte(manifest.History[0].V1Compatibility)) + if err != nil { + return nil, err + } + if len(d) > 0 { + dockerImage.ID = d.String() + } else { + if p, err := manifest.Payload(); err == nil { + d, err := digest.FromBytes(p) + if err != nil { + return nil, fmt.Errorf("unable to create digest from image payload: %v", err) + } + dockerImage.ID = d.String() + } else { + d, err := digest.FromBytes(manifest.Raw) + if err != nil { + return nil, fmt.Errorf("unable to create digest from image bytes: %v", err) + } + dockerImage.ID = d.String() + } + } + image := &api.Image{ + ObjectMeta: kapi.ObjectMeta{ + Name: dockerImage.ID, + }, + DockerImageMetadata: *dockerImage, + DockerImageManifest: string(manifest.Raw), + DockerImageMetadataVersion: "1.0", + } + + return image, nil +} + +func schema0ToImage(dockerImage *dockerregistry.Image, id string) (*api.Image, error) { + var baseImage api.DockerImage + if err := kapi.Scheme.Convert(&dockerImage.Image, &baseImage); err != nil { + return nil, fmt.Errorf("could not convert image: %#v", err) + } + + image := &api.Image{ + ObjectMeta: kapi.ObjectMeta{ + Name: dockerImage.ID, + }, + DockerImageMetadata: baseImage, + DockerImageMetadataVersion: "1.0", + } + + return image, nil +} + +func unmarshalDockerImage(body []byte) (*api.DockerImage, error) { + var image dockerpre012.DockerImage + if err := json.Unmarshal(body, &image); err != nil { + return nil, err + } + dockerImage := &api.DockerImage{} + if err := kapi.Scheme.Convert(&image, dockerImage); err != nil { + return nil, err + } + return dockerImage, nil +} + +func isDockerError(err error, code errcode.ErrorCode) bool { + switch t := err.(type) { + case errcode.Errors: + for _, err := range t { + if isDockerError(err, code) { + return true + } + } + case errcode.ErrorCode: + if code == t { + return true + } + case errcode.Error: + if t.ErrorCode() == code { + return true + } + } + return false +} + +var nowFn = time.Now + +type retryRepository struct { + distribution.Repository + + retries int + initial *time.Time + wait time.Duration + limit time.Duration +} + +// NewRetryRepository wraps a distribution.Repository with helpers that will retry authentication failures +// over a limited time window and duration. This primarily avoids a DockerHub issue where public images +// unexpectedly return a 401 error due to the JWT token created by the hub being created at the same second, +// but another server being in the previous second. +func NewRetryRepository(repo distribution.Repository, retries int, interval time.Duration) distribution.Repository { + return &retryRepository{ + Repository: repo, + + retries: retries, + wait: interval / time.Duration(retries), + limit: interval, + } +} + +// shouldRetry returns true if the error is not an unauthorized error, if there are no retries left, or if +// we have already retried once and it has been longer than r.limit since we retried the first time. +func (r *retryRepository) shouldRetry(err error) bool { + if err == nil { + return false + } + if !isDockerError(err, errcode.ErrorCodeUnauthorized) { + return false + } + + if r.retries <= 0 { + return false + } + r.retries-- + + now := nowFn() + switch { + case r.initial == nil: + // always retry the first time immediately + r.initial = &now + case r.limit != 0 && now.Sub(*r.initial) > r.limit: + // give up retrying after the window + r.retries = 0 + default: + // don't hot loop + time.Sleep(r.wait) + } + glog.V(4).Infof("Retrying request to a v2 Docker registry after encountering error (%d attempts remaining): %v", r.retries, err) + return true +} + +// Manifests wraps the manifest service in a retryManifest for shared retries. +func (r *retryRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) { + s, err := r.Repository.Manifests(ctx, options...) + if err != nil { + return nil, err + } + return retryManifest{ManifestService: s, repo: r}, nil +} + +type retryManifest struct { + distribution.ManifestService + repo *retryRepository +} + +// Exists returns true if the manifest exists. +func (r retryManifest) Exists(dgst digest.Digest) (bool, error) { + for { + if exists, err := r.ManifestService.Exists(dgst); r.repo.shouldRetry(err) { + continue + } else { + return exists, err + } + } +} + +// Get retrieves the identified by the digest, if it exists. +func (r retryManifest) Get(dgst digest.Digest) (*schema1.SignedManifest, error) { + for { + if m, err := r.ManifestService.Get(dgst); r.repo.shouldRetry(err) { + continue + } else { + return m, err + } + } +} + +// Enumerate returns an array of manifest revisions in repository. +func (r retryManifest) Enumerate() ([]digest.Digest, error) { + for { + if d, err := r.ManifestService.Enumerate(); r.repo.shouldRetry(err) { + continue + } else { + return d, err + } + } +} + +// Tags lists the tags under the named repository. +func (r retryManifest) Tags() ([]string, error) { + for { + if t, err := r.ManifestService.Tags(); r.repo.shouldRetry(err) { + continue + } else { + return t, err + } + } +} + +// ExistsByTag returns true if the manifest exists. +func (r retryManifest) ExistsByTag(tag string) (bool, error) { + for { + if exists, err := r.ManifestService.ExistsByTag(tag); r.repo.shouldRetry(err) { + continue + } else { + return exists, err + } + } +} + +// GetByTag retrieves the named manifest, if it exists. +func (r retryManifest) GetByTag(tag string, options ...distribution.ManifestServiceOption) (*schema1.SignedManifest, error) { + for { + if m, err := r.ManifestService.GetByTag(tag, options...); r.repo.shouldRetry(err) { + continue + } else { + return m, err + } + } +} diff --git a/pkg/image/importer/client_test.go b/pkg/image/importer/client_test.go new file mode 100644 index 000000000000..444188fab803 --- /dev/null +++ b/pkg/image/importer/client_test.go @@ -0,0 +1,296 @@ +package importer + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + "time" + + gocontext "golang.org/x/net/context" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest/schema1" + "github.com/docker/distribution/registry/api/errcode" + + kapi "k8s.io/kubernetes/pkg/api" + + "github.com/openshift/origin/pkg/dockerregistry" + "github.com/openshift/origin/pkg/image/api" +) + +type mockRetriever struct { + repo distribution.Repository + insecure bool + err error +} + +func (r *mockRetriever) Repository(ctx gocontext.Context, registry *url.URL, repoName string, insecure bool) (distribution.Repository, error) { + r.insecure = insecure + return r.repo, r.err +} + +type mockRepository struct { + repoErr, getErr, getByTagErr, tagsErr, err error + + manifest *schema1.SignedManifest + tags []string +} + +func (r *mockRepository) Name() string { return "test" } + +func (r *mockRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) { + return r, r.repoErr +} +func (r *mockRepository) Blobs(ctx context.Context) distribution.BlobStore { return nil } +func (r *mockRepository) Signatures() distribution.SignatureService { return nil } +func (r *mockRepository) Exists(dgst digest.Digest) (bool, error) { + return false, r.getErr +} +func (r *mockRepository) Get(dgst digest.Digest) (*schema1.SignedManifest, error) { + return r.manifest, r.getErr +} +func (r *mockRepository) Enumerate() ([]digest.Digest, error) { + return nil, r.getErr +} +func (r *mockRepository) Delete(dgst digest.Digest) error { return fmt.Errorf("not implemented") } +func (r *mockRepository) Put(manifest *schema1.SignedManifest) error { + return fmt.Errorf("not implemented") +} +func (r *mockRepository) Tags() ([]string, error) { return r.tags, r.tagsErr } +func (r *mockRepository) ExistsByTag(tag string) (bool, error) { + return false, r.tagsErr +} +func (r *mockRepository) GetByTag(tag string, options ...distribution.ManifestServiceOption) (*schema1.SignedManifest, error) { + return r.manifest, r.getByTagErr +} + +func TestSchema1ToImage(t *testing.T) { + m := &schema1.SignedManifest{} + if err := json.Unmarshal([]byte(etcdManifest), m); err != nil { + t.Fatal(err) + } + image, err := schema1ToImage(m, digest.Digest("sha256:test")) + if err != nil { + t.Fatal(err) + } + if image.DockerImageMetadata.ID != "sha256:test" { + t.Errorf("unexpected image: %#v", image.DockerImageMetadata.ID) + } +} + +func TestDockerV1Fallback(t *testing.T) { + var uri *url.URL + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Docker-Endpoints", uri.Host) + + // get all tags + if strings.HasSuffix(r.URL.Path, "/tags") { + fmt.Fprintln(w, `{"tag1":"image1", "test":"image2"}`) + w.WriteHeader(http.StatusOK) + return + } + if strings.HasSuffix(r.URL.Path, "/images") { + fmt.Fprintln(w, `{"tag1":"image1", "test":"image2"}`) + w.WriteHeader(http.StatusOK) + return + } + if strings.HasSuffix(r.URL.Path, "/json") { + fmt.Fprintln(w, `{"ID":"image2"}`) + w.WriteHeader(http.StatusOK) + return + } + t.Logf("tried to access %s", r.URL.Path) + w.WriteHeader(http.StatusNotFound) + })) + + client := dockerregistry.NewClient(10*time.Second, false) + ctx := gocontext.WithValue(gocontext.Background(), ContextKeyV1RegistryClient, client) + + uri, _ = url.Parse(server.URL) + isi := &api.ImageStreamImport{ + Spec: api.ImageStreamImportSpec{ + Repository: &api.RepositoryImportSpec{ + From: kapi.ObjectReference{Kind: "DockerImage", Name: uri.Host + "/test:test"}, + ImportPolicy: api.TagImportPolicy{Insecure: true}, + }, + }, + } + + retriever := &mockRetriever{err: fmt.Errorf("does not support v2 API")} + im := NewImageStreamImporter(retriever, 5, nil) + if err := im.Import(ctx, isi); err != nil { + t.Fatal(err) + } + if images := isi.Status.Repository.Images; len(images) != 2 || images[0].Tag != "tag1" || images[1].Tag != "test" { + t.Errorf("unexpected images: %#v", images) + } +} + +func TestPing(t *testing.T) { + retriever := NewContext(http.DefaultTransport, http.DefaultTransport).WithCredentials(NoCredentials).(*repositoryRetriever) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + uri, _ := url.Parse(server.URL) + + _, err := retriever.ping(*uri, true, retriever.context.InsecureTransport) + if !strings.Contains(err.Error(), "does not support v2 API") { + t.Errorf("Expected ErrNotV2Registry, got %v", err) + } + + uri.Scheme = "https" + _, err = retriever.ping(*uri, true, retriever.context.InsecureTransport) + if !strings.Contains(err.Error(), "does not support v2 API") { + t.Errorf("Expected ErrNotV2Registry, got %v", err) + } +} + +func TestShouldRetry(t *testing.T) { + r := NewRetryRepository(nil, 1, 0).(*retryRepository) + + // nil error doesn't consume retries + if r.shouldRetry(nil) { + t.Fatal(r) + } + if r.retries != 1 || r.initial != nil { + t.Fatal(r) + } + + // normal error doesn't consume retries + if r.shouldRetry(fmt.Errorf("error")) { + t.Fatal(r) + } + if r.retries != 1 || r.initial != nil { + t.Fatal(r) + } + + // docker error doesn't consume retries + if r.shouldRetry(errcode.ErrorCodeDenied) { + t.Fatal(r) + } + if r.retries != 1 || r.initial != nil { + t.Fatal(r) + } + + now := time.Unix(1, 0) + nowFn = func() time.Time { + return now + } + // should retry unauthorized + r = NewRetryRepository(nil, 1, 0).(*retryRepository) + if !r.shouldRetry(errcode.ErrorCodeUnauthorized) { + t.Fatal(r) + } + if r.retries != 0 || r.initial == nil || !r.initial.Equal(now) { + t.Fatal(r) + } + if r.shouldRetry(errcode.ErrorCodeUnauthorized) { + t.Fatal(r) + } + + // should not retry unauthorized after one second + r = NewRetryRepository(nil, 2, time.Second).(*retryRepository) + if !r.shouldRetry(errcode.ErrorCodeUnauthorized) { + t.Fatal(r) + } + if r.retries != 1 || r.initial == nil || !r.initial.Equal(time.Unix(1, 0)) || r.wait != (time.Second/2) { + t.Fatal(r) + } + now = time.Unix(3, 0) + if !r.shouldRetry(errcode.ErrorCodeUnauthorized) { + t.Fatal(r) + } + if r.retries != 0 || r.initial == nil || !r.initial.Equal(time.Unix(1, 0)) || r.wait != (time.Second/2) { + t.Fatal(r) + } + if r.shouldRetry(errcode.ErrorCodeUnauthorized) { + t.Fatal(r) + } + + // should retry unauthorized within one second and preserve initial time + now = time.Unix(0, 0) + r = NewRetryRepository(nil, 2, time.Millisecond).(*retryRepository) + if !r.shouldRetry(errcode.ErrorCodeUnauthorized) { + t.Fatal(r) + } + if r.retries != 1 || r.initial == nil || !r.initial.Equal(time.Unix(0, 0)) { + t.Fatal(r) + } + now = time.Unix(0, time.Millisecond.Nanoseconds()/2) + if !r.shouldRetry(errcode.ErrorCodeUnauthorized) { + t.Fatal(r) + } + if r.retries != 0 || r.initial == nil || !r.initial.Equal(time.Unix(0, 0)) { + t.Fatal(r) + } +} + +func TestRetryFailure(t *testing.T) { + if !isDockerError(errcode.ErrorCodeUnauthorized, errcode.ErrorCodeUnauthorized) { + t.Fatal("not an error") + } + + // do not retry on Manifests() + repo := &mockRepository{repoErr: fmt.Errorf("does not support v2 API")} + r := NewRetryRepository(repo, 1, 0).(*retryRepository) + if m, err := r.Manifests(nil); m != nil || err != repo.repoErr || r.retries != 1 { + t.Fatalf("unexpected: %v %v %#v", m, err, r) + } + + // do not retry on Manifests() + repo = &mockRepository{repoErr: errcode.ErrorCodeUnauthorized} + r = NewRetryRepository(repo, 4, 0).(*retryRepository) + if m, err := r.Manifests(nil); m != nil || err != repo.repoErr || r.retries != 4 { + t.Fatalf("unexpected: %v %v %#v", m, err, r) + } + + // do not retry on non standard errors + repo = &mockRepository{getByTagErr: fmt.Errorf("does not support v2 API")} + r = NewRetryRepository(repo, 4, 0).(*retryRepository) + m, err := r.Manifests(nil) + if err != nil { + t.Fatal(err) + } + if m, err := m.GetByTag("test"); m != nil || err != repo.getByTagErr || r.retries != 4 { + t.Fatalf("unexpected: %v %v %#v", m, err, r) + } + + // retry four times + repo = &mockRepository{ + getByTagErr: errcode.ErrorCodeUnauthorized, + getErr: errcode.ErrorCodeUnauthorized, + tagsErr: errcode.ErrorCodeUnauthorized, + } + r = NewRetryRepository(repo, 4, 0).(*retryRepository) + if m, err = r.Manifests(nil); err != nil { + t.Fatal(err) + } + if m, err := m.GetByTag("test"); m != nil || err != repo.getByTagErr || r.retries != 0 { + t.Fatalf("unexpected: %v %v %#v", m, err, r) + } + r.retries = 2 + if m, err := m.Get(digest.Digest("foo")); m != nil || err != repo.getErr || r.retries != 0 { + t.Fatalf("unexpected: %v %v %#v", m, err, r) + } + r.retries = 2 + if m, err := m.Exists("foo"); m || err != repo.getErr || r.retries != 0 { + t.Fatalf("unexpected: %v %v %#v", m, err, r) + } + r.retries = 2 + if m, err := m.Enumerate(); m != nil || err != repo.getErr || r.retries != 0 { + t.Fatalf("unexpected: %v %v %#v", m, err, r) + } + r.retries = 2 + if m, err := m.ExistsByTag("foo"); m || err != repo.getErr || r.retries != 0 { + t.Fatalf("unexpected: %v %v %#v", m, err, r) + } + r.retries = 2 + if m, err := m.Tags(); m != nil || err != repo.tagsErr || r.retries != 0 { + t.Fatalf("unexpected: %v %v %#v", m, err, r) + } +} diff --git a/pkg/image/importer/importer.go b/pkg/image/importer/importer.go index a3a3dd2b4967..8b9a05d0018e 100644 --- a/pkg/image/importer/importer.go +++ b/pkg/image/importer/importer.go @@ -1,29 +1,19 @@ package importer import ( - "encoding/json" "fmt" - "net/http" "net/url" - "path" "strings" - "time" "github.com/golang/glog" gocontext "golang.org/x/net/context" "github.com/docker/distribution" - "github.com/docker/distribution/context" "github.com/docker/distribution/digest" - "github.com/docker/distribution/manifest/schema1" "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/api/errcode" "github.com/docker/distribution/registry/api/v2" - registryclient "github.com/docker/distribution/registry/client" - "github.com/docker/distribution/registry/client/auth" - "github.com/docker/distribution/registry/client/transport" - kapi "k8s.io/kubernetes/pkg/api" kapierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/util" @@ -32,7 +22,6 @@ import ( "github.com/openshift/origin/pkg/dockerregistry" "github.com/openshift/origin/pkg/image/api" - "github.com/openshift/origin/pkg/image/api/dockerpre012" ) // Add a dockerregistry.Client to the passed context with this key to support v1 Docker registry importing @@ -50,15 +39,6 @@ type RepositoryRetriever interface { Repository(ctx gocontext.Context, registry *url.URL, repoName string, insecure bool) (distribution.Repository, error) } -// ErrNotV2Registry is returned when the server does not report itself as a V2 Docker registry -type ErrNotV2Registry struct { - Registry string -} - -func (e *ErrNotV2Registry) Error() string { - return fmt.Sprintf("endpoint %q does not support v2 API", e.Registry) -} - // ImageStreamImport implements an import strategy for Docker images. It keeps a cache of images // per distinct auth context to reduce duplicate loads. This type is not thread safe. type ImageStreamImporter struct { @@ -605,198 +585,3 @@ func setImageImportStatus(images *api.ImageStreamImport, i int, err error) { func invalidStatus(position string, errs ...*field.Error) unversioned.Status { return kapierrors.NewInvalid(api.Kind(""), position, errs).(kapierrors.APIStatus).Status() } - -// NewContext is capable of creating RepositoryRetrievers. -func NewContext(transport, insecureTransport http.RoundTripper) Context { - return Context{ - Transport: transport, - InsecureTransport: insecureTransport, - Challenges: auth.NewSimpleChallengeManager(), - } -} - -type Context struct { - Transport http.RoundTripper - InsecureTransport http.RoundTripper - Challenges auth.ChallengeManager -} - -func (c Context) WithCredentials(credentials auth.CredentialStore) RepositoryRetriever { - return &repositoryRetriever{ - context: c, - credentials: credentials, - - pings: make(map[url.URL]error), - redirect: make(map[url.URL]*url.URL), - } -} - -type repositoryRetriever struct { - context Context - credentials auth.CredentialStore - - pings map[url.URL]error - redirect map[url.URL]*url.URL -} - -func (r *repositoryRetriever) Repository(ctx gocontext.Context, registry *url.URL, repoName string, insecure bool) (distribution.Repository, error) { - t := r.context.Transport - if insecure && r.context.InsecureTransport != nil { - t = r.context.InsecureTransport - } - src := *registry - // ping the registry to get challenge headers - if err, ok := r.pings[src]; ok { - if err != nil { - return nil, err - } - if redirect, ok := r.redirect[src]; ok { - src = *redirect - } - } else { - redirect, err := r.ping(src, insecure, t) - r.pings[src] = err - if err != nil { - return nil, err - } - if redirect != nil { - r.redirect[src] = redirect - src = *redirect - } - } - - rt := transport.NewTransport( - t, - // TODO: slightly smarter authorizer that retries unauthenticated requests - // TODO: make multiple attempts if the first credential fails - auth.NewAuthorizer( - r.context.Challenges, - auth.NewTokenHandler(t, r.credentials, repoName, "pull"), - auth.NewBasicHandler(r.credentials), - ), - ) - return registryclient.NewRepository(context.Context(ctx), repoName, src.String(), rt) -} - -func (r *repositoryRetriever) ping(registry url.URL, insecure bool, transport http.RoundTripper) (*url.URL, error) { - pingClient := &http.Client{ - Transport: transport, - Timeout: 15 * time.Second, - } - target := registry - target.Path = path.Join(target.Path, "v2") + "/" - req, err := http.NewRequest("GET", target.String(), nil) - if err != nil { - return nil, err - } - resp, err := pingClient.Do(req) - if err != nil { - if insecure && registry.Scheme == "https" { - glog.V(5).Infof("Falling back to an HTTP check for an insecure registry %s: %v", registry, err) - registry.Scheme = "http" - _, nErr := r.ping(registry, true, transport) - if nErr != nil { - return nil, nErr - } - return ®istry, nil - } - return nil, err - } - defer resp.Body.Close() - - versions := auth.APIVersions(resp, "Docker-Distribution-API-Version") - if len(versions) == 0 { - glog.V(5).Infof("Registry responded to v2 Docker endpoint, but has no header for Docker Distribution %s: %d, %#v", req.URL, resp.StatusCode, resp.Header) - return nil, &ErrNotV2Registry{Registry: registry.String()} - } - - r.context.Challenges.AddResponse(resp) - - return nil, nil -} - -func schema1ToImage(manifest *schema1.SignedManifest, d digest.Digest) (*api.Image, error) { - if len(manifest.History) == 0 { - return nil, fmt.Errorf("image has no v1Compatibility history and cannot be used") - } - dockerImage, err := unmarshalDockerImage([]byte(manifest.History[0].V1Compatibility)) - if err != nil { - return nil, err - } - if len(d) > 0 { - dockerImage.ID = d.String() - } else { - if p, err := manifest.Payload(); err == nil { - d, err := digest.FromBytes(p) - if err != nil { - return nil, fmt.Errorf("unable to create digest from image payload: %v", err) - } - dockerImage.ID = d.String() - } else { - d, err := digest.FromBytes(manifest.Raw) - if err != nil { - return nil, fmt.Errorf("unable to create digest from image bytes: %v", err) - } - dockerImage.ID = d.String() - } - } - image := &api.Image{ - ObjectMeta: kapi.ObjectMeta{ - Name: dockerImage.ID, - }, - DockerImageMetadata: *dockerImage, - DockerImageManifest: string(manifest.Raw), - DockerImageMetadataVersion: "1.0", - } - - return image, nil -} - -func schema0ToImage(dockerImage *dockerregistry.Image, id string) (*api.Image, error) { - var baseImage api.DockerImage - if err := kapi.Scheme.Convert(&dockerImage.Image, &baseImage); err != nil { - return nil, fmt.Errorf("could not convert image: %#v", err) - } - - image := &api.Image{ - ObjectMeta: kapi.ObjectMeta{ - Name: dockerImage.ID, - }, - DockerImageMetadata: baseImage, - DockerImageMetadataVersion: "1.0", - } - - return image, nil -} - -func unmarshalDockerImage(body []byte) (*api.DockerImage, error) { - var image dockerpre012.DockerImage - if err := json.Unmarshal(body, &image); err != nil { - return nil, err - } - dockerImage := &api.DockerImage{} - if err := kapi.Scheme.Convert(&image, dockerImage); err != nil { - return nil, err - } - return dockerImage, nil -} - -func isDockerError(err error, code errcode.ErrorCode) bool { - switch t := err.(type) { - case errcode.Errors: - for _, err := range t { - if isDockerError(err, code) { - return true - } - } - case errcode.ErrorCode: - if code == t { - return true - } - case errcode.Error: - if t.ErrorCode() == code { - return true - } - } - return false -} diff --git a/pkg/image/importer/importer_test.go b/pkg/image/importer/importer_test.go index b63b473d8fab..d5ca92c2d0d6 100644 --- a/pkg/image/importer/importer_test.go +++ b/pkg/image/importer/importer_test.go @@ -4,73 +4,17 @@ import ( "encoding/json" "fmt" "net/http" - "net/http/httptest" - "net/url" "reflect" - "strings" "testing" - "time" - gocontext "golang.org/x/net/context" - - "github.com/docker/distribution" - "github.com/docker/distribution/context" - "github.com/docker/distribution/digest" "github.com/docker/distribution/manifest/schema1" kapi "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" - "github.com/openshift/origin/pkg/dockerregistry" "github.com/openshift/origin/pkg/image/api" ) -type mockRetriever struct { - repo distribution.Repository - insecure bool - err error -} - -func (r *mockRetriever) Repository(ctx gocontext.Context, registry *url.URL, repoName string, insecure bool) (distribution.Repository, error) { - r.insecure = insecure - return r.repo, r.err -} - -type mockRepository struct { - repoErr, getErr, getByTagErr, tagsErr, err error - - manifest *schema1.SignedManifest - tags []string -} - -func (r *mockRepository) Name() string { return "test" } - -func (r *mockRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) { - return r, r.repoErr -} -func (r *mockRepository) Blobs(ctx context.Context) distribution.BlobStore { return nil } -func (r *mockRepository) Signatures() distribution.SignatureService { return nil } -func (r *mockRepository) Exists(dgst digest.Digest) (bool, error) { - return false, fmt.Errorf("not implemented") -} -func (r *mockRepository) Get(dgst digest.Digest) (*schema1.SignedManifest, error) { - return r.manifest, r.getErr -} -func (r *mockRepository) Enumerate() ([]digest.Digest, error) { - return nil, fmt.Errorf("not implemented") -} -func (r *mockRepository) Delete(dgst digest.Digest) error { return fmt.Errorf("not implemented") } -func (r *mockRepository) Put(manifest *schema1.SignedManifest) error { - return fmt.Errorf("not implemented") -} -func (r *mockRepository) Tags() ([]string, error) { return r.tags, r.tagsErr } -func (r *mockRepository) ExistsByTag(tag string) (bool, error) { - return false, fmt.Errorf("not implemented") -} -func (r *mockRepository) GetByTag(tag string, options ...distribution.ManifestServiceOption) (*schema1.SignedManifest, error) { - return r.manifest, r.getByTagErr -} - func TestImportNothing(t *testing.T) { ctx := NewContext(http.DefaultTransport, http.DefaultTransport).WithCredentials(NoCredentials) isi := &api.ImageStreamImport{} @@ -290,83 +234,3 @@ const etcdManifest = ` } ] }` - -func TestSchema1ToImage(t *testing.T) { - m := &schema1.SignedManifest{} - if err := json.Unmarshal([]byte(etcdManifest), m); err != nil { - t.Fatal(err) - } - image, err := schema1ToImage(m, digest.Digest("sha256:test")) - if err != nil { - t.Fatal(err) - } - if image.DockerImageMetadata.ID != "sha256:test" { - t.Errorf("unexpected image: %#v", image.DockerImageMetadata.ID) - } -} - -func TestDockerV1Fallback(t *testing.T) { - var uri *url.URL - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("X-Docker-Endpoints", uri.Host) - - // get all tags - if strings.HasSuffix(r.URL.Path, "/tags") { - fmt.Fprintln(w, `{"tag1":"image1", "test":"image2"}`) - w.WriteHeader(http.StatusOK) - return - } - if strings.HasSuffix(r.URL.Path, "/images") { - fmt.Fprintln(w, `{"tag1":"image1", "test":"image2"}`) - w.WriteHeader(http.StatusOK) - return - } - if strings.HasSuffix(r.URL.Path, "/json") { - fmt.Fprintln(w, `{"ID":"image2"}`) - w.WriteHeader(http.StatusOK) - return - } - t.Logf("tried to access %s", r.URL.Path) - w.WriteHeader(http.StatusNotFound) - })) - - client := dockerregistry.NewClient(10*time.Second, false) - ctx := gocontext.WithValue(gocontext.Background(), ContextKeyV1RegistryClient, client) - - uri, _ = url.Parse(server.URL) - isi := &api.ImageStreamImport{ - Spec: api.ImageStreamImportSpec{ - Repository: &api.RepositoryImportSpec{ - From: kapi.ObjectReference{Kind: "DockerImage", Name: uri.Host + "/test:test"}, - ImportPolicy: api.TagImportPolicy{Insecure: true}, - }, - }, - } - - retriever := &mockRetriever{err: fmt.Errorf("does not support v2 API")} - im := NewImageStreamImporter(retriever, 5, nil) - if err := im.Import(ctx, isi); err != nil { - t.Fatal(err) - } - if images := isi.Status.Repository.Images; len(images) != 2 || images[0].Tag != "tag1" || images[1].Tag != "test" { - t.Errorf("unexpected images: %#v", images) - } -} - -func TestPing(t *testing.T) { - retriever := NewContext(http.DefaultTransport, http.DefaultTransport).WithCredentials(NoCredentials).(*repositoryRetriever) - - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) - uri, _ := url.Parse(server.URL) - - _, err := retriever.ping(*uri, true, retriever.context.InsecureTransport) - if !strings.Contains(err.Error(), "does not support v2 API") { - t.Errorf("Expected ErrNotV2Registry, got %v", err) - } - - uri.Scheme = "https" - _, err = retriever.ping(*uri, true, retriever.context.InsecureTransport) - if !strings.Contains(err.Error(), "does not support v2 API") { - t.Errorf("Expected ErrNotV2Registry, got %v", err) - } -}