Skip to content

Commit

Permalink
send annotations to run-service (actions#2574)
Browse files Browse the repository at this point in the history
* send annotations to run-service

* skip message deletion

* actually don't skip deletion

* enum as numbers

* fix enum

* linting

* remove unncessary file

* feedback
  • Loading branch information
yaananth authored and nikola-jokic committed May 12, 2023
1 parent 0db9b07 commit 7b629c8
Show file tree
Hide file tree
Showing 13 changed files with 332 additions and 59 deletions.
20 changes: 17 additions & 3 deletions src/Runner.Common/RunServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@ public interface IRunServer : IRunnerService

Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken token);

Task CompleteJobAsync(Guid planId, Guid jobId, TaskResult result, Dictionary<String, VariableValue> outputs, IList<StepResult> stepResults, CancellationToken token);
Task CompleteJobAsync(
Guid planId,
Guid jobId,
TaskResult result,
Dictionary<String, VariableValue> outputs,
IList<StepResult> stepResults,
IList<Annotation> jobAnnotations,
CancellationToken token);

Task<RenewJobResponse> RenewJobAsync(Guid planId, Guid jobId, CancellationToken token);
}
Expand Down Expand Up @@ -56,11 +63,18 @@ public Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationTo
shouldRetry: ex => ex is not TaskOrchestrationJobAlreadyAcquiredException);
}

public Task CompleteJobAsync(Guid planId, Guid jobId, TaskResult result, Dictionary<String, VariableValue> outputs, IList<StepResult> stepResults, CancellationToken cancellationToken)
public Task CompleteJobAsync(
Guid planId,
Guid jobId,
TaskResult result,
Dictionary<String, VariableValue> outputs,
IList<StepResult> stepResults,
IList<Annotation> jobAnnotations,
CancellationToken cancellationToken)
{
CheckConnection();
return RetryRequest(
async () => await _runServiceHttpClient.CompleteJobAsync(requestUri, planId, jobId, result, outputs, stepResults, cancellationToken), cancellationToken);
async () => await _runServiceHttpClient.CompleteJobAsync(requestUri, planId, jobId, result, outputs, stepResults, jobAnnotations, cancellationToken), cancellationToken);
}

public Task<RenewJobResponse> RenewJobAsync(Guid planId, Guid jobId, CancellationToken cancellationToken)
Expand Down
77 changes: 39 additions & 38 deletions src/Runner.Listener/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using GitHub.Services.Common;
using GitHub.Services.WebApi;
using GitHub.Services.WebApi.Jwt;
using Sdk.RSWebApi.Contracts;
using Pipelines = GitHub.DistributedTask.Pipelines;

namespace GitHub.Runner.Listener
Expand Down Expand Up @@ -372,15 +373,15 @@ private async Task RunAsync(Pipelines.AgentJobRequestMessage message, string orc
TaskCompletionSource<int> firstJobRequestRenewed = new();
var notification = HostContext.GetService<IJobNotification>();

var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));

// lock renew cancellation token.
using (var lockRenewalTokenSource = new CancellationTokenSource())
using (var workerProcessCancelTokenSource = new CancellationTokenSource())
{
long requestId = message.RequestId;
Guid lockToken = Guid.Empty; // lockToken has never been used, keep this here of compat

var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));

// start renew job request
Trace.Info($"Start renew job request {requestId} for job {message.JobId}.");
Task renewJobRequest = RenewJobRequestAsync(message, systemConnection, _poolId, requestId, lockToken, orchestrationId, firstJobRequestRenewed, lockRenewalTokenSource.Token);
Expand All @@ -405,7 +406,7 @@ private async Task RunAsync(Pipelines.AgentJobRequestMessage message, string orc
await renewJobRequest;

// complete job request with result Cancelled
await CompleteJobRequestAsync(_poolId, message, lockToken, TaskResult.Canceled);
await CompleteJobRequestAsync(_poolId, message, systemConnection, lockToken, TaskResult.Canceled);
return;
}

