From d8358f6a509d6b4dcbb29f019ff89aeb2fddd5f1 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 9 Jun 2016 00:12:01 -0400 Subject: [PATCH] Add a migration framework for mutable resources Implement a migration framework and an image reference migrator: oadm migrate image-references registry1.com/*=registry2.com/* --confirm Reuse resource builder to manage output --- pkg/client/images.go | 8 + pkg/cmd/admin/admin.go | 9 +- pkg/cmd/admin/migrate/images/images.go | 272 ++++++++++++++++++++ pkg/cmd/admin/migrate/images/images_test.go | 62 +++++ pkg/cmd/admin/migrate/migrate.go | 196 ++++++++++++++ pkg/cmd/cli/cli.go | 2 +- pkg/cmd/openshift/openshift.go | 4 +- pkg/image/api/helper.go | 1 + 8 files changed, 550 insertions(+), 4 deletions(-) create mode 100644 pkg/cmd/admin/migrate/images/images.go create mode 100644 pkg/cmd/admin/migrate/images/images_test.go create mode 100644 pkg/cmd/admin/migrate/migrate.go diff --git a/pkg/client/images.go b/pkg/client/images.go index 040cc01b3e3e..da282fa961fb 100644 --- a/pkg/client/images.go +++ b/pkg/client/images.go @@ -16,6 +16,7 @@ type ImageInterface interface { List(opts kapi.ListOptions) (*imageapi.ImageList, error) Get(name string) (*imageapi.Image, error) Create(image *imageapi.Image) (*imageapi.Image, error) + Update(image *imageapi.Image) (*imageapi.Image, error) Delete(name string) error } @@ -56,6 +57,13 @@ func (c *images) Create(image *imageapi.Image) (result *imageapi.Image, err erro return } +// Update alters an existingimage. Returns the server's representation of the image and error if one occurs. +func (c *images) Update(image *imageapi.Image) (result *imageapi.Image, err error) { + result = &imageapi.Image{} + err = c.r.Put().Resource("images").Name(image.Name).Body(image).Do().Into(result) + return +} + // Delete deletes an image, returns error if one occurs. func (c *images) Delete(name string) (err error) { err = c.r.Delete().Resource("images").Name(name).Do().Error() diff --git a/pkg/cmd/admin/admin.go b/pkg/cmd/admin/admin.go index 885ca950b6cb..8073bc807bc6 100644 --- a/pkg/cmd/admin/admin.go +++ b/pkg/cmd/admin/admin.go @@ -10,6 +10,8 @@ import ( "github.com/openshift/origin/pkg/cmd/admin/cert" diagnostics "github.com/openshift/origin/pkg/cmd/admin/diagnostics" "github.com/openshift/origin/pkg/cmd/admin/groups" + "github.com/openshift/origin/pkg/cmd/admin/migrate" + migrateimages "github.com/openshift/origin/pkg/cmd/admin/migrate/images" "github.com/openshift/origin/pkg/cmd/admin/node" "github.com/openshift/origin/pkg/cmd/admin/policy" "github.com/openshift/origin/pkg/cmd/admin/project" @@ -32,7 +34,7 @@ Administrative Commands Commands for managing a cluster are exposed here. Many administrative actions involve interaction with the command-line client as well.` -func NewCommandAdmin(name, fullName string, out io.Writer, errout io.Writer) *cobra.Command { +func NewCommandAdmin(name, fullName string, in io.Reader, out io.Writer, errout io.Writer) *cobra.Command { // Main command cmds := &cobra.Command{ Use: name, @@ -67,6 +69,11 @@ func NewCommandAdmin(name, fullName string, out io.Writer, errout io.Writer) *co diagnostics.NewCmdDiagnostics(diagnostics.DiagnosticsRecommendedName, fullName+" "+diagnostics.DiagnosticsRecommendedName, out), node.NewCommandManageNode(f, node.ManageNodeCommandName, fullName+" "+node.ManageNodeCommandName, out), prune.NewCommandPrune(prune.PruneRecommendedName, fullName+" "+prune.PruneRecommendedName, f, out), + migrate.NewCommandMigrate( + migrate.MigrateRecommendedName, fullName+" "+migrate.MigrateRecommendedName, f, out, + // Migration commands + migrateimages.NewCmdMigrateImageReferences("image-references", fullName+" "+migrate.MigrateRecommendedName+" image-references", f, in, out, errout), + ), }, }, { diff --git a/pkg/cmd/admin/migrate/images/images.go b/pkg/cmd/admin/migrate/images/images.go new file mode 100644 index 000000000000..c6181b5662ab --- /dev/null +++ b/pkg/cmd/admin/migrate/images/images.go @@ -0,0 +1,272 @@ +package images + +import ( + "fmt" + "io" + "os" + "strings" + + "github.com/spf13/cobra" + + kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" + "k8s.io/kubernetes/pkg/kubectl/resource" + "k8s.io/kubernetes/pkg/runtime" + + "github.com/openshift/origin/pkg/client" + "github.com/openshift/origin/pkg/cmd/admin/migrate" + cmdutil "github.com/openshift/origin/pkg/cmd/util" + "github.com/openshift/origin/pkg/cmd/util/clientcmd" + imageapi "github.com/openshift/origin/pkg/image/api" +) + +const ( + internalMigrateImagesLong = ` +Single line title + +Description body` + + internalMigrateImagesExample = `%s` +) + +type ImageReferenceMapping struct { + FromRegistry string + FromName string + ToRegistry string + ToName string +} + +func Parse(s string) (ImageReferenceMapping, error) { + parts := strings.SplitN(s, "=", 2) + from := strings.SplitN(parts[0], "/", 2) + to := strings.SplitN(parts[1], "/", 2) + if len(from) < 2 || len(to) < 2 { + return ImageReferenceMapping{}, fmt.Errorf("all arguments must be of the form REGISTRY/NAME=REGISTRY/NAME, where registry or name may be '*' or a value") + } + if len(from[0]) == 0 { + return ImageReferenceMapping{}, fmt.Errorf("%q is not a valid source: registry must be specified (may be '*')", parts[0]) + } + if len(from[1]) == 0 { + return ImageReferenceMapping{}, fmt.Errorf("%q is not a valid source: name must be specified (may be '*')", parts[0]) + } + if len(to[0]) == 0 { + return ImageReferenceMapping{}, fmt.Errorf("%q is not a valid target: registry must be specified (may be '*')", parts[1]) + } + if len(to[1]) == 0 { + return ImageReferenceMapping{}, fmt.Errorf("%q is not a valid target: name must be specified (may be '*')", parts[1]) + } + if from[0] == "*" { + from[0] = "" + } + if from[1] == "*" { + from[1] = "" + } + if to[0] == "*" { + to[0] = "" + } + if to[1] == "*" { + to[1] = "" + } + if to[0] == "" && to[1] == "" { + return ImageReferenceMapping{}, fmt.Errorf("%q is not a valid target: at least one change must be specified", parts[1]) + } + if from[0] == from[1] && to[0] == to[1] { + return ImageReferenceMapping{}, fmt.Errorf("%q is not valid: must target at least one field to change", s) + } + return ImageReferenceMapping{ + FromRegistry: from[0], + FromName: from[1], + ToRegistry: to[0], + ToName: to[1], + }, nil +} + +type ImageReferenceMappings []ImageReferenceMapping + +func (m ImageReferenceMappings) Map(in string) string { + ref, err := imageapi.ParseDockerImageReference(in) + if err != nil { + return in + } + for _, mapping := range m { + registry := ref.Registry + if len(registry) == 0 { + registry = "docker.io" + } + if len(mapping.FromRegistry) > 0 && mapping.FromRegistry != registry { + continue + } + name := ref.RepositoryName() + if len(mapping.FromName) > 0 && mapping.FromName != name { + continue + } + if len(mapping.ToRegistry) > 0 { + ref.Registry = mapping.ToRegistry + } + if len(mapping.ToName) > 0 { + ref.Namespace = "" + ref.Name = mapping.ToName + } + return ref.Exact() + } + return in +} + +type MigrateImageReferenceOptions struct { + migrate.ResourceOptions + + Client client.Interface + Mappings ImageReferenceMappings +} + +// NewCmdMigrateImageReferences implements a MigrateImages command +// This is an example type for templating. +func NewCmdMigrateImageReferences(name, fullName string, f *clientcmd.Factory, in io.Reader, out, errout io.Writer) *cobra.Command { + options := &MigrateImageReferenceOptions{ + ResourceOptions: migrate.ResourceOptions{ + In: in, + Out: out, + ErrOut: errout, + }, + } + cmd := &cobra.Command{ + Use: fmt.Sprintf("%s REGISTRY/NAME=REGISTRY/NAME [...]", name), + Short: "A short description", + Long: internalMigrateImagesLong, + Example: fmt.Sprintf(internalMigrateImagesExample, fullName), + Run: func(cmd *cobra.Command, args []string) { + kcmdutil.CheckErr(options.Complete(f, cmd, args)) + kcmdutil.CheckErr(options.Validate()) + if err := options.Run(); err != nil { + // TODO: move met to kcmdutil + if err == cmdutil.ErrExit { + os.Exit(1) + } + kcmdutil.CheckErr(err) + } + }, + } + options.ResourceOptions.Bind(cmd) + + return cmd +} + +func (o *MigrateImageReferenceOptions) Complete(f *clientcmd.Factory, c *cobra.Command, args []string) error { + var remainingArgs []string + for _, s := range args { + if !strings.Contains(s, "=") { + remainingArgs = append(remainingArgs, s) + continue + } + mapping, err := Parse(s) + if err != nil { + return err + } + o.Mappings = append(o.Mappings, mapping) + } + + o.ResourceOptions.SaveFn = o.save + if err := o.ResourceOptions.Complete(f, c, remainingArgs); err != nil { + return err + } + o.Builder.ResourceTypes("imagestream", "image") + + osclient, _, err := f.Clients() + if err != nil { + return err + } + o.Client = osclient + + return nil +} + +func (o *MigrateImageReferenceOptions) Validate() error { + if len(o.Mappings) == 0 { + return fmt.Errorf("at least one mapping argument must be specified: REGISTRY/NAME=REGISTRY/NAME") + } + return o.ResourceOptions.Validate() +} + +func (o *MigrateImageReferenceOptions) Run() error { + return o.ResourceOptions.Visit(func(info *resource.Info) (migrate.Reporter, error) { + return transformImageReferences(info.Object, o.Mappings.Map) + }) +} + +// save invokes the API to alter an object +func (o *MigrateImageReferenceOptions) save(info *resource.Info, reporter migrate.Reporter) error { + switch t := info.Object.(type) { + case *imageapi.Image: + _, err := o.Client.Images().Update(t) + return err + case *imageapi.ImageStream: + if reporter.(imageChangeInfo).status { + updated, err := o.Client.ImageStreams(t.Namespace).UpdateStatus(t) + if err != nil { + return err + } + info.Refresh(updated, true) + return migrate.ErrRecalculate + } + if reporter.(imageChangeInfo).spec { + updated, err := o.Client.ImageStreams(t.Namespace).Update(t) + if err != nil { + return err + } + info.Refresh(updated, true) + } + return nil + default: + return fmt.Errorf("resource %q does not have a save method implemented (%T)", info.Mapping.Resource, t) + } + return nil +} + +type reporter bool + +func (r reporter) Changed() bool { + return bool(r) +} + +// imageChangeInfo indicates whether the spec or status of an image stream was changed +type imageChangeInfo struct { + spec, status bool +} + +func (i imageChangeInfo) Changed() bool { + return i.spec || i.status +} + +// transformImageReferences checks image references on the provided object and returns either a reporter (indicating +// that the object was recognized and whether it was updated) or an error. +func transformImageReferences(obj runtime.Object, fn func(s string) string) (migrate.Reporter, error) { + switch t := obj.(type) { + case *imageapi.Image: + var changed bool + if updated := fn(t.DockerImageReference); updated != t.DockerImageReference { + changed = true + t.DockerImageReference = updated + } + return reporter(changed), nil + case *imageapi.ImageStream: + var info imageChangeInfo + for _, ref := range t.Spec.Tags { + if ref.From == nil || ref.From.Kind != "DockerImage" { + continue + } + if updated := fn(ref.From.Name); updated != ref.From.Name { + info.spec = true + ref.From.Name = updated + } + } + for _, events := range t.Status.Tags { + for i, event := range events.Items { + if updated := fn(event.DockerImageReference); updated != event.DockerImageReference { + info.status = true + events.Items[i].DockerImageReference = updated + } + } + } + return info, nil + } + return nil, nil +} diff --git a/pkg/cmd/admin/migrate/images/images_test.go b/pkg/cmd/admin/migrate/images/images_test.go new file mode 100644 index 000000000000..95a23b504b90 --- /dev/null +++ b/pkg/cmd/admin/migrate/images/images_test.go @@ -0,0 +1,62 @@ +package images + +import ( + "testing" +) + +func TestImageReferenceMappingsMap(t *testing.T) { + testCases := []struct { + mappings ImageReferenceMappings + results map[string]string + }{ + { + mappings: ImageReferenceMappings{{FromRegistry: "docker.io", ToRegistry: "index.docker.io"}}, + results: map[string]string{ + "mysql": "index.docker.io/mysql", + "mysql:latest": "index.docker.io/mysql:latest", + "default/mysql:latest": "index.docker.io/default/mysql:latest", + + "mysql@sha256:b2f400f4a5e003b0543decf61a0a010939f3fba07bafa226f11ed7b5f1e81237": "index.docker.io/mysql@sha256:b2f400f4a5e003b0543decf61a0a010939f3fba07bafa226f11ed7b5f1e81237", + + "docker.io/mysql": "index.docker.io/library/mysql", + "docker.io/mysql:latest": "index.docker.io/library/mysql:latest", + "docker.io/default/mysql:latest": "index.docker.io/default/mysql:latest", + + "docker.io/mysql@sha256:b2f400f4a5e003b0543decf61a0a010939f3fba07bafa226f11ed7b5f1e81237": "index.docker.io/library/mysql@sha256:b2f400f4a5e003b0543decf61a0a010939f3fba07bafa226f11ed7b5f1e81237", + }, + }, + { + mappings: ImageReferenceMappings{{FromName: "test/other", ToRegistry: "another.registry"}}, + results: map[string]string{ + "test/other": "another.registry/test/other", + "test/other:latest": "another.registry/test/other:latest", + "myregistry.com/test/other:latest": "another.registry/test/other:latest", + + "myregistry.com/b/test/other:latest": "myregistry.com/b/test/other:latest", + }, + }, + { + mappings: ImageReferenceMappings{{FromName: "test/other", ToName: "other/test"}}, + results: map[string]string{ + "test/other": "other/test", + "test/other:latest": "other/test:latest", + "myregistry.com/test/other:latest": "myregistry.com/other/test:latest", + + "test/other/b:latest": "test/other/b:latest", + + // TODO: this is possibly wrong with V2 and latest daemon + "b/test/other:latest": "b/other/test:latest", + }, + }, + } + + for i, test := range testCases { + for in, out := range test.results { + result := test.mappings.Map(in) + if result != out { + t.Errorf("%d: expect %s -> %s, got %q", i, in, out, result) + continue + } + } + } +} diff --git a/pkg/cmd/admin/migrate/migrate.go b/pkg/cmd/admin/migrate/migrate.go new file mode 100644 index 000000000000..d6f1d48bbc13 --- /dev/null +++ b/pkg/cmd/admin/migrate/migrate.go @@ -0,0 +1,196 @@ +package migrate + +import ( + "fmt" + "io" + "text/tabwriter" + + "github.com/golang/glog" + "github.com/spf13/cobra" + + "k8s.io/kubernetes/pkg/kubectl/resource" + + cmdutil "github.com/openshift/origin/pkg/cmd/util" + "github.com/openshift/origin/pkg/cmd/util/clientcmd" +) + +const MigrateRecommendedName = "migrate" + +const migrateLong = `Migrate resources on the cluster + +These commands assist administrators in performing preventative maintenance on a cluster.` + +func NewCommandMigrate(name, fullName string, f *clientcmd.Factory, out io.Writer, cmds ...*cobra.Command) *cobra.Command { + // Parent command to which all subcommands are added. + cmd := &cobra.Command{ + Use: name, + Short: "Migrate data in the cluster", + Long: migrateLong, + Run: cmdutil.DefaultSubCommandRun(out), + } + cmd.AddCommand(cmds...) + return cmd +} + +type ResourceOptions struct { + In io.Reader + Out, ErrOut io.Writer + + Confirm bool + Builder *resource.Builder + SaveFn MigrateSaveFunc +} + +func (o *ResourceOptions) Bind(c *cobra.Command) { + c.Flags().BoolVar(&o.Confirm, "confirm", false, "If false (the default) no resources will be updated") +} + +func (o *ResourceOptions) Complete(f *clientcmd.Factory, c *cobra.Command, args []string) error { + o.Builder = f.Factory.NewBuilder(false).AllNamespaces(true).ContinueOnError().RequireObject(true).SelectAllParam(true).Flatten() + return nil +} + +func (o *ResourceOptions) Validate() error { return nil } + +var ErrRecalculate = fmt.Errorf("recalculate migration") + +type migrateTracker struct { + errOut io.Writer + migrateFn MigrateVisitFunc + saveFn MigrateSaveFunc + + found, ignored, unchanged, errors int + retries int +} + +type attemptResult int + +const ( + attemptResultSuccess attemptResult = iota + attemptResultError attemptResult = iota + attemptResultUnchanged attemptResult = iota + attemptResultIgnore attemptResult = iota +) + +func (t *migrateTracker) report(prefix string, info *resource.Info, err error) { + if err != nil { + fmt.Fprintf(t.errOut, "%s:\t%s/%s/%s: %v\n", prefix, info.Mapping.Resource, info.Namespace, info.Name, err) + } else { + fmt.Fprintf(t.errOut, "%s:\t%s/%s/%s\n", prefix, info.Mapping.Resource, info.Namespace, info.Name) + } +} + +func (t *migrateTracker) attempt(info *resource.Info, retries int) { + t.found++ + t.retries = retries + result, err := t.try(info) + switch { + case err != nil: + t.errors++ + t.report("error", info, err) + case result == attemptResultIgnore: + t.ignored++ + if glog.V(2) { + t.report("ignored", info, nil) + } + case result == attemptResultUnchanged: + t.unchanged++ + if glog.V(2) { + t.report("unchanged", info, nil) + } + case result == attemptResultSuccess: + if glog.V(1) { + if t.saveFn != nil { + t.report("migrated", info, nil) + } else { + t.report("migrated (DRY RUN)", info, nil) + } + } + } +} + +func (t *migrateTracker) try(info *resource.Info) (attemptResult, error) { + reporter, err := t.migrateFn(info) + if err != nil { + return attemptResultError, err + } + if reporter == nil { + return attemptResultIgnore, nil + } + if !reporter.Changed() { + return attemptResultUnchanged, nil + } + if t.saveFn != nil { + switch err := t.saveFn(info, reporter); { + case err == ErrRecalculate: + t2 := *t + t2.retries-- + if t2.retries > 0 { + fmt.Fprintf(t2.errOut, "retry:\t%s/%s/%s\n", info.Mapping.Resource, info.Namespace, info.Name) + result, err := t2.try(info) + switch result { + case attemptResultUnchanged, attemptResultIgnore: + result = attemptResultSuccess + } + return result, err + } + fallthrough + case err != nil: + return attemptResultError, err + } + } + return attemptResultSuccess, nil +} + +func (o *ResourceOptions) Visit(fn MigrateVisitFunc) error { + saveFn := o.SaveFn + if !o.Confirm { + saveFn = nil + } + + result := o.Builder.Do() + if result.Err() != nil { + return result.Err() + } + + out := tabwriter.NewWriter(o.ErrOut, 1, 8, 1, ' ', 0) + defer out.Flush() + + t := migrateTracker{ + errOut: out, + migrateFn: fn, + saveFn: saveFn, + } + + err := result.Visit(func(info *resource.Info, err error) error { + if err != nil { + t.found++ + t.report("error", info, err) + return nil + } + t.attempt(info, 10) + if t.found%10 == 0 { + out.Flush() + } + return nil + }) + + fmt.Fprintf(out, "summary:\ttotal=%d errors=%d ignored=%d unchanged=%d migrated=%d\n", t.found, t.errors, t.ignored, t.unchanged, t.found-t.errors-t.unchanged-t.ignored) + + switch { + case err != nil: + fmt.Fprintf(out, "error: exited without processing all resources: %v\n", err) + err = cmdutil.ErrExit + case t.errors > 0: + fmt.Fprintf(out, "error: %d resources failed to migrate\n", t.errors) + err = cmdutil.ErrExit + } + return err +} + +type MigrateSaveFunc func(info *resource.Info, reporter Reporter) error +type MigrateVisitFunc func(info *resource.Info) (Reporter, error) + +type Reporter interface { + Changed() bool +} diff --git a/pkg/cmd/cli/cli.go b/pkg/cmd/cli/cli.go index cfaad6e00a34..30a2bd00e25a 100644 --- a/pkg/cmd/cli/cli.go +++ b/pkg/cmd/cli/cli.go @@ -147,7 +147,7 @@ func NewCommandCLI(name, fullName string, in io.Reader, out, errout io.Writer) * { Message: "Advanced Commands:", Commands: []*cobra.Command{ - admin.NewCommandAdmin("adm", fullName+" "+"adm", out, errout), + admin.NewCommandAdmin("adm", fullName+" "+"adm", in, out, errout), cmd.NewCmdCreate(fullName, f, out), cmd.NewCmdReplace(fullName, f, out), cmd.NewCmdApply(fullName, f, out), diff --git a/pkg/cmd/openshift/openshift.go b/pkg/cmd/openshift/openshift.go index e72f6c8f22e7..de86c89714c1 100644 --- a/pkg/cmd/openshift/openshift.go +++ b/pkg/cmd/openshift/openshift.go @@ -69,7 +69,7 @@ func CommandFor(basename string) *cobra.Command { case "oc", "osc": cmd = cli.NewCommandCLI(basename, basename, in, out, errout) case "oadm", "osadm": - cmd = admin.NewCommandAdmin(basename, basename, out, errout) + cmd = admin.NewCommandAdmin(basename, basename, in, out, errout) case "kubectl": cmd = cli.NewCmdKubectl(basename, out) case "kube-apiserver": @@ -111,7 +111,7 @@ func NewCommandOpenShift(name string) *cobra.Command { startAllInOne, _ := start.NewCommandStartAllInOne(name, out) root.AddCommand(startAllInOne) - root.AddCommand(admin.NewCommandAdmin("admin", name+" admin", out, errout)) + root.AddCommand(admin.NewCommandAdmin("admin", name+" admin", in, out, errout)) root.AddCommand(cli.NewCommandCLI("cli", name+" cli", in, out, errout)) root.AddCommand(cli.NewCmdKubectl("kube", out)) root.AddCommand(newExperimentalCommand("ex", name+" ex")) diff --git a/pkg/image/api/helper.go b/pkg/image/api/helper.go index 8bead9ba78a6..4c4670d3f385 100644 --- a/pkg/image/api/helper.go +++ b/pkg/image/api/helper.go @@ -169,6 +169,7 @@ func ParseDockerImageReference(spec string) (DockerImageReference, error) { ref.ID = id break default: + // TODO: this is no longer true with V2 return ref, fmt.Errorf("the docker pull spec %q must be two or three segments separated by slashes", spec) }