Skip to content

Commit

Permalink
Update CRI image pull to use transfer service
Browse files Browse the repository at this point in the history
Signed-off-by: Tony Fang <nhfang@amazon.com>
  • Loading branch information
fangn2 committed Jun 12, 2023
1 parent dc60137 commit 88d975b
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 7 deletions.
13 changes: 13 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
sandboxsapi "github.com/containerd/containerd/api/services/sandbox/v1"
snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
"github.com/containerd/containerd/api/services/tasks/v1"
transferapi "github.com/containerd/containerd/api/services/transfer/v1"
versionservice "github.com/containerd/containerd/api/services/version/v1"
apitypes "github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/containers"
Expand All @@ -51,6 +52,8 @@ import (
leasesproxy "github.com/containerd/containerd/leases/proxy"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/dialer"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/pkg/transfer/proxy"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
ptypes "github.com/containerd/containerd/protobuf/types"
Expand Down Expand Up @@ -724,6 +727,16 @@ func (c *Client) SandboxController() sandbox.Controller {
return sandboxproxy.NewSandboxController(sandboxsapi.NewControllerClient(c.conn))
}

// TransferService returns the underlying transfer service client
func (c *Client) TransferService() transfer.Transferrer {
if c.transferService != nil {
return c.transferService
}
c.connMu.Lock()
defer c.connMu.Unlock()
return proxy.NewTransferrer(transferapi.NewTransferClient(c.conn), c.streamCreator())
}

// VersionService returns the underlying VersionClient
func (c *Client) VersionService() versionservice.VersionClient {
c.connMu.Lock()
Expand Down
2 changes: 2 additions & 0 deletions pkg/cri/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ type PluginConfig struct {
// The string is in the golang duration format, see:
// https://golang.org/pkg/time/#ParseDuration
ImagePullProgressTimeout string `toml:"image_pull_progress_timeout" json:"imagePullProgressTimeout"`
// ImagePullWithTransferService specifies whether to pull image using transfer service otherwise it uses client library
ImagePullWithTransferService bool `toml:"image_pull_with_transfer_service" json:"imagePullWithTransferService"`
// DrainExecSyncIOTimeout is the maximum duration to wait for ExecSync
// API' IO EOF event after exec init process exits. A zero value means
// there is no timeout.
Expand Down
9 changes: 5 additions & 4 deletions pkg/cri/config/config_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ func DefaultConfig() PluginConfig {
ImageDecryption: ImageDecryption{
KeyModel: KeyModelNode,
},
EnableCDI: false,
CDISpecDirs: []string{"/etc/cdi", "/var/run/cdi"},
ImagePullProgressTimeout: time.Minute.String(),
DrainExecSyncIOTimeout: "0s",
EnableCDI: false,
CDISpecDirs: []string{"/etc/cdi", "/var/run/cdi"},
ImagePullProgressTimeout: time.Minute.String(),
ImagePullWithTransferService: true,
DrainExecSyncIOTimeout: "0s",
}
}
5 changes: 3 additions & 2 deletions pkg/cri/config/config_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func DefaultConfig() PluginConfig {
ImageDecryption: ImageDecryption{
KeyModel: KeyModelNode,
},
ImagePullProgressTimeout: time.Minute.String(),
DrainExecSyncIOTimeout: "0s",
ImagePullProgressTimeout: time.Minute.String(),
ImagePullWithTransferService: true,
DrainExecSyncIOTimeout: "0s",
}
}
1 change: 1 addition & 0 deletions pkg/cri/cri.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func init() {
plugin.EventPlugin,
plugin.ServicePlugin,
plugin.NRIApiPlugin,
plugin.TransferPlugin,
},
InitFn: initCRIService,
})
Expand Down
60 changes: 59 additions & 1 deletion pkg/cri/server/image_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ import (
"github.com/containerd/containerd/pkg/cri/annotations"
criconfig "github.com/containerd/containerd/pkg/cri/config"
snpkg "github.com/containerd/containerd/pkg/snapshotters"
transferimage "github.com/containerd/containerd/pkg/transfer/image"
"github.com/containerd/containerd/pkg/transfer/registry"
"github.com/containerd/containerd/platforms"
distribution "github.com/containerd/containerd/reference/docker"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/config"
Expand Down Expand Up @@ -134,6 +137,7 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
Headers: c.config.Registry.Headers,
Hosts: c.registryHosts(ctx, r.GetAuth(), pullReporter.optionUpdateClient),
})
image containerd.Image
isSchema1 bool
imageHandler containerdimages.HandlerFunc = func(_ context.Context,
desc imagespec.Descriptor) ([]imagespec.Descriptor, error) {
Expand All @@ -145,6 +149,7 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
)

