Commit: fixes
rocktavious committed Jul 18, 2023
1 parent 0ff4599 commit 4927ae4
Showing 10 changed files with 41 additions and 30 deletions.
3 changes: 3 additions & 0 deletions .changes/unreleased/Feature-20230718-161036.yaml
@@ -0,0 +1,3 @@
+kind: Feature
+body: Increase timeout on graphql requests to 60s
+time: 2023-07-18T16:10:36.986992-05:00
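The timeout change itself isn't visible in the hunks below; for context, a 60-second budget in Go is typically enforced on the HTTP client that carries the GraphQL requests. A minimal sketch under that assumption — not necessarily how opslevel-go exposes the setting:

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// newHTTPClient returns a client whose requests (including GraphQL
// POSTs, which are plain HTTP) give up after 60 seconds.
func newHTTPClient() *http.Client {
	return &http.Client{Timeout: 60 * time.Second}
}

func main() {
	fmt.Println(newHTTPClient().Timeout) // 1m0s
}
```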
3 changes: 3 additions & 0 deletions .changes/unreleased/Refactor-20230718-160958.yaml
@@ -0,0 +1,3 @@
+kind: Refactor
+body: Upgrade opslevel-go to 2023.7.17
+time: 2023-07-18T16:09:58.960308-05:00
3 changes: 3 additions & 0 deletions .changes/unreleased/Refactor-20230718-161015.yaml
@@ -0,0 +1,3 @@
+kind: Refactor
+body: Convert to using opslevel.ID
+time: 2023-07-18T16:10:15.610849-05:00
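This conversion drives most of the hunks below: `.(string)` type assertions become direct uses or explicit `string(...)` conversions, and `== nil` checks become comparisons against `""`. Those casts imply `opslevel.ID` is a defined string type; an illustrative stand-in:

```go
package main

import "fmt"

// ID mirrors the assumed shape of opslevel.ID, a defined string type;
// this is an illustration, not the library's source.
type ID string

func main() {
	id := ID("runner-123") // construct directly from a string literal
	s := string(id)        // explicit conversion replaces the old id.(string) assertions
	if id == "" {          // zero-value check replaces the old `== nil` check
		fmt.Println("no id")
	}
	fmt.Println(s)
}
```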
24 changes: 12 additions & 12 deletions src/cmd/run.go
@@ -54,29 +54,29 @@ func doRun(cmd *cobra.Command, args []string) {
k8sClient := clientset.NewForConfigOrDie(config)

log.Info().Msgf("electing leader...")
-go electLeader(k8sClient, runner.Id.(string))
+go electLeader(k8sClient, runner.Id)
}

log.Info().Msgf("Starting runner for id '%s'", runner.Id)
-pkg.StartMetricsServer(runner.Id.(string), viper.GetInt("metrics-port"))
+pkg.StartMetricsServer(runner.Id, viper.GetInt("metrics-port"))
stop := opslevel_common.InitSignalHandler()
-wg := startWorkers(runner.Id.(string), stop)
+wg := startWorkers(runner.Id, stop)
<-stop // Enter Forever Loop
log.Info().Msgf("interupt - waiting for jobs to complete ...")
wg.Wait()
log.Info().Msgf("Unregister runner for id '%s'...", runner.Id)
client.RunnerUnregister(&runner.Id)
client.RunnerUnregister(runner.Id)
}

-func electLeader(k8sClient *clientset.Clientset, runnerId string) {
+func electLeader(k8sClient *clientset.Clientset, runnerId opslevel.ID) {
leaseLockName := viper.GetString("runner-deployment")
leaseLockNamespace := viper.GetString("runner-pod-namespace")
lockIdentity := viper.GetString("runner-pod-name")

pkg.RunLeaderElection(k8sClient, runnerId, leaseLockName, lockIdentity, leaseLockNamespace)
}

-func startWorkers(runnerId string, stop <-chan struct{}) *sync.WaitGroup {
+func startWorkers(runnerId opslevel.ID, stop <-chan struct{}) *sync.WaitGroup {
wg := sync.WaitGroup{}
concurrency := getConcurrency()
wg.Add(concurrency)
@@ -96,7 +96,7 @@ func getConcurrency() int {
return concurrency
}

-func jobWorker(wg *sync.WaitGroup, index int, runnerId string, jobQueue <-chan opslevel.RunnerJob) {
+func jobWorker(wg *sync.WaitGroup, index int, runnerId opslevel.ID, jobQueue <-chan opslevel.RunnerJob) {
logMaxBytes := viper.GetInt("job-pod-log-max-size")
logMaxDuration := time.Duration(viper.GetInt("job-pod-log-max-interval")) * time.Second
logPrefix := func() string { return fmt.Sprintf("%s [%d] ", time.Now().UTC().Format(time.RFC3339), index) }
@@ -112,7 +112,7 @@ func jobWorker(wg *sync.WaitGroup, index int, runnerId string, jobQueue <-chan o
defer wg.Done()
for job := range jobQueue {
ctx := context.Background()
-jobId := job.Id.(string)
+jobId := job.Id
jobNumber := job.Number()

streamer := pkg.NewLogStreamer(
@@ -174,10 +174,10 @@ func jobWorker(wg *sync.WaitGroup, index int, runnerId string, jobQueue <-chan o
logger.Info().Msgf("Shutting down job processor %d ...", index)
}

-func jobPoller(runnerId string, stop <-chan struct{}, jobQueue chan<- opslevel.RunnerJob) {
+func jobPoller(runnerId opslevel.ID, stop <-chan struct{}, jobQueue chan<- opslevel.RunnerJob) {
logger := log.With().Int("worker", 0).Logger()
client := pkg.NewGraphClient()
token := opslevel.NewID("")
token := opslevel.ID("")
poll_wait_time := time.Second * time.Duration(viper.GetInt("poll-interval"))
logger.Info().Msg("Starting polling for jobs")
for {
@@ -190,14 +190,14 @@ func jobPoller(runnerId string, stop <-chan struct{}, jobQueue chan<- opslevel.R
logger.Trace().Msg("Polling for jobs ...")
continue_polling := true
for continue_polling {
logger.Debug().Msgf("Get pending jobs with lastUpdateToken '%v' ...", *token)
logger.Debug().Msgf("Get pending jobs with lastUpdateToken '%v' ...", token)
job, nextToken, err := client.RunnerGetPendingJob(runnerId, token)
if err != nil {
logger.Error().Err(err).Msg("got error when getting pending job")
continue_polling = false
} else {
token = nextToken
-if job.Id == nil {
+if job.Id == "" {
continue_polling = false
} else {
logger.Debug().Msgf("Enqueuing job '%s'", job.Number())
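The poller in the hunk above threads a `lastUpdateToken` cursor through successive `RunnerGetPendingJob` calls, stopping the inner loop on an error or when an empty job ID signals a drained queue (the old code checked `job.Id == nil`; with the typed ID the zero value is `""`). A stripped-down sketch of that control flow, with `Job` and `getPendingJob` as hypothetical stand-ins for `opslevel.RunnerJob` and the client call:

```go
package main

import "fmt"

// Job and getPendingJob are stand-ins for opslevel.RunnerJob and
// client.RunnerGetPendingJob — illustrative only.
type Job struct{ Id string }

var pending = []Job{{Id: "1"}, {Id: "2"}}

func getPendingJob(token string) (Job, string, error) {
	if len(pending) == 0 {
		return Job{}, token, nil // empty Id means "nothing pending"
	}
	job := pending[0]
	pending = pending[1:]
	return job, "tok-" + job.Id, nil
}

// pollOnce mirrors the inner loop of jobPoller: thread the cursor token
// through successive calls; stop on error or when the queue is drained.
func pollOnce(token string, enqueue func(Job)) string {
	for {
		job, next, err := getPendingJob(token)
		if err != nil {
			return token // keep the old cursor and retry after the poll interval
		}
		token = next
		if job.Id == "" {
			return token
		}
		enqueue(job)
	}
}

func main() {
	pollOnce("", func(j Job) { fmt.Println("enqueued", j.Id) })
}
```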
2 changes: 1 addition & 1 deletion src/cmd/test.go
@@ -31,7 +31,7 @@ func doTest(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
-if job.Id == nil {
+if job.Id == "" {
job.Id = "1"
}
streamer := pkg.NewLogStreamer(
6 changes: 3 additions & 3 deletions src/pkg/k8s.go
@@ -36,7 +36,7 @@ type JobConfig struct {
}

type JobRunner struct {
-runnerId string
+runnerId opslevel.ID
logger zerolog.Logger
config *rest.Config
clientset *kubernetes.Clientset
@@ -58,7 +58,7 @@ type JobPodConfig struct {
MemLimit int64 //in MB
}

-func NewJobRunner(runnerId string, logger zerolog.Logger, jobPodConfig JobPodConfig) (*JobRunner, error) {
+func NewJobRunner(runnerId opslevel.ID, logger zerolog.Logger, jobPodConfig JobPodConfig) (*JobRunner, error) {
config, err := getKubernetesConfig()
if err != nil {
return nil, err
@@ -185,7 +185,7 @@ func (s *JobRunner) getPodObject(identifier string, labels map[string]string, jo

// TODO: Remove all usages of "Viper"; they should be passed in at JobRunner configuration time
func (s *JobRunner) Run(job opslevel.RunnerJob, stdout, stderr *SafeBuffer) JobOutcome {
-id := job.Id.(string)
+id := string(job.Id)
identifier := fmt.Sprintf("opslevel-job-%s-%d", job.Number(), time.Now().Unix())
runnerIdentifier := fmt.Sprintf("runner-%s", s.runnerId)
labels := map[string]string{
5 changes: 3 additions & 2 deletions src/pkg/leaderElection.go
@@ -2,6 +2,7 @@ package pkg

import (
"context"
"github.com/opslevel/opslevel-go/v2023"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -19,7 +20,7 @@ var (
isLeader bool
)

-func RunLeaderElection(client *clientset.Clientset, runnerId, lockName, lockIdentity, lockNamespace string) {
+func RunLeaderElection(client *clientset.Clientset, runnerId opslevel.ID, lockName, lockIdentity, lockNamespace string) {
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: lockName,
@@ -97,7 +98,7 @@ func RunLeaderElection(client *clientset.Clientset, runnerId, lockName, lockIden
})
}

-func getReplicaCount(runnerId string, currentReplicas int) (int32, error) {
+func getReplicaCount(runnerId opslevel.ID, currentReplicas int) (int32, error) {
clientGQL := NewGraphClient()
jobConcurrency := int(math.Max(float64(viper.GetInt("job-concurrency")), 1))
runnerScale, err := clientGQL.RunnerScale(runnerId, currentReplicas, jobConcurrency)
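`RunLeaderElection` wraps client-go's lease-based leader election; the `resourcelock.LeaseLock` construction is visible in the hunk above. The general shape of that API, with illustrative durations and callback bodies:

```go
package example

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runElection sketches the client-go pattern RunLeaderElection builds on;
// in the real code the lock name, namespace, and identity come from viper.
func runElection(ctx context.Context, client *kubernetes.Clientset, name, namespace, identity string) {
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: name, Namespace: namespace},
		Client:     client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: identity},
	}
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second, // illustrative durations
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) { /* leader-only work */ },
			OnStoppedLeading: func() { /* relinquish leader-only work */ },
		},
	})
}
```

Passing the runner ID into the election matches the `getReplicaCount` helper in this file, which queries `RunnerScale`; presumably only the elected leader performs that scale check.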
5 changes: 3 additions & 2 deletions src/pkg/metrics.go
@@ -2,6 +2,7 @@ package pkg

import (
"fmt"
"github.com/opslevel/opslevel-go/v2023"
"net/http"

"github.com/prometheus/client_golang/prometheus/collectors"
@@ -49,8 +50,8 @@ func initMetrics(id string) {
})
}

-func StartMetricsServer(id string, port int) {
-initMetrics(id)
+func StartMetricsServer(id opslevel.ID, port int) {
+initMetrics(string(id))
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{})) // Uses a clean instrumentation free handler
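`StartMetricsServer` now takes the typed ID and converts it exactly once at the boundary (`initMetrics(string(id))`), keeping the Prometheus side in plain strings. The serving code visible in the hunk is the stock `promhttp` pattern; a compact, self-contained sketch:

```go
package example

import (
	"fmt"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// serveMetrics mirrors StartMetricsServer's shape: expose /metrics for the
// default gatherer on the given port. HandlerFor with an explicit gatherer
// keeps the handler free of the default instrumentation middleware.
func serveMetrics(port int) {
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}))
	go func() {
		_ = http.ListenAndServe(fmt.Sprintf(":%d", port), mux) // sketch: error handling elided
	}()
}
```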
10 changes: 5 additions & 5 deletions src/pkg/opslevelAppendLogProcessor.go
@@ -10,8 +10,8 @@ import (
type OpsLevelAppendLogProcessor struct {
client *opslevel.Client
logger zerolog.Logger
-runnerId string
-jobId string
+runnerId opslevel.ID
+jobId opslevel.ID
jobNumber string
maxBytes int
maxTime time.Duration
@@ -22,7 +22,7 @@ type OpsLevelAppendLogProcessor struct {
elapsed time.Duration
}

-func NewOpsLevelAppendLogProcessor(client *opslevel.Client, logger zerolog.Logger, runnerId string, jobId string, jobNumber string, maxBytes int, maxTime time.Duration) *OpsLevelAppendLogProcessor {
+func NewOpsLevelAppendLogProcessor(client *opslevel.Client, logger zerolog.Logger, runnerId opslevel.ID, jobId opslevel.ID, jobNumber string, maxBytes int, maxTime time.Duration) *OpsLevelAppendLogProcessor {
return &OpsLevelAppendLogProcessor{
client: client,
logger: logger,
@@ -86,8 +86,8 @@ func (s *OpsLevelAppendLogProcessor) Flush(outcome JobOutcome) {
func (s *OpsLevelAppendLogProcessor) submit() {
if s.client != nil && len(s.logLines) > 0 {
err := s.client.RunnerAppendJobLog(opslevel.RunnerAppendJobLogInput{
-RunnerId: s.runnerId,
-RunnerJobId: s.jobId,
+RunnerId: opslevel.ID(s.runnerId),
+RunnerJobId: opslevel.ID(s.jobId),
SentAt: opslevel.NewISO8601DateNow(),
Logs: s.logLines,
})
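`OpsLevelAppendLogProcessor` buffers log lines and ships them via `RunnerAppendJobLog` once either a byte budget (`maxBytes`) or a time budget (`maxTime`) is exceeded, so quiet jobs still flush regularly and chatty jobs don't flood the API. A sketch of that buffer-and-flush rule, with illustrative names:

```go
package example

import "time"

// batcher illustrates the dual-budget flush rule used by
// OpsLevelAppendLogProcessor: submit when accumulated bytes or
// elapsed time crosses its limit, whichever comes first.
type batcher struct {
	lines    []string
	bytes    int
	maxBytes int
	last     time.Time
	maxTime  time.Duration
	submit   func([]string) // stands in for client.RunnerAppendJobLog
}

func (b *batcher) add(line string) {
	b.lines = append(b.lines, line)
	b.bytes += len(line)
	if b.bytes >= b.maxBytes || time.Since(b.last) >= b.maxTime {
		b.flush()
	}
}

func (b *batcher) flush() {
	if len(b.lines) == 0 {
		return
	}
	b.submit(b.lines)
	b.lines, b.bytes, b.last = nil, 0, time.Now()
}
```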
10 changes: 5 additions & 5 deletions src/pkg/setOutcomeVarLogProcessor.go
@@ -17,14 +17,14 @@ var endOutcomeVarExp = regexp.MustCompile(`^::end-multiline-outcome-var`)
type SetOutcomeVarLogProcessor struct {
client *opslevel.Client
logger zerolog.Logger
-runnerId string
-jobId string
+runnerId opslevel.ID
+jobId opslevel.ID
jobNumber string
multilineOutcomeVarKey Stack[string]
vars map[string]string
}

-func NewSetOutcomeVarLogProcessor(client *opslevel.Client, logger zerolog.Logger, runnerId string, jobId string, jobNumber string) *SetOutcomeVarLogProcessor {
+func NewSetOutcomeVarLogProcessor(client *opslevel.Client, logger zerolog.Logger, runnerId opslevel.ID, jobId opslevel.ID, jobNumber string) *SetOutcomeVarLogProcessor {
return &SetOutcomeVarLogProcessor{
client: client,
logger: logger,
@@ -98,8 +98,8 @@ func (s *SetOutcomeVarLogProcessor) Flush(outcome JobOutcome) {
}

err := s.client.RunnerReportJobOutcome(opslevel.RunnerReportJobOutcomeInput{
-RunnerId: s.runnerId,
-RunnerJobId: s.jobId,
+RunnerId: opslevel.ID(s.runnerId),
+RunnerJobId: opslevel.ID(s.jobId),
Outcome: outcome.Outcome,
OutcomeVariables: vars,
})
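`SetOutcomeVarLogProcessor` scans job output for outcome variables framed by multiline sentinels and reports them via `RunnerReportJobOutcome`. Only the end-sentinel regex (`^::end-multiline-outcome-var`) is visible in this hunk; the matching start marker and its key-capture syntax are assumptions here. A toy scanner for that framing:

```go
package example

import (
	"regexp"
	"strings"
)

var (
	// The capture group for the variable key is an assumption.
	startExp = regexp.MustCompile(`^::start-multiline-outcome-var (.+)`)
	endExp   = regexp.MustCompile(`^::end-multiline-outcome-var`)
)

// parseVars collects multiline outcome variables from job output lines.
func parseVars(lines []string) map[string]string {
	vars := map[string]string{}
	var key string
	var buf []string
	for _, line := range lines {
		switch {
		case key == "" && startExp.MatchString(line):
			key = startExp.FindStringSubmatch(line)[1]
		case key != "" && endExp.MatchString(line):
			vars[key] = strings.Join(buf, "\n")
			key, buf = "", nil
		case key != "":
			buf = append(buf, line)
		}
	}
	return vars
}
```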
