Skip to content

Commit

Permalink
Store Jobs under batch/v1 instead of deprecated extensions/v1beta1
Browse files Browse the repository at this point in the history
  • Loading branch information
soltysh committed Jan 20, 2017
1 parent 9c346fc commit 626690c
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 51 deletions.
2 changes: 1 addition & 1 deletion pkg/cmd/server/kubernetes/master_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func BuildDefaultAPIServer(options configapi.MasterConfig) (*apiserveroptions.Se
master.DefaultAPIResourceConfigSource(),
)*/
// the order here is important, it defines which version will be used for storage
storageFactory.AddCohabitatingResources(extensions.Resource("jobs"), batch.Resource("jobs"))
storageFactory.AddCohabitatingResources(batch.Resource("jobs"), extensions.Resource("jobs"))
storageFactory.AddCohabitatingResources(extensions.Resource("horizontalpodautoscalers"), autoscaling.Resource("horizontalpodautoscalers"))

return server, storageFactory, nil
Expand Down
138 changes: 88 additions & 50 deletions test/integration/storage_versions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,27 @@ import (
"path"
"testing"

"golang.org/x/net/context"

etcd "github.com/coreos/etcd/client"
testutil "github.com/openshift/origin/test/util"
testserver "github.com/openshift/origin/test/util/server"
"golang.org/x/net/context"

kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/autoscaling"
"k8s.io/kubernetes/pkg/apis/batch"
batch_v1 "k8s.io/kubernetes/pkg/apis/batch/v1"
"k8s.io/kubernetes/pkg/apis/extensions"
extensions_v1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
kautoscalingclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/autoscaling/internalversion"
kbatchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
kclientset15 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/runtime"
)

// TODO: enable once storage is separable
// func TestStorageVersionsSeparated(t *testing.T) {
// runStorageTest(t, "separated",
// autoscaling_v1.SchemeGroupVersion,
// batch_v1.SchemeGroupVersion,
// extensions_v1beta1.SchemeGroupVersion,
// )
// }

func TestStorageVersionsUnified(t *testing.T) {
defer testutil.DumpEtcdOnFailure(t)
runStorageTest(t, "unified",
extensions_v1beta1.SchemeGroupVersion,
extensions_v1beta1.SchemeGroupVersion,
extensions_v1beta1.SchemeGroupVersion,
)
}
configapi "github.com/openshift/origin/pkg/cmd/server/api"
testutil "github.com/openshift/origin/test/util"
testserver "github.com/openshift/origin/test/util/server"
)

