From 01872655436ccb43e38b3c6df303fd2f6284563b Mon Sep 17 00:00:00 2001 From: Andreas Peters Date: Mon, 16 Oct 2023 23:11:33 +0200 Subject: [PATCH] ADD: contraint unique. --- .gitignore | 1 + .version.json | 2 +- Dockerfile | 2 +- api/compose.go | 51 +++++++++++++++++--------------------- changelog.md | 3 +++ init.go | 3 ++- scheduler/handle_offers.go | 33 +++++++++++++++++++++++- types/types.go | 1 + 8 files changed, 64 insertions(+), 32 deletions(-) diff --git a/.gitignore b/.gitignore index fbb519b..a6b9a07 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ bootstrap/*.db-wal mesos-compose .dccache mesos_cli/compose/__pycache__/**/** +.vscode \ No newline at end of file diff --git a/.version.json b/.version.json index 2c70f93..771c5d1 100644 --- a/.version.json +++ b/.version.json @@ -1 +1 @@ -[{ "version":"v0.4.2", "builddate":"2023-09-11T19:36:21Z" }] +[{ "version":"v0.4.2", "builddate":"2023-09-29T15:56:55Z" }] diff --git a/Dockerfile b/Dockerfile index 059bc4b..704079b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ WORKDIR /build COPY . /build/ -RUN apk update && apk add git && \ +RUN apk update && apk upgrade && apk add git && \ go get -d ARG TAG diff --git a/api/compose.go b/api/compose.go index a5fa2ac..da896d2 100644 --- a/api/compose.go +++ b/api/compose.go @@ -267,7 +267,7 @@ func (e *API) getDiscoveryInfoPorts(cmd cfg.Command) []mesosproto.Port { for i, c := range cmd.DockerPortMappings { var tmpport mesosproto.Port p := func() *string { x := cmd.TaskName + ":" + strconv.FormatUint(uint64(c.ContainerPort), 10); return &x }() - tmpport.Name = func() *string { x := strings.ReplaceAll(*p, ":", "_"); return &x }() + tmpport.Name = func() *string { x := strings.ReplaceAll(*p, ":", e.Config.DiscoveryPortNameDelimiter); return &x }() tmpport.Number = c.HostPort tmpport.Protocol = c.Protocol @@ -332,23 +332,12 @@ func (e *API) getVolumes(containerType string) []mesosproto.Volume { driver = e.Compose.Volumes[p[0]].Driver } - switch containerType { - case "docker": - tmp.Source = &mesosproto.Volume_Source{ - Type: mesosproto.Volume_Source_DOCKER_VOLUME, - DockerVolume: &mesosproto.Volume_Source_DockerVolume{ - Name: p[0], - Driver: func() *string { x := driver; return &x }(), - }, - } - default: - tmp.Source = &mesosproto.Volume_Source{ - Type: mesosproto.Volume_Source_DOCKER_VOLUME, - DockerVolume: &mesosproto.Volume_Source_DockerVolume{ - Name: p[0], - Driver: func() *string { x := driver; return &x }(), - }, - } + tmp.Source = &mesosproto.Volume_Source{ + Type: mesosproto.Volume_Source_DOCKER_VOLUME, + DockerVolume: &mesosproto.Volume_Source_DockerVolume{ + Name: p[0], + Driver: func() *string { x := driver; return &x }(), + }, } volume = append(volume, tmp) } @@ -544,18 +533,24 @@ func (e *API) addDockerParameter(current []mesosproto.Parameter, newValues mesos func (e *API) setConstraints(cmd *cfg.Command) { if len(e.Service.Deploy.Placement.Constraints) > 0 { for _, constraint := range e.Service.Deploy.Placement.Constraints { - cons := strings.Split(constraint, "==") - if len(cons) >= 2 { - if cons[0] == "node.hostname" { - cmd.Labels = append(cmd.Labels, mesosproto.Label{Key: "__mc_placement_node_hostname", Value: &cons[1]}) - } - if cons[0] == "node.platform.os" { - cmd.Labels = append(cmd.Labels, mesosproto.Label{Key: "__mc_placement_node_platform_os", Value: &cons[1]}) - } - if cons[0] == "node.platform.arch" { - cmd.Labels = append(cmd.Labels, mesosproto.Label{Key: "__mc_placement_node_platform_arch", Value: &cons[1]}) + if strings.Contains(constraint, "==") { + cons := strings.Split(constraint, "==") + if len(cons) >= 2 { + if cons[0] == "node.hostname" { + cmd.Labels = append(cmd.Labels, mesosproto.Label{Key: "__mc_placement_node_hostname", Value: &cons[1]}) + } + if cons[0] == "node.platform.os" { + cmd.Labels = append(cmd.Labels, mesosproto.Label{Key: "__mc_placement_node_platform_os", Value: &cons[1]}) + } + if cons[0] == "node.platform.arch" { + cmd.Labels = append(cmd.Labels, mesosproto.Label{Key: "__mc_placement_node_platform_arch", Value: &cons[1]}) + } } } + if strings.ToLower(constraint) == "unique" { + val := func() *string { x := "unique"; return &x }() + cmd.Labels = append(cmd.Labels, mesosproto.Label{Key: "__mc_placement", Value: val}) + } } } } diff --git a/changelog.md b/changelog.md index e527e35..06a4f3f 100644 --- a/changelog.md +++ b/changelog.md @@ -30,6 +30,9 @@ - ADD: Better support for mesos containerizer (thanks to @harryzz) - ADD: Command Attributes (thanks to @harryzz) - FIX: Conflict between reconcile and heatbeat could end in a task restart loop +- ADD: Parameter to configure the Mesos Task DiscoveryInfoName Delimiter `DISCOVERY_INFONAME_DELIMITER`. Default value is ".". +- ADD: Parameter to configure the Mesos Task DiscoveryPortName Delimiter `DISCOVERY_PORTNAME_DELIMITER`. Default value is "_". +- ADD: Constraint `unique` to run a only one instance of a task per node. ## 0.4.2 diff --git a/init.go b/init.go index a4642b2..b655d8b 100644 --- a/init.go +++ b/init.go @@ -50,7 +50,8 @@ func init() { config.VaultToken = util.Getenv("VAULT_TOKEN", "") config.VaultURL = util.Getenv("VAULT_URL", "http://127.0.0.1:8200") config.VaultTimeout, _ = time.ParseDuration(util.Getenv("VAULT_TIMEOUT", "10s")) - config.DiscoveryInfoNameDelimiter = util.Getenv("DISCOVERY_NAME_DELIMITER", ".") + config.DiscoveryInfoNameDelimiter = util.Getenv("DISCOVERY_INFONAME_DELIMITER", ".") + config.DiscoveryPortNameDelimiter = util.Getenv("DISCOVERY_PORTNAME_DELIMITER", "_") // The comunication to the mesos server should be via ssl or not if util.Getenv("MESOS_SSL", "false") == "true" { diff --git a/scheduler/handle_offers.go b/scheduler/handle_offers.go index b76cf30..502b4de 100644 --- a/scheduler/handle_offers.go +++ b/scheduler/handle_offers.go @@ -111,7 +111,13 @@ func (e *Scheduler) getOffer(offers *mesosproto.Event_Offers, cmd cfg.Command) ( if strings.ToLower(valHostname) == offer.GetHostname() { logrus.WithField("func", "scheduler.getOffer").Debug("Set Server Hostname Constraint to:", offer.GetHostname()) } else { - logrus.WithField("func", "scheduler.getOffer").Debug("Could not found hostname, get next offer") + logrus.WithField("func", "scheduler.getOffer").Debug("Could not find hostname, get next offer") + continue + } + } + + if e.getLabelValue("__mc_placement", cmd) == "unique" { + if e.alreadyRunningOnHostname(cmd, offer) { continue } } @@ -141,6 +147,31 @@ func (e *Scheduler) getAttributes(name string, offer mesosproto.Offer) string { return "" } +func (e *Scheduler) alreadyRunningOnHostname(cmd cfg.Command, offer mesosproto.Offer) bool { + keys := e.Redis.GetAllRedisKeys(cmd.TaskName + ":*") + for keys.Next(e.Redis.CTX) { + // continue if the key is not a mesos task + if e.Redis.CheckIfNotTask(keys) { + continue + } + // get the values of the current key + key := e.Redis.GetRedisKey(keys.Val()) + + task := e.Mesos.DecodeTask(key) + + // continue if it's a unvalid task + if task.TaskID == "" { + continue + } + + if task.MesosAgent.Hostname == cmd.MesosAgent.Hostname { + return true + } + } + + return false +} + func (e *Scheduler) isAttributeMachted(label, attribute string, cmd cfg.Command, offer mesosproto.Offer) bool { valOS := e.getLabelValue(label, cmd) if valOS != "" { diff --git a/types/types.go b/types/types.go index 7d7f97b..befb354 100644 --- a/types/types.go +++ b/types/types.go @@ -36,6 +36,7 @@ type Config struct { VaultTimeout time.Duration DefaultVolumeDriver string DiscoveryInfoNameDelimiter string + DiscoveryPortNameDelimiter string } // UserCredentials - The Username and Password to authenticate against this framework