Skip to content

Commit

Permalink
Merge pull request 'ADD: agent count check.' (#43) from add-unique in…
Browse files Browse the repository at this point in the history
…to master

Reviewed-on: https://gitea:3000/AVENTER/mesos-compose/pulls/43
  • Loading branch information
Andreas Peters committed Oct 18, 2023
2 parents 4a7701c + c512f03 commit b74711f
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 16 deletions.
2 changes: 1 addition & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
- FIX: Conflict between reconcile and heatbeat could end in a task restart loop
- ADD: Parameter to configure the Mesos Task DiscoveryInfoName Delimiter `DISCOVERY_INFONAME_DELIMITER`. Default value is ".".
- ADD: Parameter to configure the Mesos Task DiscoveryPortName Delimiter `DISCOVERY_PORTNAME_DELIMITER`. Default value is "_".
- ADD: Constraint `unique` to run a only one instance of a task per node.
- ADD: Constraint `unique` to run only one instance of a task per node.

## 0.4.2

Expand Down
1 change: 1 addition & 0 deletions docs/example/mesos-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ services:
- "node.hostname==localhost"
- "node.platform.os==linux"
- "node.platform.arch==arm"
- "unique"
replicas: 1
resources:
limits:
Expand Down
6 changes: 0 additions & 6 deletions docs/example/test-http.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,3 @@ services:
restart: always
deploy:
replicas: 1

networks:
default:
external: true
name: weave

8 changes: 6 additions & 2 deletions mesos/mesos.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Mesos struct {
Framework *cfg.FrameworkConfig
IsSuppress bool
IsRevive bool
CountAgent int
}

// Marshaler to serialize Protobuf Message to JSON
Expand Down Expand Up @@ -167,7 +168,7 @@ func (e *Mesos) Call(message *mesosproto.Call) error {
body, err := ioutil.ReadAll(res.Body)
if err != nil {
logrus.WithField("func", "mesos.Call").Error("Call Handling (could not read res.Body)")
return fmt.Errorf("Error %d", res.StatusCode)
return fmt.Errorf("error %d", res.StatusCode)
}

logrus.WithField("func", "mesos.Call").Error("Call Handling: ", string(body))
Expand Down Expand Up @@ -276,7 +277,7 @@ func (e *Mesos) GetAgentInfo(agentID string) cfg.MesosSlaves {
res, err := client.Do(req)

if err != nil {
logrus.WithField("func", "getAgentInfo").Error("Could not connect to agent: ", err.Error())
logrus.WithField("func", "mesos.getAgentInfo").Error("Could not connect to master: ", err.Error())
return cfg.MesosSlaves{}
}

Expand All @@ -295,6 +296,9 @@ func (e *Mesos) GetAgentInfo(agentID string) cfg.MesosSlaves {
return cfg.MesosSlaves{}
}

// save how many agents the cluster has
e.CountAgent = len(agent.Slaves)

// get the used agent info
for _, a := range agent.Slaves {
if a.ID == agentID {
Expand Down
5 changes: 4 additions & 1 deletion scheduler/handle_offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,18 @@ func (e *Scheduler) getOffer(offers *mesosproto.Event_Offers, cmd cfg.Command) (

if e.getLabelValue("__mc_placement", cmd) == "unique" {
if e.alreadyRunningOnHostname(cmd, offer) {
logrus.WithField("func", "scheduler.getOffer").Debug("UNIQUE: Already running on node: ", offer.GetHostname())
continue
}
}

if !e.isAttributeMachted("__mc_placement_node_platform_os", "os", cmd, offer) {
logrus.WithField("func", "scheduler.getOffer").Debug("OS: Does not match Attribute")
continue
}

if !e.isAttributeMachted("__mc_placement_node_platform_arch", "arch", cmd, offer) {
logrus.WithField("func", "scheduler.getOffer").Debug("OS: Does not match Attribute")
continue
}

Expand Down Expand Up @@ -164,7 +167,7 @@ func (e *Scheduler) alreadyRunningOnHostname(cmd cfg.Command, offer mesosproto.O
continue
}

if task.MesosAgent.Hostname == cmd.MesosAgent.Hostname {
if task.MesosAgent.Hostname == cmd.MesosAgent.Hostname && task.TaskID != cmd.TaskID {
return true
}
}
Expand Down
14 changes: 8 additions & 6 deletions scheduler/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ func (e *Scheduler) Heartbeat() {

// there are lesser instances are running as it should be
if e.Redis.CountRedisKey(task.TaskName+":*", "__KILL") < task.Instances {
logrus.WithField("func", "scheduler.CheckState").Info("Scale up Mesos Task: ", task.TaskName)
e.Mesos.Revive()
task.State = ""
task.TaskID = e.API.IncreaseTaskCount(task.TaskID)
e.Redis.SaveTaskRedis(task)
continue
if e.getLabelValue("__mc_placement", task) != "unique" && e.Mesos.CountAgent >= task.Instances {
logrus.WithField("func", "scheduler.CheckState").Info("Scale up Mesos Task: ", task.TaskName)
e.Mesos.Revive()
task.State = ""
task.TaskID = e.API.IncreaseTaskCount(task.TaskID)
e.Redis.SaveTaskRedis(task)
continue
}
}

// there are more instances are running as it should be
Expand Down

0 comments on commit b74711f

Please sign in to comment.