type legacyExtensionsAutoscaling struct {
kautoscalingclient.HorizontalPodAutoscalerInterface
Expand All @@ -64,30 +49,23 @@ func (c legacyExtensionsAutoscaling) Get(name string) (*autoscaling.HorizontalPo
return &result, c.client.Get().Resource("horizontalpodautoscalers").Namespace(c.namespace).Name(name).Do().Into(&result)
}

func runStorageTest(t *testing.T, ns string, autoscalingVersion, batchVersion, extensionsVersion unversioned.GroupVersion) {
etcdServer := testutil.RequireEtcd(t)
func getGVKFromEtcd(etcdClient etcd.Client, masterConfig *configapi.MasterConfig, prefix, ns, name string) (*unversioned.GroupVersionKind, error) {
keys := etcd.NewKeysAPI(etcdClient)
key := path.Join(masterConfig.EtcdStorageConfig.KubernetesStoragePrefix, prefix, ns, name)
resp, err := keys.Get(context.TODO(), key, nil)
if err != nil {
return nil, err
}
_, gvk, err := runtime.UnstructuredJSONScheme.Decode([]byte(resp.Node.Value), nil, nil)
return gvk, err
}

func setupStorageTests(t *testing.T, ns string) (*configapi.MasterConfig, kclientset.Interface, kclientset15.Interface) {
masterConfig, err := testserver.DefaultMasterOptions()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

keys := etcd.NewKeysAPI(etcdServer.Client)
getGVKFromEtcd := func(prefix, name string) (*unversioned.GroupVersionKind, error) {
key := path.Join(masterConfig.EtcdStorageConfig.KubernetesStoragePrefix, prefix, ns, name)
resp, err := keys.Get(context.TODO(), key, nil)
if err != nil {
return nil, err
}
_, gvk, err := runtime.UnstructuredJSONScheme.Decode([]byte(resp.Node.Value), nil, nil)
return gvk, err
}

// TODO: Set storage versions for API groups
// masterConfig.EtcdStorageConfig.StorageVersions[autoscaling.GroupName] = autoscalingVersion.String()
// masterConfig.EtcdStorageConfig.StorageVersions[batch.GroupName] = batchVersion.String()
// masterConfig.EtcdStorageConfig.StorageVersions[extensions.GroupName] = extensionsVersion.String()

clusterAdminKubeConfig, err := testserver.StartConfiguredMaster(masterConfig)
if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand All @@ -109,15 +87,27 @@ func runStorageTest(t *testing.T, ns string, autoscalingVersion, batchVersion, e
if err != nil {
t.Fatalf("unexpected error getting project admin client: %v", err)
}
projectAdminKubeClient14 := kclientset15.NewForConfigOrDie(projectAdminKubeConfig)
projectAdminKubeClient15 := kclientset15.NewForConfigOrDie(projectAdminKubeConfig)
if err := testutil.WaitForPolicyUpdate(projectAdminClient, ns, "get", extensions.Resource("horizontalpodautoscalers"), true); err != nil {
t.Fatalf("unexpected error waiting for policy update: %v", err)
}

return masterConfig, projectAdminKubeClient, projectAdminKubeClient15
}

func TestStorageVersions(t *testing.T) {
ns := "storageversions"
autoscalingVersion := extensions_v1beta1.SchemeGroupVersion
batchVersion := batch_v1.SchemeGroupVersion

defer testutil.DumpEtcdOnFailure(t)
etcdServer := testutil.RequireEtcd(t)
masterConfig, kubeClient, kubeClient15 := setupStorageTests(t, ns)

jobTestcases := map[string]struct {
creator kbatchclient.JobInterface
}{
"batch": {creator: projectAdminKubeClient.Batch().Jobs(ns)},
"batch": {creator: kubeClient.Batch().Jobs(ns)},
}
for name, testcase := range jobTestcases {
job := batch.Job{
Expand All @@ -138,30 +128,30 @@ func runStorageTest(t *testing.T, ns string, autoscalingVersion, batchVersion, e
}

// Ensure it is persisted correctly
if gvk, err := getGVKFromEtcd("jobs", job.Name); err != nil {
if gvk, err := getGVKFromEtcd(etcdServer.Client, masterConfig, "jobs", ns, job.Name); err != nil {
t.Fatalf("%s: unexpected error reading Job: %v", name, err)
} else if *gvk != batchVersion.WithKind("Job") {
t.Fatalf("%s: expected api version %s in etcd, got %s reading Job", name, batchVersion, gvk)
}

// Ensure it is accessible from both APIs
if _, err := projectAdminKubeClient.Batch().Jobs(ns).Get(job.Name); err != nil {
if _, err := kubeClient.Batch().Jobs(ns).Get(job.Name); err != nil {
t.Errorf("%s: Error reading Job from the batch client: %#v", name, err)
}
if _, err := projectAdminKubeClient14.Extensions().Jobs(ns).Get(job.Name); err != nil {
if _, err := kubeClient15.Extensions().Jobs(ns).Get(job.Name); err != nil {
t.Errorf("%s: Error reading Job from the extensions client: %#v", name, err)
}
}

legacyClient := legacyExtensionsAutoscaling{
projectAdminKubeClient.Autoscaling().HorizontalPodAutoscalers(ns),
projectAdminKubeClient.Autoscaling().RESTClient(),
kubeClient.Autoscaling().HorizontalPodAutoscalers(ns),
kubeClient.Autoscaling().RESTClient(),
ns,
}
hpaTestcases := map[string]struct {
creator kautoscalingclient.HorizontalPodAutoscalerInterface
}{
"autoscaling": {creator: projectAdminKubeClient.Autoscaling().HorizontalPodAutoscalers(ns)},
"autoscaling": {creator: kubeClient.Autoscaling().HorizontalPodAutoscalers(ns)},
"extensions": {
creator: legacyClient,
},
Expand All @@ -181,18 +171,66 @@ func runStorageTest(t *testing.T, ns string, autoscalingVersion, batchVersion, e
}

// Make sure it is persisted correctly
if gvk, err := getGVKFromEtcd("horizontalpodautoscalers", hpa.Name); err != nil {
if gvk, err := getGVKFromEtcd(etcdServer.Client, masterConfig, "horizontalpodautoscalers", ns, hpa.Name); err != nil {
t.Fatalf("%s: unexpected error reading HPA: %v", name, err)
} else if *gvk != autoscalingVersion.WithKind("HorizontalPodAutoscaler") {
t.Fatalf("%s: expected api version %s in etcd, got %s reading HPA", name, autoscalingVersion, gvk)
}

// Make sure it is available from the api
if _, err := projectAdminKubeClient.Autoscaling().HorizontalPodAutoscalers(ns).Get(hpa.Name); err != nil {
if _, err := kubeClient.Autoscaling().HorizontalPodAutoscalers(ns).Get(hpa.Name); err != nil {
t.Errorf("%s: Error reading HPA.autoscaling from the autoscaling/v1 API: %#v", name, err)
}
if _, err := legacyClient.Get(hpa.Name); err != nil {
t.Errorf("%s: Error reading HPA.autoscaling from the extensions/v1beta1 API: %#v", name, err)
}
}
}

const extensionsv1beta1Job = `{"kind":"Job","apiVersion":"extensions/v1beta1","metadata":{"name":"extensionsv1beta1job","namespace":"storagemigration","selfLink":"/apis/batch/v1/namespaces/storagemigration/jobs/extensionsv1beta1job","uid":"4b5d9f60-dcd1-11e6-8d37-525400f25e34","creationTimestamp":"2017-01-17T16:23:35Z","labels":{"controller-uid":"4b5d9f60-dcd1-11e6-8d37-525400f25e34","job-name":"extensionsv1beta1job"}},"spec":{"parallelism":1,"completions":1,"selector":{"matchLabels":{"controller-uid":"4b5d9f60-dcd1-11e6-8d37-525400f25e34"}},"autoSelector":true,"template":{"metadata":{"creationTimestamp":null,"labels":{"controller-uid":"4b5d9f60-dcd1-11e6-8d37-525400f25e34","job-name":"extensionsv1beta1job"}},"spec":{"containers":[{"name":"containername","image":"containerimage","resources":{},"terminationMessagePath":"/dev/termination-log","imagePullPolicy":"Always"}],"restartPolicy":"Never","terminationGracePeriodSeconds":30,"dnsPolicy":"ClusterFirst","securityContext":{}}}},"status":{"startTime":"2017-01-17T16:23:35Z","active":1}}`

func TestStorageMigration(t *testing.T) {
ns := "storagemigration"
prefix := "jobs"
jobName := "extensionsv1beta1job"
batchVersion := batch_v1.SchemeGroupVersion

defer testutil.DumpEtcdOnFailure(t)
etcdServer := testutil.RequireEtcd(t)
masterConfig, kubeClient, kubeClient15 := setupStorageTests(t, ns)

// Save an extensions/v1beta1.Job directly in etcd
keys := etcd.NewKeysAPI(etcdServer.Client)
key := path.Join(masterConfig.EtcdStorageConfig.KubernetesStoragePrefix, prefix, ns, jobName)
if _, err := keys.Create(context.TODO(), key, extensionsv1beta1Job); err != nil {
t.Fatalf("Unexpected error saving extensions/v1beta1.Job: %v", err)
}

// Ensure it is accessible from both APIs
job, err := kubeClient.Batch().Jobs(ns).Get(jobName)
if err != nil {
t.Errorf("Error reading Job from the batch client: %#v", err)
}
if _, err := kubeClient15.Extensions().Jobs(ns).Get(job.Name); err != nil {
t.Errorf("Error reading Job from the extensions client: %#v", err)
}

// Update the job
job.Spec.Parallelism = newInt32(2)
if _, err := kubeClient.Batch().Jobs(ns).Update(job); err != nil {
t.Errorf("Error updating Job: %#v", err)
}

// Ensure it is persisted as batch/v1.Job
if gvk, err := getGVKFromEtcd(etcdServer.Client, masterConfig, prefix, ns, jobName); err != nil {
t.Fatalf("Unexpected error reading Job from etcd: %v", err)
} else if *gvk != batchVersion.WithKind("Job") {
t.Fatalf("Expected api version %s in etcd, got %s reading Job", batchVersion, gvk)
}
}

func newInt32(val int32) *int32 {
p := new(int32)
*p = val
return p
}

0 comments on commit 626690c

Please sign in to comment.