Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libct/cg/sd: reconnect and retry on dbus connection error #2923

Merged
merged 5 commits into from
Apr 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 50 additions & 34 deletions libcontainer/cgroups/systemd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package systemd

import (
"bufio"
"context"
"fmt"
"math"
"os"
Expand All @@ -29,10 +28,6 @@ const (
)

var (
connOnce sync.Once
connDbus *systemdDbus.Conn
connErr error

versionOnce sync.Once
version int

Expand Down Expand Up @@ -292,19 +287,6 @@ func generateDeviceProperties(rules []*devices.Rule) ([]systemdDbus.Property, er
return properties, nil
}

// getDbusConnection lazy initializes systemd dbus connection
// and returns it
func getDbusConnection(rootless bool) (*systemdDbus.Conn, error) {
connOnce.Do(func() {
if rootless {
connDbus, connErr = NewUserSystemdDbus()
} else {
connDbus, connErr = systemdDbus.NewWithContext(context.TODO())
}
})
return connDbus, connErr
}

func newProp(name string, units interface{}) systemdDbus.Property {
return systemdDbus.Property{
Name: name,
Expand All @@ -320,19 +302,29 @@ func getUnitName(c *configs.Cgroup) string {
return c.Name
}

// isUnitExists returns true if the error is that a systemd unit already exists.
func isUnitExists(err error) bool {
// isDbusError returns true if the error is a specific dbus error.
func isDbusError(err error, name string) bool {
if err != nil {
if dbusError, ok := err.(dbus.Error); ok {
return strings.Contains(dbusError.Name, "org.freedesktop.systemd1.UnitExists")
var derr *dbus.Error
if errors.As(err, &derr) {
return strings.Contains(derr.Name, name)
}
}
return false
}

func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []systemdDbus.Property) error {
// isUnitExists returns true if the error is that a systemd unit already exists.
func isUnitExists(err error) bool {
return isDbusError(err, "org.freedesktop.systemd1.UnitExists")
}

func startUnit(cm *dbusConnManager, unitName string, properties []systemdDbus.Property) error {
statusChan := make(chan string, 1)
if _, err := dbusConnection.StartTransientUnit(unitName, "replace", properties, statusChan); err == nil {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
_, err := c.StartTransientUnit(unitName, "replace", properties, statusChan)
return err
})
if err == nil {
timeout := time.NewTimer(30 * time.Second)
defer timeout.Stop()

Expand All @@ -341,11 +333,11 @@ func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []s
close(statusChan)
// Please refer to https://godoc.org/github.com/coreos/go-systemd/dbus#Conn.StartUnit
if s != "done" {
dbusConnection.ResetFailedUnit(unitName)
resetFailedUnit(cm, unitName)
return errors.Errorf("error creating systemd unit `%s`: got `%s`", unitName, s)
}
case <-timeout.C:
dbusConnection.ResetFailedUnit(unitName)
resetFailedUnit(cm, unitName)
return errors.New("Timeout waiting for systemd to create " + unitName)
}
} else if !isUnitExists(err) {
Expand All @@ -355,9 +347,13 @@ func startUnit(dbusConnection *systemdDbus.Conn, unitName string, properties []s
return nil
}

func stopUnit(dbusConnection *systemdDbus.Conn, unitName string) error {
func stopUnit(cm *dbusConnManager, unitName string) error {
statusChan := make(chan string, 1)
if _, err := dbusConnection.StopUnit(unitName, "replace", statusChan); err == nil {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
_, err := c.StopUnit(unitName, "replace", statusChan)
return err
})
if err == nil {
select {
case s := <-statusChan:
close(statusChan)
Expand All @@ -372,10 +368,30 @@ func stopUnit(dbusConnection *systemdDbus.Conn, unitName string) error {
return nil
}

func systemdVersion(conn *systemdDbus.Conn) int {
func resetFailedUnit(cm *dbusConnManager, name string) {
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
return c.ResetFailedUnit(name)
})
if err != nil {
logrus.Warnf("unable to reset failed unit: %v", err)
}
}

func setUnitProperties(cm *dbusConnManager, name string, properties ...systemdDbus.Property) error {
return cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
return c.SetUnitProperties(name, true, properties...)
})
}

func systemdVersion(cm *dbusConnManager) int {
versionOnce.Do(func() {
version = -1
verStr, err := conn.GetManagerProperty("Version")
var verStr string
err := cm.retryOnDisconnect(func(c *systemdDbus.Conn) error {
var err error
verStr, err = c.GetManagerProperty("Version")
return err
})
if err == nil {
version, err = systemdVersionAtoi(verStr)
}
Expand Down Expand Up @@ -403,10 +419,10 @@ func systemdVersionAtoi(verStr string) (int, error) {
return ver, errors.Wrapf(err, "can't parse version %s", verStr)
}

func addCpuQuota(conn *systemdDbus.Conn, properties *[]systemdDbus.Property, quota int64, period uint64) {
func addCpuQuota(cm *dbusConnManager, properties *[]systemdDbus.Property, quota int64, period uint64) {
if period != 0 {
// systemd only supports CPUQuotaPeriodUSec since v242
sdVer := systemdVersion(conn)
sdVer := systemdVersion(cm)
if sdVer >= 242 {
*properties = append(*properties,
newProp("CPUQuotaPeriodUSec", period))
Expand Down Expand Up @@ -437,13 +453,13 @@ func addCpuQuota(conn *systemdDbus.Conn, properties *[]systemdDbus.Property, quo
}
}

func addCpuset(conn *systemdDbus.Conn, props *[]systemdDbus.Property, cpus, mems string) error {
func addCpuset(cm *dbusConnManager, props *[]systemdDbus.Property, cpus, mems string) error {
if cpus == "" && mems == "" {
return nil
}

// systemd only supports AllowedCPUs/AllowedMemoryNodes since v244
sdVer := systemdVersion(conn)
sdVer := systemdVersion(cm)
if sdVer < 244 {
logrus.Debugf("systemd v%d is too old to support AllowedCPUs/AllowedMemoryNodes"+
" (settings will still be applied to cgroupfs)", sdVer)
Expand Down
90 changes: 90 additions & 0 deletions libcontainer/cgroups/systemd/dbus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// +build linux

package systemd

import (
"context"
"sync"

systemdDbus "github.com/coreos/go-systemd/v22/dbus"
dbus "github.com/godbus/dbus/v5"
)

type dbusConnManager struct {
conn *systemdDbus.Conn
rootless bool
sync.RWMutex
}

// newDbusConnManager initializes systemd dbus connection manager.
func newDbusConnManager(rootless bool) *dbusConnManager {
return &dbusConnManager{
rootless: rootless,
}
}

// getConnection lazily initializes and returns systemd dbus connection.
func (d *dbusConnManager) getConnection() (*systemdDbus.Conn, error) {
// In the case where d.conn != nil
// Use the read lock the first time to ensure
// that Conn can be acquired at the same time.
d.RLock()
if conn := d.conn; conn != nil {
d.RUnlock()
return conn, nil
}
d.RUnlock()
Comment on lines +31 to +36
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be

d.RLock()
conn := d.conn
d.RUnlock()
if conn != nil { ... }

But it doesn't really matter.


// In the case where d.conn == nil
// Use write lock to ensure that only one
// will be created
d.Lock()
defer d.Unlock()
if conn := d.conn; conn != nil {
return conn, nil
}

conn, err := d.newConnection()
if err != nil {
return nil, err
}
d.conn = conn
return conn, nil
}

func (d *dbusConnManager) newConnection() (*systemdDbus.Conn, error) {
if d.rootless {
return newUserSystemdDbus()
}
return systemdDbus.NewWithContext(context.TODO())
}

// resetConnection resets the connection to its initial state
// (so it can be reconnected if necessary).
func (d *dbusConnManager) resetConnection(conn *systemdDbus.Conn) {
d.Lock()
defer d.Unlock()
if d.conn != nil && d.conn == conn {
d.conn.Close()
d.conn = nil
}
}

var errDbusConnClosed = dbus.ErrClosed.Error()

// retryOnDisconnect calls op, and if the error it returns is about closed dbus
// connection, the connection is re-established and the op is retried. This helps
// with the situation when dbus is restarted and we have a stale connection.
func (d *dbusConnManager) retryOnDisconnect(op func(*systemdDbus.Conn) error) error {
for {
conn, err := d.getConnection()
if err != nil {
return err
}
err = op(conn)
if !isDbusError(err, errDbusConnClosed) {
return err
}
d.resetConnection(conn)
}
}
4 changes: 2 additions & 2 deletions libcontainer/cgroups/systemd/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/pkg/errors"
)

// NewUserSystemdDbus creates a connection for systemd user-instance.
func NewUserSystemdDbus() (*systemdDbus.Conn, error) {
// newUserSystemdDbus creates a connection for systemd user-instance.
func newUserSystemdDbus() (*systemdDbus.Conn, error) {
addr, err := DetectUserDbusSessionBusAddress()
if err != nil {
return nil, err
Expand Down
29 changes: 9 additions & 20 deletions libcontainer/cgroups/systemd/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ type legacyManager struct {
mu sync.Mutex
cgroups *configs.Cgroup
paths map[string]string
dbus *dbusConnManager
}

func NewLegacyManager(cg *configs.Cgroup, paths map[string]string) cgroups.Manager {
return &legacyManager{
cgroups: cg,
paths: paths,
dbus: newDbusConnManager(false),
}
}

Expand Down Expand Up @@ -56,7 +58,7 @@ var legacySubsystems = []subsystem{
&fs.NameGroup{GroupName: "name=systemd"},
}

func genV1ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]systemdDbus.Property, error) {
func genV1ResourcesProperties(c *configs.Cgroup, cm *dbusConnManager) ([]systemdDbus.Property, error) {
var properties []systemdDbus.Property
r := c.Resources

Expand All @@ -76,7 +78,7 @@ func genV1ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]syst
newProp("CPUShares", r.CpuShares))
}

addCpuQuota(conn, &properties, r.CpuQuota, r.CpuPeriod)
addCpuQuota(cm, &properties, r.CpuQuota, r.CpuPeriod)

if r.BlkioWeight != 0 {
properties = append(properties,
Expand All @@ -88,7 +90,7 @@ func genV1ResourcesProperties(c *configs.Cgroup, conn *systemdDbus.Conn) ([]syst
newProp("TasksMax", uint64(r.PidsLimit)))
}

err = addCpuset(conn, &properties, r.CpusetCpus, r.CpusetMems)
err = addCpuset(cm, &properties, r.CpusetCpus, r.CpusetMems)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -164,13 +166,9 @@ func (m *legacyManager) Apply(pid int) error {
properties = append(properties,
newProp("DefaultDependencies", false))

dbusConnection, err := getDbusConnection(false)
if err != nil {
return err
}
properties = append(properties, c.SystemdProps...)

if err := startUnit(dbusConnection, unitName, properties); err != nil {
if err := startUnit(m.dbus, unitName, properties); err != nil {
return err
}

Expand Down Expand Up @@ -208,13 +206,8 @@ func (m *legacyManager) Destroy() error {
m.mu.Lock()
defer m.mu.Unlock()

dbusConnection, err := getDbusConnection(false)
if err != nil {
return err
}
unitName := getUnitName(m.cgroups)
stopErr := stopUnit(m.dbus, getUnitName(m.cgroups))

stopErr := stopUnit(dbusConnection, unitName)
// Both on success and on error, cleanup all the cgroups we are aware of.
// Some of them were created directly by Apply() and are not managed by systemd.
if err := cgroups.RemovePaths(m.paths); err != nil {
Expand Down Expand Up @@ -341,11 +334,7 @@ func (m *legacyManager) Set(container *configs.Config) error {
if container.Cgroups.Resources.Unified != nil {
return cgroups.ErrV1NoUnified
}
dbusConnection, err := getDbusConnection(false)
if err != nil {
return err
}
properties, err := genV1ResourcesProperties(container.Cgroups, dbusConnection)
properties, err := genV1ResourcesProperties(container.Cgroups, m.dbus)
if err != nil {
return err
}
Expand Down Expand Up @@ -373,7 +362,7 @@ func (m *legacyManager) Set(container *configs.Config) error {
}
}

if err := dbusConnection.SetUnitProperties(getUnitName(container.Cgroups), true, properties...); err != nil {
if err := setUnitProperties(m.dbus, getUnitName(container.Cgroups), properties...); err != nil {
_ = m.Freeze(targetFreezerState)
return err
}
Expand Down
Loading