Expand Down Expand Up @@ -544,15 +545,14 @@ await processChannel.SendAsync(
detailInfo = string.Join(Environment.NewLine, workerOutput);
Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result.");


var jobServer = await InitializeJobServerAsync(systemConnection);
await LogWorkerProcessUnhandledException(jobServer, message, detailInfo);

// Go ahead to finish the job with result 'Failed' if the STDERR from worker is System.IO.IOException, since it typically means we are running out of disk space.
if (detailInfo.Contains(typeof(System.IO.IOException).ToString(), StringComparison.OrdinalIgnoreCase))
{
Trace.Info($"Finish job with result 'Failed' due to IOException.");
await ForceFailJob(jobServer, message);
await ForceFailJob(jobServer, message, detailInfo);
}
}

Expand All @@ -567,7 +567,7 @@ await processChannel.SendAsync(
await renewJobRequest;

// complete job request
await CompleteJobRequestAsync(_poolId, message, lockToken, result, detailInfo);
await CompleteJobRequestAsync(_poolId, message, systemConnection, lockToken, result, detailInfo);

// print out unhandled exception happened in worker after we complete job request.
// when we run out of disk space, report back to server has higher priority.
Expand Down Expand Up @@ -664,7 +664,7 @@ await processChannel.SendAsync(
await renewJobRequest;

// complete job request
await CompleteJobRequestAsync(_poolId, message, lockToken, resultOnAbandonOrCancel);
await CompleteJobRequestAsync(_poolId, message, systemConnection, lockToken, resultOnAbandonOrCancel);
}
finally
{
Expand Down Expand Up @@ -1065,7 +1065,7 @@ private async Task TryUploadUnfinishedLogs(Pipelines.AgentJobRequestMessage mess
}
}

private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequestMessage message, Guid lockToken, TaskResult result, string detailInfo = null)
private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequestMessage message, ServiceEndpoint systemConnection, Guid lockToken, TaskResult result, string detailInfo = null)
{
Trace.Entering();

Expand All @@ -1077,7 +1077,23 @@ private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequest

if (this._isRunServiceJob)
{
Trace.Verbose($"Skip FinishAgentRequest call from Listener because MessageType is {message.MessageType}");
var runServer = await GetRunServerAsync(systemConnection);
var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = detailInfo };
var unhandledAnnotation = unhandledExceptionIssue.ToAnnotation();
var jobAnnotations = new List<Annotation>();
if (unhandledAnnotation.HasValue)
{
jobAnnotations.Add(unhandledAnnotation.Value);
}
try
{
await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, result, outputs: null, stepResults: null, jobAnnotations: jobAnnotations, CancellationToken.None);
}
catch (Exception ex)
{
Trace.Error("Fail to raise job completion back to service.");
Trace.Error(ex);
}
return;
}

Expand Down Expand Up @@ -1117,7 +1133,7 @@ private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequest
}

// log an error issue to job level timeline record
private async Task LogWorkerProcessUnhandledException(IRunnerService server, Pipelines.AgentJobRequestMessage message, string errorMessage)
private async Task LogWorkerProcessUnhandledException(IRunnerService server, Pipelines.AgentJobRequestMessage message, string detailInfo)
{
if (server is IJobServer jobServer)
{
Expand All @@ -1129,34 +1145,11 @@ private async Task LogWorkerProcessUnhandledException(IRunnerService server, Pip
TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job");
ArgUtil.NotNull(jobRecord, nameof(jobRecord));

try
{
if (!string.IsNullOrEmpty(errorMessage) &&
message.Variables.TryGetValue("DistributedTask.EnableRunnerIPCDebug", out var enableRunnerIPCDebug) &&
StringUtil.ConvertToBoolean(enableRunnerIPCDebug.Value))
{
// the trace should be best effort and not affect any job result
var match = _invalidJsonRegex.Match(errorMessage);
if (match.Success &&
match.Groups.Count == 2)
{
var jsonPosition = int.Parse(match.Groups[1].Value);
var serializedJobMessage = JsonUtility.ToString(message);
var originalJson = serializedJobMessage.Substring(jsonPosition - 10, 20);
errorMessage = $"Runner sent Json at position '{jsonPosition}': {originalJson} ({Convert.ToBase64String(Encoding.UTF8.GetBytes(originalJson))})\n{errorMessage}";
}
}
}
catch (Exception ex)
{
Trace.Error(ex);
errorMessage = $"Fail to check json IPC error: {ex.Message}\n{errorMessage}";
}

var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = errorMessage };
var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = detailInfo };
unhandledExceptionIssue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.WorkerCrash;
jobRecord.ErrorCount++;
jobRecord.Issues.Add(unhandledExceptionIssue);

await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None);
}
catch (Exception ex)
Expand All @@ -1167,13 +1160,13 @@ private async Task LogWorkerProcessUnhandledException(IRunnerService server, Pip
}
else
{
Trace.Info("Job server does not support handling unhandled exception yet, error message: {0}", errorMessage);
Trace.Info("Job server does not support handling unhandled exception yet, error message: {0}", detailInfo);
return;
}
}

