Skip to content

Commit

Permalink
Merge pull request 'ADD: contraint unique.' (#42) from add-unique int…
Browse files Browse the repository at this point in the history
…o master

Reviewed-on: https://gitea:3000/AVENTER/mesos-compose/pulls/42
  • Loading branch information
Andreas Peters committed Oct 18, 2023
2 parents e0c8207 + 0187265 commit 4a7701c
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 32 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ bootstrap/*.db-wal
mesos-compose
.dccache
mesos_cli/compose/__pycache__/**/**
.vscode
2 changes: 1 addition & 1 deletion .version.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
[{ "version":"v0.4.2", "builddate":"2023-09-11T19:36:21Z" }]
[{ "version":"v0.4.2", "builddate":"2023-09-29T15:56:55Z" }]
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ WORKDIR /build

COPY . /build/

RUN apk update && apk add git && \
RUN apk update && apk upgrade && apk add git && \
go get -d

ARG TAG
Expand Down
51 changes: 23 additions & 28 deletions api/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (e *API) getDiscoveryInfoPorts(cmd cfg.Command) []mesosproto.Port {
for i, c := range cmd.DockerPortMappings {
var tmpport mesosproto.Port
p := func() *string { x := cmd.TaskName + ":" + strconv.FormatUint(uint64(c.ContainerPort), 10); return &x }()
tmpport.Name = func() *string { x := strings.ReplaceAll(*p, ":", "_"); return &x }()
tmpport.Name = func() *string { x := strings.ReplaceAll(*p, ":", e.Config.DiscoveryPortNameDelimiter); return &x }()
tmpport.Number = c.HostPort
tmpport.Protocol = c.Protocol

Expand Down Expand Up @@ -332,23 +332,12 @@ func (e *API) getVolumes(containerType string) []mesosproto.Volume {
driver = e.Compose.Volumes[p[0]].Driver
}

switch containerType {
case "docker":
tmp.Source = &mesosproto.Volume_Source{
Type: mesosproto.Volume_Source_DOCKER_VOLUME,
DockerVolume: &mesosproto.Volume_Source_DockerVolume{
Name: p[0],
Driver: func() *string { x := driver; return &x }(),
},
}
default:
tmp.Source = &mesosproto.Volume_Source{
Type: mesosproto.Volume_Source_DOCKER_VOLUME,
DockerVolume: &mesosproto.Volume_Source_DockerVolume{
Name: p[0],
Driver: func() *string { x := driver; return &x }(),
},
}
tmp.Source = &mesosproto.Volume_Source{
Type: mesosproto.Volume_Source_DOCKER_VOLUME,
DockerVolume: &mesosproto.Volume_Source_DockerVolume{
Name: p[0],
Driver: func() *string { x := driver; return &x }(),
},
}
volume = append(volume, tmp)
}
Expand Down Expand Up @@ -544,18 +533,24 @@ func (e *API) addDockerParameter(current []mesosproto.Parameter, newValues mesos
func (e *API) setConstraints(cmd *cfg.Command) {
if len(e.Service.Deploy.Placement.Constraints) > 0 {
for _, constraint := range e.Service.Deploy.Placement.Constraints {
cons := strings.Split(constraint, "==")
if len(cons) >= 2 {
if cons[0] == "node.hostname" {
cmd.Labels = append(cmd.Labels, mesosproto.Label{Key: "__mc_placement_node_hostname", Value: &cons[1]})
}
if cons[0] == "node.platform.os" {
cmd.Labels = append(cmd.Labels, mesosproto.Label{Key: "__mc_placement_node_platform_os", Value: &cons[1]})
}
if cons[0] == "node.platform.arch" {
cmd.Labels = append(cmd.Labels, mesosproto.Label{Key: "__mc_placement_node_platform_arch", Value: &cons[1]})
if strings.Contains(constraint, "==") {
cons := strings.Split(constraint, "==")
if len(cons) >= 2 {
if cons[0] == "node.hostname" {
cmd.Labels = append(cmd.Labels, mesosproto.Label{Key: "__mc_placement_node_hostname", Value: &cons[1]})
}
if cons[0] == "node.platform.os" {
cmd.Labels = append(cmd.Labels, mesosproto.Label{Key: "__mc_placement_node_platform_os", Value: &cons[1]})
}
if cons[0] == "node.platform.arch" {
cmd.Labels = append(cmd.Labels, mesosproto.Label{Key: "__mc_placement_node_platform_arch", Value: &cons[1]})
}
}
}
if strings.ToLower(constraint) == "unique" {
val := func() *string { x := "unique"; return &x }()
cmd.Labels = append(cmd.Labels, mesosproto.Label{Key: "__mc_placement", Value: val})
}
}
}
}
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
- ADD: Better support for mesos containerizer (thanks to @harryzz)
- ADD: Command Attributes (thanks to @harryzz)
- FIX: Conflict between reconcile and heatbeat could end in a task restart loop
- ADD: Parameter to configure the Mesos Task DiscoveryInfoName Delimiter `DISCOVERY_INFONAME_DELIMITER`. Default value is ".".
- ADD: Parameter to configure the Mesos Task DiscoveryPortName Delimiter `DISCOVERY_PORTNAME_DELIMITER`. Default value is "_".
- ADD: Constraint `unique` to run a only one instance of a task per node.

## 0.4.2

Expand Down
3 changes: 2 additions & 1 deletion init.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func init() {
config.VaultToken = util.Getenv("VAULT_TOKEN", "")
config.VaultURL = util.Getenv("VAULT_URL", "http://127.0.0.1:8200")
config.VaultTimeout, _ = time.ParseDuration(util.Getenv("VAULT_TIMEOUT", "10s"))
config.DiscoveryInfoNameDelimiter = util.Getenv("DISCOVERY_NAME_DELIMITER", ".")
config.DiscoveryInfoNameDelimiter = util.Getenv("DISCOVERY_INFONAME_DELIMITER", ".")
config.DiscoveryPortNameDelimiter = util.Getenv("DISCOVERY_PORTNAME_DELIMITER", "_")

// The comunication to the mesos server should be via ssl or not
if util.Getenv("MESOS_SSL", "false") == "true" {
Expand Down
33 changes: 32 additions & 1 deletion scheduler/handle_offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,13 @@ func (e *Scheduler) getOffer(offers *mesosproto.Event_Offers, cmd cfg.Command) (
if strings.ToLower(valHostname) == offer.GetHostname() {
logrus.WithField("func", "scheduler.getOffer").Debug("Set Server Hostname Constraint to:", offer.GetHostname())
} else {
logrus.WithField("func", "scheduler.getOffer").Debug("Could not found hostname, get next offer")
logrus.WithField("func", "scheduler.getOffer").Debug("Could not find hostname, get next offer")
continue
}
}

if e.getLabelValue("__mc_placement", cmd) == "unique" {
if e.alreadyRunningOnHostname(cmd, offer) {
continue
}
}
Expand Down Expand Up @@ -141,6 +147,31 @@ func (e *Scheduler) getAttributes(name string, offer mesosproto.Offer) string {
return ""
}

func (e *Scheduler) alreadyRunningOnHostname(cmd cfg.Command, offer mesosproto.Offer) bool {
keys := e.Redis.GetAllRedisKeys(cmd.TaskName + ":*")
for keys.Next(e.Redis.CTX) {
// continue if the key is not a mesos task
if e.Redis.CheckIfNotTask(keys) {
continue
}
// get the values of the current key
key := e.Redis.GetRedisKey(keys.Val())

task := e.Mesos.DecodeTask(key)

// continue if it's a unvalid task
if task.TaskID == "" {
continue
}

if task.MesosAgent.Hostname == cmd.MesosAgent.Hostname {
return true
}
}

return false
}

func (e *Scheduler) isAttributeMachted(label, attribute string, cmd cfg.Command, offer mesosproto.Offer) bool {
valOS := e.getLabelValue(label, cmd)
if valOS != "" {
Expand Down
1 change: 1 addition & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Config struct {
VaultTimeout time.Duration
DefaultVolumeDriver string
DiscoveryInfoNameDelimiter string
DiscoveryPortNameDelimiter string
}

// UserCredentials - The Username and Password to authenticate against this framework
Expand Down

0 comments on commit 4a7701c

Please sign in to comment.