defer pcancel()

snapshotter, err := c.snapshotterFromPodSandboxConfig(ctx, ref, r.SandboxConfig)
if err != nil {
return nil, err
Expand All @@ -154,6 +159,32 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
tracing.Attribute("image.ref", ref),
tracing.Attribute("snapshotter.name", snapshotter),
)

// Pull image using transfer service
if c.config.PluginConfig.ImagePullWithTransferService {

var sopts []transferimage.StoreOpt

sopts = append(sopts, transferimage.WithPlatforms(platforms.DefaultSpec()))
sopts = append(sopts, transferimage.WithUnpack(platforms.DefaultSpec(), snapshotter))
sopts = append(sopts, transferimage.WithImageLabels(map[string]string{imageLabelKey: imageLabelValue}))

ch, _ := newcriCredentials(ctx, ref, r.GetAuth())
reg := registry.NewOCIRegistry(ref, nil, ch)
is := transferimage.NewStore(ref, sopts...)

// pf, done := ProgressHandler(ctx, os.Stdout)
// defer done()
//Todo handle procress
err = c.client.TransferService().Transfer(ctx, reg, is)
if err != nil {
return nil, fmt.Errorf("failed to pull and unpack image %q: %w", ref, err)
}

return &runtime.PullImageResponse{ImageRef: ref}, nil

}

pullOpts := []containerd.RemoteOpt{
containerd.WithSchema1Conversion, //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
containerd.WithResolver(resolver),
Expand All @@ -180,7 +211,7 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
}

pullReporter.start(pctx)
image, err := c.client.Pull(pctx, ref, pullOpts...)
image, err = c.client.Pull(pctx, ref, pullOpts...)
pcancel()
if err != nil {
return nil, fmt.Errorf("failed to pull and unpack image %q: %w", ref, err)
Expand Down Expand Up @@ -762,3 +793,30 @@ func (c *criService) snapshotterFromPodSandboxConfig(ctx context.Context, imageR
log.G(ctx).Infof("experimental: PullImage %q for runtime %s, using snapshotter %s", imageRef, runtimeHandler, snapshotter)
return snapshotter, nil
}

func newcriCredentials(ctx context.Context, ref string, auth *runtime.AuthConfig) (registry.CredentialHelper, error) {
return &criCredentials{
ref: ref,
authConfig: auth,
}, nil
}

type criCredentials struct {
ref string
authConfig *runtime.AuthConfig
}

// GetCredentials gets credential from criCredentials makes criCredentials a registry.CredentialHelper
func (cc *criCredentials) GetCredentials(ctx context.Context, ref string, host string) (registry.Credentials, error) {
if ref == cc.ref {
username, secret, err := ParseAuth(cc.authConfig, host)
if err != nil {
return registry.Credentials{
Host: host,
Username: username,
Secret: secret,
}, nil
}
}
return registry.Credentials{}, nil
}
12 changes: 12 additions & 0 deletions services.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/sandbox"
srv "github.com/containerd/containerd/services"
Expand All @@ -50,6 +51,7 @@ type services struct {
introspectionService introspection.Service
sandboxStore sandbox.Store
sandboxController sandbox.Controller
transferService transfer.Transferrer
}

// ServicesOpt allows callers to set options on the services
Expand Down Expand Up @@ -177,6 +179,13 @@ func WithSandboxController(client sandbox.Controller) ServicesOpt {
}
}

// WithTransferService sets the transfer service.
func WithTransferService(tr transfer.Transferrer) ServicesOpt {
return func(s *services) {
s.transferService = tr
}
}

// WithInMemoryServices is suitable for cases when there is need to use containerd's client from
// another (in-memory) containerd plugin (such as CRI).
func WithInMemoryServices(ic *plugin.InitContext) ClientOpt {
Expand All @@ -195,6 +204,9 @@ func WithInMemoryServices(ic *plugin.InitContext) ClientOpt {
plugin.SandboxControllerPlugin: func(i interface{}) ServicesOpt {
return WithSandboxController(i.(sandbox.Controller))
},
plugin.TransferPlugin: func(i interface{}) ServicesOpt {
return WithTransferService(i.(transfer.Transferrer))
},
} {
i, err := ic.Get(t)
if err != nil {
Expand Down

0 comments on commit 88d975b

Please sign in to comment.