// raise job completed event to fail the job.
private async Task ForceFailJob(IRunnerService server, Pipelines.AgentJobRequestMessage message)
private async Task ForceFailJob(IRunnerService server, Pipelines.AgentJobRequestMessage message, string detailInfo)
{
if (server is IJobServer jobServer)
{
Expand All @@ -1192,7 +1185,15 @@ private async Task ForceFailJob(IRunnerService server, Pipelines.AgentJobRequest
{
try
{
await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, TaskResult.Failed, outputs: null, stepResults: null, CancellationToken.None);
var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = detailInfo };
var unhandledAnnotation = unhandledExceptionIssue.ToAnnotation();
var jobAnnotations = new List<Annotation>();
if (unhandledAnnotation.HasValue)
{
jobAnnotations.Add(unhandledAnnotation.Value);
}

await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, TaskResult.Failed, outputs: null, stepResults: null, jobAnnotations: jobAnnotations, CancellationToken.None);
}
catch (Exception ex)
{
Expand Down
32 changes: 24 additions & 8 deletions src/Runner.Worker/ExecutionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using GitHub.Runner.Worker.Container;
using GitHub.Runner.Worker.Handlers;
using Newtonsoft.Json;
using Sdk.RSWebApi.Contracts;
using ObjectTemplating = GitHub.DistributedTask.ObjectTemplating;
using Pipelines = GitHub.DistributedTask.Pipelines;

Expand Down Expand Up @@ -438,14 +439,26 @@ public TaskResult Complete(TaskResult? result = null, string currentOperation =

PublishStepTelemetry();

var stepResult = new StepResult();
stepResult.ExternalID = _record.Id;
stepResult.Conclusion = _record.Result ?? TaskResult.Succeeded;
stepResult.Status = _record.State;
stepResult.Number = _record.Order;
stepResult.Name = _record.Name;
stepResult.StartedAt = _record.StartTime;
stepResult.CompletedAt = _record.FinishTime;
var stepResult = new StepResult
{
ExternalID = _record.Id,
Conclusion = _record.Result ?? TaskResult.Succeeded,
Status = _record.State,
Number = _record.Order,
Name = _record.Name,
StartedAt = _record.StartTime,
CompletedAt = _record.FinishTime,
Annotations = new List<Annotation>()
};

_record.Issues?.ForEach(issue =>
{
var annotation = issue.ToAnnotation();
if (annotation != null)
{
stepResult.Annotations.Add(annotation.Value);
}
});

Global.StepsResult.Add(stepResult);

Expand Down Expand Up @@ -725,6 +738,9 @@ public void InitializeJob(Pipelines.AgentJobRequestMessage message, Cancellation
// Steps results for entire job
Global.StepsResult = new List<StepResult>();

// Job level annotations
Global.JobAnnotations = new List<Annotation>();

// Job Outputs
JobOutputs = new Dictionary<string, VariableValue>(StringComparer.OrdinalIgnoreCase);

Expand Down
4 changes: 3 additions & 1 deletion src/Runner.Worker/GlobalContext.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using System;
using System;
using System.Collections.Generic;
using GitHub.Actions.RunService.WebApi;
using GitHub.DistributedTask.WebApi;
using GitHub.Runner.Common.Util;
using GitHub.Runner.Worker.Container;
using Newtonsoft.Json.Linq;
using Sdk.RSWebApi.Contracts;

namespace GitHub.Runner.Worker
{
Expand All @@ -19,6 +20,7 @@ public sealed class GlobalContext
public IDictionary<String, IDictionary<String, String>> CompositeDefaults { get; set; }
public List<ActionsStepTelemetry> StepsTelemetry { get; set; }
public List<StepResult> StepsResult { get; set; }
public List<Annotation> JobAnnotations { get; set; }
public List<JobTelemetry> JobTelemetry { get; set; }
public TaskOrchestrationPlanReference Plan { get; set; }
public List<string> PrependPath { get; set; }
Expand Down
2 changes: 1 addition & 1 deletion src/Runner.Worker/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ private async Task<TaskResult> CompleteJobAsync(IRunServer runServer, IExecution
{
try
{
await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, result, jobContext.JobOutputs, jobContext.Global.StepsResult, default);
await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, result, jobContext.JobOutputs, jobContext.Global.StepsResult, jobContext.Global.JobAnnotations, default);
return result;
}
catch (Exception ex)
Expand Down
35 changes: 35 additions & 0 deletions src/Sdk/RSWebApi/Contracts/Annotation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System.Runtime.Serialization;

namespace Sdk.RSWebApi.Contracts
{
[DataContract]
public struct Annotation
{
[DataMember(Name = "level", EmitDefaultValue = false)]
public AnnotationLevel Level;

[DataMember(Name = "message", EmitDefaultValue = false)]
public string Message;

[DataMember(Name = "rawDetails", EmitDefaultValue = false)]
public string RawDetails;

[DataMember(Name = "path", EmitDefaultValue = false)]
public string Path;

[DataMember(Name = "isInfrastructureIssue", EmitDefaultValue = false)]
public bool IsInfrastructureIssue;

[DataMember(Name = "startLine", EmitDefaultValue = false)]
public long StartLine;

[DataMember(Name = "endLine", EmitDefaultValue = false)]
public long EndLine;

[DataMember(Name = "startColumn", EmitDefaultValue = false)]
public long StartColumn;

[DataMember(Name = "endColumn", EmitDefaultValue = false)]
public long EndColumn;
}
}
20 changes: 20 additions & 0 deletions src/Sdk/RSWebApi/Contracts/AnnotationLevel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Runtime.Serialization;

namespace Sdk.RSWebApi.Contracts
{
[DataContract]
public enum AnnotationLevel
{
[EnumMember]
UNKNOWN = 0,

[EnumMember]
NOTICE = 1,

[EnumMember]
WARNING = 2,

[EnumMember]
FAILURE = 3
}
}
16 changes: 10 additions & 6 deletions src/Sdk/RSWebApi/Contracts/CompleteJobRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Runtime.Serialization;
using GitHub.DistributedTask.WebApi;
using Sdk.RSWebApi.Contracts;

namespace GitHub.Actions.RunService.WebApi
{
Expand All @@ -10,17 +11,20 @@ public class CompleteJobRequest
{
[DataMember(Name = "planId", EmitDefaultValue = false)]
public Guid PlanID { get; set; }

[DataMember(Name = "jobId", EmitDefaultValue = false)]
public Guid JobID { get; set; }

[DataMember(Name = "conclusion")]
public TaskResult Conclusion { get; set; }

[DataMember(Name = "outputs", EmitDefaultValue = false)]
public Dictionary<string, VariableValue> Outputs { get; set; }
public Dictionary<string, VariableValue> Outputs { get; set; }

[DataMember(Name = "stepResults", EmitDefaultValue = false)]
public IList<StepResult> StepResults { get; set; }

[DataMember(Name = "annotations", EmitDefaultValue = false)]
public IList<Annotation> Annotations { get; set; }
}
}
}
Loading

0 comments on commit 7b629c8

Please sign in to comment.