Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding IsHealthy check to LogWriter #14

Merged
merged 1 commit into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/MessageHandlers/TelemetryMetricHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ private void TelemetryMultiMetricHandler(MessageFormats.Common.TelemetryMultiMet

// Create a new response from the request message
MessageFormats.Common.TelemetryMetricResponse returnResponse = Core.Utils.ResponseFromRequest(message, new MessageFormats.Common.TelemetryMetricResponse());
returnResponse.ResponseHeader.Status = MessageFormats.Common.StatusCodes.Pending;

// If the MetricTime in the message is null, set it to the current UTC time
if (message.MetricTime == null) {
Expand Down Expand Up @@ -140,6 +141,13 @@ private void TelemetryMultiMetricHandler(MessageFormats.Common.TelemetryMultiMet
orig_request: message, orig_response: returnResponse,
pluginDelegate: _pluginDelegates.TelemetryMetricResponse);

if (output_response == null || output_request == null) {
_logger.LogTrace("Plugins nullified '{messageType}' or '{output_requestMessageType}' from '{sourceApp}'. Dropping Message (trackingId: '{trackingId}' / correlationId: '{correlationId}')", returnResponse.GetType().Name, message.GetType().Name, fullMessage.SourceAppId, message.RequestHeader.TrackingId, message.RequestHeader.CorrelationId);
return null;
} else {
returnResponse = output_response;
}

// Return the response
return returnResponse;
}
Expand Down
131 changes: 78 additions & 53 deletions src/Services/LogWriterService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
namespace Microsoft.Azure.SpaceFx.HostServices.Logging;

public partial class Services {
public class LogWriterService : BackgroundService {
public class LogWriterService : BackgroundService, Core.IMonitorableService {
private readonly ILogger<LogWriterService> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly Microsoft.Azure.SpaceFx.Core.Services.PluginLoader _pluginLoader;
Expand All @@ -13,6 +13,21 @@ public class LogWriterService : BackgroundService {
private readonly Models.APP_CONFIG _appConfig;
private readonly Core.Client _client;
private readonly string _outputDir;
/// <summary>
/// Reports whether the log writer appears to be working, judged by how many files
/// are currently sitting in the output directory.
/// </summary>
/// <returns>
/// <c>true</c> when the output directory holds one or two files; <c>false</c> when it holds
/// none, more than two, or cannot be read at all.
/// </returns>
public bool IsHealthy() {
    int fileCount;
    try {
        // Only "0", "1", "2" or "more than 2" matters, so stop enumerating after the third
        // entry rather than materializing the whole directory listing.
        fileCount = Directory.EnumerateFiles(_outputDir).Take(3).Count();
    } catch (Exception ex) when (ex is IOException or UnauthorizedAccessException) {
        // A missing or unreadable output directory means no logs can be written.
        // Report that as unhealthy instead of letting the health probe itself throw.
        return false;
    }

    // One file: there's a current log file.
    // Two files: there's a current log file and a previous one that hasn't been downlinked yet.
    // Both are expected and mean the service is writing and downlinking logs correctly.
    //
    // Anything else is a failure that needs a restart:
    //   no files           - the service isn't writing logs like it's supposed to;
    //   more than two files - the service isn't downlinking logs like it's supposed to.
    return fileCount is >= 1 and <= 2;
}
public LogWriterService(ILogger<LogWriterService> logger, IServiceProvider serviceProvider, Utils.PluginDelegates pluginDelegates, Core.Services.PluginLoader pluginLoader, Core.Client client) {
_logger = logger;
_serviceProvider = serviceProvider;
Expand All @@ -28,47 +43,45 @@ public LogWriterService(ILogger<LogWriterService> logger, IServiceProvider servi

protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
while (!stoppingToken.IsCancellationRequested) {
using (var scope = _serviceProvider.CreateScope()) {
string logFileName = GetLogFileName();

while (_logMessageQueue.Count > 0) {
MessageFormats.Common.LogMessage logMessage = _logMessageQueue.Take();
try {
// Call the plugins before we write
(MessageFormats.Common.LogMessage? output_request, string? fileName) preFileWrite =
_pluginLoader.CallPlugins<MessageFormats.Common.LogMessage?, Plugins.PluginBase, string>(
orig_request: logMessage, orig_response: logFileName,
pluginDelegate: _pluginDelegates.PreWriteToLog);

// Drop out of the call if our plugins removed the request
if (preFileWrite.output_request == null || preFileWrite.output_request == default(MessageFormats.Common.LogMessage)) {
return;
}

string jsonString = JsonSerializer.Serialize(logMessage, jsonOptions);

File.AppendAllLines(logFileName, new[] { jsonString });

// Call the plugins after we wrote
(MessageFormats.Common.LogMessage? output_request, string? fileName) postFileWrite =
_pluginLoader.CallPlugins<MessageFormats.Common.LogMessage?, Plugins.PluginBase, string>(
orig_request: logMessage, orig_response: logFileName,
pluginDelegate: _pluginDelegates.PostWriteToLog);

} catch (Exception ex) {
_logger.LogError("Failed to write log message. Error: {error}", ex.Message);
}
}
using var scope = _serviceProvider.CreateScope();
string logFileName = GetLogFileName();

// Process log messages in the queue
while (_logMessageQueue.Count > 0) {
var logMessage = _logMessageQueue.Take();
try {
await DownlinkLogFiles();
// Call pre-write plugins
(MessageFormats.Common.LogMessage? output_request, string? fileName) preFileWrite = _pluginLoader.CallPlugins<MessageFormats.Common.LogMessage?, Plugins.PluginBase, string>(
orig_request: logMessage, orig_response: logFileName, pluginDelegate: _pluginDelegates.PreWriteToLog);

// Skip if the pre-write plugin nullifies the request
if (preFileWrite.output_request == null) {
continue;
}

// Serialize the log message to JSON
string jsonString = JsonSerializer.Serialize(logMessage, jsonOptions);

// Append the serialized log message to the log file
await File.AppendAllTextAsync(logFileName, jsonString, stoppingToken);

// Call post-write plugins
_pluginLoader.CallPlugins<MessageFormats.Common.LogMessage?, Plugins.PluginBase, string>(
orig_request: logMessage, orig_response: logFileName, pluginDelegate: _pluginDelegates.PostWriteToLog);
} catch (Exception ex) {
_logger.LogError("Failed to downlink log files. Error: {error}", ex.Message);
_logger.LogError("Failed to process log message. Error: {error}", ex.Message);
}
}


await Task.Delay(_appConfig.HEARTBEAT_PULSE_TIMING_MS, stoppingToken);
try {
// Downlink log files
await DownlinkLogFiles();
} catch (Exception ex) {
_logger.LogError("Failed to downlink log files. Error: {error}", ex.Message);
}

// Wait for the specified heartbeat pulse timing before the next iteration
await Task.Delay(_appConfig.HEARTBEAT_PULSE_TIMING_MS, stoppingToken);
}
}

Expand All @@ -85,53 +98,65 @@ protected internal void QueueLogMessage(MessageFormats.Common.LogMessage logMess
}
}

/// <summary>
/// Calculate the log file name based on the current datetime, the maximum allowed time-to-live (TTL), and the maximum allowed size
/// </summary>
/// <returns>Full path to the expected log filename</returns>
internal string GetLogFileName() {

// Generate the initial log file name based on the current date and time
string currentFileName = string.Format($"msft-azure-orbital-{_logFileDateTime:dd-MM-yy-HH.mm.ss}.json");
string returnLogFileName = currentFileName; // Assume we're not cutting a new log file

// We've exceeded our max run time - cut a new log file
// Check if the current log file has exceeded the maximum allowed time-to-live (TTL)
if ((DateTime.UtcNow - _logFileDateTime).TotalMinutes > _appConfig.LOG_FILE_MAX_TTL.TotalMinutes) {
// Update the log file date-time to the current time
_logFileDateTime = DateTime.UtcNow;
// Generate a new log file name based on the updated date and time
returnLogFileName = string.Format($"msft-azure-orbital-{_logFileDateTime:dd-MM-yy-HH.mm.ss}.json");
}

// Log file will exceed the maximum size if we add another log message - cut a new log file
if (File.Exists(Path.Combine(_outputDir, returnLogFileName)) && (new FileInfo(Path.Combine(_outputDir, returnLogFileName)).Length / 1024) > (_appConfig.LOG_FILE_MAX_SIZE_KB * .9)) {
// Check if the current log file exists and if its size exceeds 90% of the maximum allowed size
if (File.Exists(Path.Combine(_outputDir, returnLogFileName)) &&
(new FileInfo(Path.Combine(_outputDir, returnLogFileName)).Length / 1024) > (_appConfig.LOG_FILE_MAX_SIZE_KB * .9)) {
// Update the log file date-time to the current time
_logFileDateTime = DateTime.UtcNow;
// Generate a new log file name based on the updated date and time
returnLogFileName = string.Format($"msft-azure-orbital-{_logFileDateTime:dd-MM-yy-HH.mm.ss}.json");
}

// Return the full path of the log file
return Path.Combine(_outputDir, returnLogFileName);
}

/// <summary>
/// Downlink all but the current log file to Platform MTS
/// </summary>
internal Task DownlinkLogFiles() => Task.Run(async () => {
string currentLogFileName = GetLogFileName();
foreach (string file in Directory.GetFiles(_outputDir)) {
if (file == currentLogFileName) continue; // Don't send the current log file
_logger.LogDebug("Current log file: '{currentLogFileName}'", currentLogFileName);
_logger.LogInformation("Downlinking '{currentFileName}'", file);
var filesToDownlink = Directory.GetFiles(_outputDir).Where(file => file != currentLogFileName);

// Need this deployment does not delete yaml
MessageFormats.HostServices.Link.LinkRequest linkRequest = new() {
foreach (string file in filesToDownlink) {
_logger.LogInformation("Downlinking '{file}'", file);

var linkRequest = new MessageFormats.HostServices.Link.LinkRequest {
DestinationAppId = $"platform-{nameof(MessageFormats.Common.PlatformServices.Mts).ToLower()}",
ExpirationTime = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(DateTime.UtcNow.AddHours(12)),
Subdirectory = "logs",
FileName = System.IO.Path.GetFileName(file),
FileName = Path.GetFileName(file),
LeaveSourceFile = false,
LinkType = MessageFormats.HostServices.Link.LinkRequest.Types.LinkType.Downlink,
Priority = MessageFormats.Common.Priority.Medium,
RequestHeader = new() {
TrackingId = Guid.NewGuid().ToString(),
CorrelationId = Guid.NewGuid().ToString()
RequestHeader = new MessageFormats.Common.RequestHeader {
TrackingId = Guid.NewGuid().ToString()
}
};

linkRequest.RequestHeader.CorrelationId = linkRequest.RequestHeader.TrackingId;

await _client.DirectToApp(appId: $"hostsvc-{nameof(MessageFormats.Common.HostServices.Link).ToLower()}", message: linkRequest);

_logger.LogDebug("Downlink of '{currentFileName}' complete.", file);
_logger.LogDebug("Downlink of '{file}' complete.", file);
}
});


}
}
Loading