Skip to content

Commit

Permalink
add PartiallyFailed phase for backups, log+continue on errors
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Kriss <krisss@vmware.com>
  • Loading branch information
skriss committed Apr 25, 2019
1 parent 42f351b commit ab31ae3
Show file tree
Hide file tree
Showing 12 changed files with 350 additions and 141 deletions.
14 changes: 14 additions & 0 deletions pkg/apis/velero/v1/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ const (
// errors.
BackupPhaseCompleted BackupPhase = "Completed"

// BackupPhasePartiallyFailed means the backup has run to completion
// but encountered 1+ errors backing up individual items.
BackupPhasePartiallyFailed BackupPhase = "PartiallyFailed"

// BackupPhaseFailed means the backup ran but encountered an error that
// prevented it from completing successfully.
BackupPhaseFailed BackupPhase = "Failed"
Expand Down Expand Up @@ -191,6 +195,16 @@ type BackupStatus struct {
// VolumeSnapshotsCompleted is the total number of successfully
// completed volume snapshots for this backup.
VolumeSnapshotsCompleted int `json:"volumeSnapshotsCompleted"`

// Warnings is a count of all warning messages that were generated during
// execution of the backup. The actual warnings are in the backup's log
// file in object storage.
Warnings int `json:"warnings"`

// Errors is a count of all error messages that were generated during
// execution of the backup. The actual errors are in the backup's log
// file in object storage.
Errors int `json:"errors"`
}

// +genclient
Expand Down
25 changes: 8 additions & 17 deletions pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
kuberrs "k8s.io/apimachinery/pkg/util/errors"

api "github.com/heptio/velero/pkg/apis/velero/v1"
"github.com/heptio/velero/pkg/client"
Expand All @@ -39,7 +38,6 @@ import (
"github.com/heptio/velero/pkg/podexec"
"github.com/heptio/velero/pkg/restic"
"github.com/heptio/velero/pkg/util/collections"
kubeutil "github.com/heptio/velero/pkg/util/kube"
)

// BackupVersion is the current backup version for Velero.
Expand Down Expand Up @@ -207,17 +205,18 @@ type VolumeSnapshotterGetter interface {
}

// Backup backs up the items specified in the Backup, placing them in a gzip-compressed tar file
// written to backupFile. The finalized api.Backup is written to metadata.
func (kb *kubernetesBackupper) Backup(logger logrus.FieldLogger, backupRequest *Request, backupFile io.Writer, actions []velero.BackupItemAction, volumeSnapshotterGetter VolumeSnapshotterGetter) error {
// written to backupFile. The finalized api.Backup is written to metadata. Any error that represents
// a complete backup failure is returned. Errors that constitute partial failures (i.e. failures to
// back up individual resources that don't prevent the backup from continuing to be processed) are logged
// to the backup log.
func (kb *kubernetesBackupper) Backup(log logrus.FieldLogger, backupRequest *Request, backupFile io.Writer, actions []velero.BackupItemAction, volumeSnapshotterGetter VolumeSnapshotterGetter) error {
gzippedData := gzip.NewWriter(backupFile)
defer gzippedData.Close()

tw := tar.NewWriter(gzippedData)
defer tw.Close()

log := logger.WithField("backup", kubeutil.NamespaceAndName(backupRequest))
log.Info("Starting backup")

log.Info("Writing backup version file")
if err := kb.writeBackupVersion(tw); err != nil {
return errors.WithStack(err)
}
Expand Down Expand Up @@ -276,21 +275,13 @@ func (kb *kubernetesBackupper) Backup(logger logrus.FieldLogger, backupRequest *
volumeSnapshotterGetter,
)

var errs []error
for _, group := range kb.discoveryHelper.Resources() {
if err := gb.backupGroup(group); err != nil {
errs = append(errs, err)
log.WithError(err).WithField("apiGroup", group.String()).Error("Error backing up API group")
}
}

err = kuberrs.Flatten(kuberrs.NewAggregate(errs))
if err == nil {
log.Infof("Backup completed successfully")
} else {
log.Infof("Backup completed with errors: %v", err)
}

return err
return nil
}

func (kb *kubernetesBackupper) writeBackupVersion(tw *tar.Writer) error {
Expand Down
70 changes: 67 additions & 3 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,12 +373,64 @@ func TestBackup(t *testing.T) {
tests := []struct {
name string
backup *v1.Backup
actions []velero.BackupItemAction
expectedNamespaces *collections.IncludesExcludes
expectedResources *collections.IncludesExcludes
expectedHooks []resourceHook
backupGroupErrors map[*metav1.APIResourceList]error
expectedError error
}{
{
name: "error resolving actions returns an error",
backup: &v1.Backup{
Spec: v1.BackupSpec{
// cm - shortcut in legacy api group
// csr - shortcut in certificates.k8s.io api group
// roles - fully qualified in rbac.authorization.k8s.io api group
IncludedResources: []string{"cm", "csr", "roles"},
IncludedNamespaces: []string{"a", "b"},
ExcludedNamespaces: []string{"c", "d"},
},
},
actions: []velero.BackupItemAction{new(appliesToErrorAction)},
expectedNamespaces: collections.NewIncludesExcludes().Includes("a", "b").Excludes("c", "d"),
expectedResources: collections.NewIncludesExcludes().Includes("configmaps", "certificatesigningrequests.certificates.k8s.io", "roles.rbac.authorization.k8s.io"),
expectedHooks: []resourceHook{},
expectedError: errors.New("error calling AppliesTo"),
},
{
name: "error resolving hooks returns an error",
backup: &v1.Backup{
Spec: v1.BackupSpec{
// cm - shortcut in legacy api group
// csr - shortcut in certificates.k8s.io api group
// roles - fully qualified in rbac.authorization.k8s.io api group
IncludedResources: []string{"cm", "csr", "roles"},
IncludedNamespaces: []string{"a", "b"},
ExcludedNamespaces: []string{"c", "d"},
Hooks: v1.BackupHooks{
Resources: []v1.BackupResourceHookSpec{
{
Name: "hook-with-invalid-label-selector",
LabelSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "foo",
Operator: metav1.LabelSelectorOperator("nonexistent-operator"),
Values: []string{"bar"},
},
},
},
},
},
},
},
},
expectedNamespaces: collections.NewIncludesExcludes().Includes("a", "b").Excludes("c", "d"),
expectedResources: collections.NewIncludesExcludes().Includes("configmaps", "certificatesigningrequests.certificates.k8s.io", "roles.rbac.authorization.k8s.io"),
expectedHooks: []resourceHook{},
expectedError: errors.New("\"nonexistent-operator\" is not a valid pod selector operator"),
},
{
name: "happy path, no actions, no hooks, no errors",
backup: &v1.Backup{
Expand Down Expand Up @@ -411,7 +463,7 @@ func TestBackup(t *testing.T) {
certificatesGroup: nil,
rbacGroup: errors.New("rbac error"),
},
expectedError: errors.New("[v1 error, rbac error]"),
expectedError: nil,
},
{
name: "hooks",
Expand Down Expand Up @@ -509,7 +561,7 @@ func TestBackup(t *testing.T) {
mock.Anything, // restic backupper
mock.Anything, // pvc snapshot tracker
mock.Anything, // volume snapshotter getter
).Return(groupBackupper)
).Maybe().Return(groupBackupper)

for group, err := range test.backupGroupErrors {
groupBackupper.On("backupGroup", group).Return(err)
Expand All @@ -522,7 +574,7 @@ func TestBackup(t *testing.T) {
groupBackupperFactory: groupBackupperFactory,
}

err := kb.Backup(logging.DefaultLogger(logrus.DebugLevel), req, new(bytes.Buffer), nil, nil)
err := kb.Backup(logging.DefaultLogger(logrus.DebugLevel), req, new(bytes.Buffer), test.actions, nil)

assert.Equal(t, test.expectedNamespaces, req.NamespaceIncludesExcludes)
assert.Equal(t, test.expectedResources, req.ResourceIncludesExcludes)
Expand All @@ -538,6 +590,18 @@ func TestBackup(t *testing.T) {
}
}

// appliesToErrorAction is a stub BackupItemAction whose AppliesTo()
// unconditionally fails. It is used by tests to exercise the
// action-resolution error path of Backup().
type appliesToErrorAction struct{}

// AppliesTo always returns an error, so resolving this action fails.
func (*appliesToErrorAction) AppliesTo() (velero.ResourceSelector, error) {
	return velero.ResourceSelector{}, errors.New("error calling AppliesTo")
}

// Execute should never be reached, since AppliesTo fails first.
func (*appliesToErrorAction) Execute(item runtime.Unstructured, backup *v1.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, error) {
	panic("not implemented")
}

func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) {
groupBackupperFactory := &mockGroupBackupperFactory{}
kb := &kubernetesBackupper{
Expand Down
37 changes: 17 additions & 20 deletions pkg/backup/group_backupper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
kuberrs "k8s.io/apimachinery/pkg/util/errors"

"github.com/heptio/velero/pkg/client"
"github.com/heptio/velero/pkg/discovery"
Expand Down Expand Up @@ -101,23 +100,7 @@ type defaultGroupBackupper struct {

// backupGroup backs up a single API group.
func (gb *defaultGroupBackupper) backupGroup(group *metav1.APIResourceList) error {
var (
errs []error
log = gb.log.WithField("group", group.GroupVersion)
rb = gb.resourceBackupperFactory.newResourceBackupper(
log,
gb.backupRequest,
gb.dynamicFactory,
gb.discoveryHelper,
gb.backedUpItems,
gb.cohabitatingResources,
gb.podCommandExecutor,
gb.tarWriter,
gb.resticBackupper,
gb.resticSnapshotTracker,
gb.volumeSnapshotterGetter,
)
)
log := gb.log.WithField("group", group.GroupVersion)

log.Infof("Backing up group")

Expand All @@ -132,13 +115,27 @@ func (gb *defaultGroupBackupper) backupGroup(group *metav1.APIResourceList) erro
sortCoreGroup(group)
}

rb := gb.resourceBackupperFactory.newResourceBackupper(
log,
gb.backupRequest,
gb.dynamicFactory,
gb.discoveryHelper,
gb.backedUpItems,
gb.cohabitatingResources,
gb.podCommandExecutor,
gb.tarWriter,
gb.resticBackupper,
gb.resticSnapshotTracker,
gb.volumeSnapshotterGetter,
)

for _, resource := range group.APIResources {
if err := rb.backupResource(group, resource); err != nil {
errs = append(errs, err)
log.WithError(err).WithField("resource", resource.String()).Error("Error backing up API resource")
}
}

return kuberrs.NewAggregate(errs)
return nil
}

// sortCoreGroup sorts group as a coreGroup.
Expand Down
18 changes: 5 additions & 13 deletions pkg/backup/item_backupper.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim
}
ib.backedUpItems[key] = struct{}{}

log.Info("Backing up resource")
log.Info("Backing up item")

log.Debug("Executing pre hooks")
if err := ib.itemHookHandler.handleHooks(log, groupResource, obj, ib.backupRequest.ResourceHooks, hookPhasePre); err != nil {
Expand Down Expand Up @@ -192,7 +192,6 @@ func (ib *defaultItemBackupper) backupItem(logger logrus.FieldLogger, obj runtim

updatedObj, err := ib.executeActions(log, obj, groupResource, name, namespace, metadata)
if err != nil {
log.WithError(err).Error("Error executing item actions")
backupErrs = append(backupErrs, err)

// if there was an error running actions, execute post hooks and return
Expand Down Expand Up @@ -309,11 +308,6 @@ func (ib *defaultItemBackupper) executeActions(

updatedItem, additionalItemIdentifiers, err := action.Execute(obj, ib.backupRequest.Backup)
if err != nil {
// We want this to show up in the log file at the place where the error occurs. When we return
// the error, it get aggregated with all the other ones at the end of the backup, making it
// harder to tell when it happened.
log.WithError(err).Error("error executing custom action")

return nil, errors.Wrapf(err, "error executing custom action (groupResource=%s, namespace=%s, name=%s)", groupResource.String(), namespace, name)
}
obj = updatedItem
Expand All @@ -331,7 +325,7 @@ func (ib *defaultItemBackupper) executeActions(

additionalItem, err := client.Get(additionalItem.Name, metav1.GetOptions{})
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}

if err = ib.additionalItemBackupper.backupItem(log, additionalItem, gvr.GroupResource()); err != nil {
Expand Down Expand Up @@ -393,7 +387,7 @@ func (ib *defaultItemBackupper) takePVSnapshot(obj runtime.Unstructured, log log
// of this PV. If so, don't take a snapshot.
if pv.Spec.ClaimRef != nil {
if ib.resticSnapshotTracker.Has(pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name) {
log.Info("Skipping Persistent Volume snapshot because volume has already been backed up.")
log.Info("Skipping persistent volume snapshot because volume has already been backed up with restic.")
return nil
}
}
Expand Down Expand Up @@ -433,7 +427,7 @@ func (ib *defaultItemBackupper) takePVSnapshot(obj runtime.Unstructured, log log
}

if volumeSnapshotter == nil {
log.Info("PersistentVolume is not a supported volume type for snapshots, skipping.")
log.Info("Persistent volume is not a supported volume type for snapshots, skipping.")
return nil
}

Expand All @@ -447,17 +441,15 @@ func (ib *defaultItemBackupper) takePVSnapshot(obj runtime.Unstructured, log log
log.Info("Getting volume information")
volumeType, iops, err := volumeSnapshotter.GetVolumeInfo(volumeID, pvFailureDomainZone)
if err != nil {
log.WithError(err).Error("error getting volume info")
return errors.WithMessage(err, "error getting volume info")
}

log.Info("Snapshotting PersistentVolume")
log.Info("Snapshotting persistent volume")
snapshot := volumeSnapshot(ib.backupRequest.Backup, pv.Name, volumeID, volumeType, pvFailureDomainZone, location, iops)

var errs []error
snapshotID, err := volumeSnapshotter.CreateSnapshot(snapshot.Spec.ProviderVolumeID, snapshot.Spec.VolumeAZ, tags)
if err != nil {
log.WithError(err).Error("error creating snapshot")
errs = append(errs, errors.Wrap(err, "error taking snapshot of volume"))
snapshot.Status.Phase = volume.SnapshotPhaseFailed
} else {
Expand Down
Loading

0 comments on commit ab31ae3

Please sign in to comment.