From 203bd467e6849825f3823c435f9a91691bb34f0f Mon Sep 17 00:00:00 2001 From: Henry van der Vegte Date: Wed, 18 Sep 2024 16:13:39 +0200 Subject: [PATCH] [IngestionClient] Use dependency injection for BatchClient, add User-Agent request header (#2592) --- .../ingestion-client/Connector/BatchClient.cs | 53 +++++++++++-------- .../FetchTranscription/Config/AppConfig.cs | 2 + .../FetchTranscription/FetchTranscription.cs | 11 +++- .../FetchTranscription/Program.cs | 11 ++++ .../TranscriptionProcessor.cs | 22 ++++---- .../Config/AppConfig.cs | 2 + .../StartTranscriptionByTimer/Program.cs | 11 ++++ .../StartTranscriptionHelper.cs | 6 ++- .../ingestion-client/infra/main.bicep | 4 +- .../ingestion-client/infra/main.json | 10 ++-- 10 files changed, 93 insertions(+), 39 deletions(-) diff --git a/samples/ingestion/ingestion-client/Connector/BatchClient.cs b/samples/ingestion/ingestion-client/Connector/BatchClient.cs index c5791a3e6..4fc471942 100644 --- a/samples/ingestion/ingestion-client/Connector/BatchClient.cs +++ b/samples/ingestion/ingestion-client/Connector/BatchClient.cs @@ -17,7 +17,7 @@ namespace Connector using Polly; using Polly.Retry; - public static class BatchClient + public class BatchClient { private const string TranscriptionsBasePath = "speechtotext/v3.0/Transcriptions/"; @@ -29,36 +29,42 @@ public static class BatchClient private static readonly TimeSpan GetFilesTimeout = TimeSpan.FromMinutes(5); - private static readonly HttpClient HttpClient = new HttpClient() { Timeout = Timeout.InfiniteTimeSpan }; - private static readonly AsyncRetryPolicy RetryPolicy = Policy .Handle(e => e is HttpStatusCodeException || e is HttpRequestException) .WaitAndRetryAsync(MaxNumberOfRetries, retryAttempt => TimeSpan.FromSeconds(5)); - public static Task GetTranscriptionReportFileFromSasAsync(string sasUri) + private readonly HttpClient httpClient; + + public BatchClient(IHttpClientFactory httpClientFactory) + { + ArgumentNullException.ThrowIfNull(httpClientFactory, nameof(httpClientFactory)); + this.httpClient = httpClientFactory.CreateClient(nameof(BatchClient)); + } + + public Task GetTranscriptionReportFileFromSasAsync(string sasUri) { - return GetAsync(sasUri, null, DefaultTimeout); + return this.GetAsync(sasUri, null, DefaultTimeout); } - public static Task GetSpeechTranscriptFromSasAsync(string sasUri) + public Task GetSpeechTranscriptFromSasAsync(string sasUri) { - return GetAsync(sasUri, null, DefaultTimeout); + return this.GetAsync(sasUri, null, DefaultTimeout); } - public static Task GetTranscriptionAsync(string transcriptionLocation, string subscriptionKey) + public Task GetTranscriptionAsync(string transcriptionLocation, string subscriptionKey) { - return GetAsync(transcriptionLocation, subscriptionKey, DefaultTimeout); + return this.GetAsync(transcriptionLocation, subscriptionKey, DefaultTimeout); } - public static async Task GetTranscriptionFilesAsync(string transcriptionLocation, string subscriptionKey) + public async Task GetTranscriptionFilesAsync(string transcriptionLocation, string subscriptionKey) { var path = $"{transcriptionLocation}/files"; var combinedTranscriptionFiles = new List(); do { - var transcriptionFiles = await GetAsync(path, subscriptionKey, GetFilesTimeout).ConfigureAwait(false); + var transcriptionFiles = await this.GetAsync(path, subscriptionKey, GetFilesTimeout).ConfigureAwait(false); combinedTranscriptionFiles.AddRange(transcriptionFiles.Values); path = transcriptionFiles.NextLink; } @@ -67,39 +73,39 @@ public static async Task GetTranscriptionFilesAsync(string t return new TranscriptionFiles(combinedTranscriptionFiles, null); } - public static Task DeleteTranscriptionAsync(string transcriptionLocation, string subscriptionKey) + public Task DeleteTranscriptionAsync(string transcriptionLocation, string subscriptionKey) { - return DeleteAsync(transcriptionLocation, subscriptionKey, DefaultTimeout); + return this.DeleteAsync(transcriptionLocation, subscriptionKey, DefaultTimeout); } - public static async Task PostTranscriptionAsync(TranscriptionDefinition transcriptionDefinition, string hostName, string subscriptionKey) + public async Task PostTranscriptionAsync(TranscriptionDefinition transcriptionDefinition, string hostName, string subscriptionKey) { var path = $"{hostName}{TranscriptionsBasePath}"; var payloadString = JsonConvert.SerializeObject(transcriptionDefinition); - return await PostAsync(path, subscriptionKey, payloadString, PostTimeout).ConfigureAwait(false); + return await this.PostAsync(path, subscriptionKey, payloadString, PostTimeout).ConfigureAwait(false); } - private static async Task PostAsync(string path, string subscriptionKey, string payloadString, TimeSpan timeout) + private async Task PostAsync(string path, string subscriptionKey, string payloadString, TimeSpan timeout) { - var responseMessage = await SendHttpRequestMessage(HttpMethod.Post, path, subscriptionKey, payloadString, timeout).ConfigureAwait(false); + var responseMessage = await this.SendHttpRequestMessage(HttpMethod.Post, path, subscriptionKey, payloadString, timeout).ConfigureAwait(false); return responseMessage.Headers.Location; } - private static async Task DeleteAsync(string path, string subscriptionKey, TimeSpan timeout) + private async Task DeleteAsync(string path, string subscriptionKey, TimeSpan timeout) { - await SendHttpRequestMessage(HttpMethod.Delete, path, subscriptionKey, payload: null, timeout: timeout).ConfigureAwait(false); + await this.SendHttpRequestMessage(HttpMethod.Delete, path, subscriptionKey, payload: null, timeout: timeout).ConfigureAwait(false); } - private static async Task GetAsync(string path, string subscriptionKey, TimeSpan timeout) + private async Task GetAsync(string path, string subscriptionKey, TimeSpan timeout) { - var responseMessage = await SendHttpRequestMessage(HttpMethod.Get, path, subscriptionKey, payload: null, timeout: timeout).ConfigureAwait(false); + var responseMessage = await this.SendHttpRequestMessage(HttpMethod.Get, path, subscriptionKey, payload: null, timeout: timeout).ConfigureAwait(false); var contentString = await responseMessage.Content.ReadAsStringAsync().ConfigureAwait(false); return JsonConvert.DeserializeObject(contentString); } - private static async Task SendHttpRequestMessage(HttpMethod httpMethod, string path, string subscriptionKey, string payload, TimeSpan timeout) + private async Task SendHttpRequestMessage(HttpMethod httpMethod, string path, string subscriptionKey, string payload, TimeSpan timeout) { try { @@ -110,6 +116,7 @@ private static async Task SendHttpRequestMessage(HttpMethod async (token) => { using var httpRequestMessage = new HttpRequestMessage(httpMethod, path); + if (!string.IsNullOrEmpty(subscriptionKey)) { httpRequestMessage.Headers.Add("Ocp-Apim-Subscription-Key", subscriptionKey); @@ -120,7 +127,7 @@ private static async Task SendHttpRequestMessage(HttpMethod httpRequestMessage.Content = new StringContent(payload, Encoding.UTF8, "application/json"); } - var responseMessage = await HttpClient.SendAsync(httpRequestMessage, token).ConfigureAwait(false); + var responseMessage = await this.httpClient.SendAsync(httpRequestMessage, token).ConfigureAwait(false); await responseMessage.EnsureSuccessStatusCodeAsync().ConfigureAwait(false); return responseMessage; diff --git a/samples/ingestion/ingestion-client/FetchTranscription/Config/AppConfig.cs b/samples/ingestion/ingestion-client/FetchTranscription/Config/AppConfig.cs index 1bdea2cfd..d890e414a 100644 --- a/samples/ingestion/ingestion-client/FetchTranscription/Config/AppConfig.cs +++ b/samples/ingestion/ingestion-client/FetchTranscription/Config/AppConfig.cs @@ -104,5 +104,7 @@ public int RetryLimit public bool CreateAudioProcessedContainer { get; set; } public string AudioProcessedContainer { get; set; } + + public string Version { get; set; } } } \ No newline at end of file diff --git a/samples/ingestion/ingestion-client/FetchTranscription/FetchTranscription.cs b/samples/ingestion/ingestion-client/FetchTranscription/FetchTranscription.cs index b48112db2..14dc3bdfb 100644 --- a/samples/ingestion/ingestion-client/FetchTranscription/FetchTranscription.cs +++ b/samples/ingestion/ingestion-client/FetchTranscription/FetchTranscription.cs @@ -27,6 +27,7 @@ public class FetchTranscription private readonly IStorageConnector storageConnector; private readonly IAzureClientFactory serviceBusClientFactory; private readonly ILogger logger; + private readonly BatchClient batchClient; private readonly AppConfig appConfig; /// @@ -36,18 +37,21 @@ public class FetchTranscription /// The FetchTranscription logger. /// Storage Connector dependency /// Azure client factory for service bus clients + /// The client to call the Azure Speech-To-Text batch API /// Environment configuration public FetchTranscription( IServiceProvider serviceProvider, ILogger logger, IStorageConnector storageConnector, IAzureClientFactory serviceBusClientFactory, + BatchClient batchClient, IOptions appConfig) { this.serviceProvider = serviceProvider; this.logger = logger; this.storageConnector = storageConnector; this.serviceBusClientFactory = serviceBusClientFactory; + this.batchClient = batchClient; this.appConfig = appConfig?.Value; } @@ -72,7 +76,12 @@ public async Task Run([ServiceBusTrigger("fetch_transcription_queue", Connection var databaseContext = this.appConfig.UseSqlDatabase ? this.serviceProvider.GetRequiredService() : null; - var transcriptionProcessor = new TranscriptionProcessor(this.storageConnector, this.serviceBusClientFactory, databaseContext, Options.Create(this.appConfig)); + var transcriptionProcessor = new TranscriptionProcessor( + this.storageConnector, + this.serviceBusClientFactory, + databaseContext, + this.batchClient, + Options.Create(this.appConfig)); await transcriptionProcessor.ProcessTranscriptionJobAsync(serviceBusMessage, this.serviceProvider, this.logger).ConfigureAwait(false); } diff --git a/samples/ingestion/ingestion-client/FetchTranscription/Program.cs b/samples/ingestion/ingestion-client/FetchTranscription/Program.cs index 2926ba383..29dc17a02 100644 --- a/samples/ingestion/ingestion-client/FetchTranscription/Program.cs +++ b/samples/ingestion/ingestion-client/FetchTranscription/Program.cs @@ -6,6 +6,7 @@ namespace FetchTranscription { using System.IO; + using System.Threading; using Azure.Storage; using Azure.Storage.Blobs; @@ -70,6 +71,16 @@ public static void Main(string[] args) .WithName(ServiceBusClientName.CompletedTranscriptionServiceBusClient.ToString()); } }); + + services.AddHttpClient(nameof(BatchClient), httpClient => + { + // timeouts are managed by BatchClient directly: + httpClient.Timeout = Timeout.InfiniteTimeSpan; + httpClient.DefaultRequestHeaders.UserAgent.ParseAdd($"Ingestion Client ({config.Version})"); + }); + + services.AddSingleton(); + services.Configure(configuration); }) .Build(); diff --git a/samples/ingestion/ingestion-client/FetchTranscription/TranscriptionProcessor.cs b/samples/ingestion/ingestion-client/FetchTranscription/TranscriptionProcessor.cs index faa4c7f10..0bf30be3a 100644 --- a/samples/ingestion/ingestion-client/FetchTranscription/TranscriptionProcessor.cs +++ b/samples/ingestion/ingestion-client/FetchTranscription/TranscriptionProcessor.cs @@ -39,16 +39,20 @@ public class TranscriptionProcessor private readonly IStorageConnector storageConnector; + private readonly BatchClient batchClient; + private readonly AppConfig appConfig; public TranscriptionProcessor( IStorageConnector storageConnector, IAzureClientFactory serviceBusClientFactory, IngestionClientDbContext databaseContext, + BatchClient batchClient, IOptions appConfig) { this.storageConnector = storageConnector; this.databaseContext = databaseContext; + this.batchClient = batchClient; this.appConfig = appConfig?.Value; ArgumentNullException.ThrowIfNull(serviceBusClientFactory, nameof(serviceBusClientFactory)); @@ -86,7 +90,7 @@ public async Task ProcessTranscriptionJobAsync(TranscriptionStartedMessage servi try { - var transcription = await BatchClient.GetTranscriptionAsync(transcriptionLocation, subscriptionKey).ConfigureAwait(false); + var transcription = await this.batchClient.GetTranscriptionAsync(transcriptionLocation, subscriptionKey).ConfigureAwait(false); log.LogInformation($"Polled {serviceBusMessage.PollingCounter} time(s) for results in total, delay job for {messageDelayTime.TotalMinutes} minutes if not completed."); switch (transcription.Status) { @@ -189,13 +193,13 @@ private async Task ProcessFailedTranscriptionAsync(string transcriptionLocation, log.LogInformation(logMessage); - var transcriptionFiles = await BatchClient.GetTranscriptionFilesAsync(transcriptionLocation, subscriptionKey).ConfigureAwait(false); + var transcriptionFiles = await this.batchClient.GetTranscriptionFilesAsync(transcriptionLocation, subscriptionKey).ConfigureAwait(false); var errorReportOutput = logMessage; var reportFile = transcriptionFiles.Values.Where(t => t.Kind == TranscriptionFileKind.TranscriptionReport).FirstOrDefault(); if (reportFile?.Links?.ContentUrl != null) { - var reportFileContent = await BatchClient.GetTranscriptionReportFileFromSasAsync(reportFile.Links.ContentUrl).ConfigureAwait(false); + var reportFileContent = await this.batchClient.GetTranscriptionReportFileFromSasAsync(reportFile.Links.ContentUrl).ConfigureAwait(false); errorReportOutput += $"\nReport file: \n {JsonConvert.SerializeObject(reportFileContent)}"; } @@ -237,7 +241,7 @@ await this.storageConnector.MoveFileAsync( } } - await BatchClient.DeleteTranscriptionAsync(transcriptionLocation, subscriptionKey).ConfigureAwait(false); + await this.batchClient.DeleteTranscriptionAsync(transcriptionLocation, subscriptionKey).ConfigureAwait(false); } private async Task ProcessReportFileAsync(TranscriptionReportFile transcriptionReportFile, ILogger log) @@ -290,7 +294,7 @@ private async Task RetryOrFailJobAsync(TranscriptionStartedMessage message, stri else { await this.WriteFailedJobLogToStorageAsync(message, errorMessage, jobName, log).ConfigureAwait(false); - await BatchClient.DeleteTranscriptionAsync(transcriptionLocation, subscriptionKey).ConfigureAwait(false); + await this.batchClient.DeleteTranscriptionAsync(transcriptionLocation, subscriptionKey).ConfigureAwait(false); } } @@ -347,7 +351,7 @@ private async Task ProcessSucceededTranscriptionAsync(string transcriptionLocati return; } - var transcriptionFiles = await BatchClient.GetTranscriptionFilesAsync(transcriptionLocation, subscriptionKey).ConfigureAwait(false); + var transcriptionFiles = await this.batchClient.GetTranscriptionFilesAsync(transcriptionLocation, subscriptionKey).ConfigureAwait(false); log.LogInformation($"Received transcription files."); var resultFiles = transcriptionFiles.Values.Where(t => t.Kind == TranscriptionFileKind.Transcription); @@ -360,7 +364,7 @@ private async Task ProcessSucceededTranscriptionAsync(string transcriptionLocati try { - var transcriptionResult = await BatchClient.GetSpeechTranscriptFromSasAsync(resultFile.Links.ContentUrl).ConfigureAwait(false); + var transcriptionResult = await this.batchClient.GetSpeechTranscriptFromSasAsync(resultFile.Links.ContentUrl).ConfigureAwait(false); if (string.IsNullOrEmpty(transcriptionResult.Source)) { @@ -522,10 +526,10 @@ await this.databaseContext.StoreTranscriptionAsync( } var reportFile = transcriptionFiles.Values.Where(t => t.Kind == TranscriptionFileKind.TranscriptionReport).FirstOrDefault(); - var reportFileContent = await BatchClient.GetTranscriptionReportFileFromSasAsync(reportFile.Links.ContentUrl).ConfigureAwait(false); + var reportFileContent = await this.batchClient.GetTranscriptionReportFileFromSasAsync(reportFile.Links.ContentUrl).ConfigureAwait(false); await this.ProcessReportFileAsync(reportFileContent, log).ConfigureAwait(false); - BatchClient.DeleteTranscriptionAsync(transcriptionLocation, subscriptionKey).ConfigureAwait(false).GetAwaiter().GetResult(); + this.batchClient.DeleteTranscriptionAsync(transcriptionLocation, subscriptionKey).ConfigureAwait(false).GetAwaiter().GetResult(); } } } diff --git a/samples/ingestion/ingestion-client/StartTranscriptionByTimer/Config/AppConfig.cs b/samples/ingestion/ingestion-client/StartTranscriptionByTimer/Config/AppConfig.cs index 672e0d552..fcd746d70 100644 --- a/samples/ingestion/ingestion-client/StartTranscriptionByTimer/Config/AppConfig.cs +++ b/samples/ingestion/ingestion-client/StartTranscriptionByTimer/Config/AppConfig.cs @@ -79,5 +79,7 @@ public int InitialPollingDelayInMinutes public string StartTranscriptionServiceBusConnectionString { get; set; } public string StartTranscriptionFunctionTimeInterval { get; set; } + + public string Version { get; set; } } } \ No newline at end of file diff --git a/samples/ingestion/ingestion-client/StartTranscriptionByTimer/Program.cs b/samples/ingestion/ingestion-client/StartTranscriptionByTimer/Program.cs index 71adb21ad..c30f132df 100644 --- a/samples/ingestion/ingestion-client/StartTranscriptionByTimer/Program.cs +++ b/samples/ingestion/ingestion-client/StartTranscriptionByTimer/Program.cs @@ -6,6 +6,7 @@ namespace StartTranscriptionByTimer { using System.IO; + using System.Threading; using Azure.Storage; using Azure.Storage.Blobs; @@ -56,6 +57,16 @@ public static void Main(string[] args) clientBuilder.AddServiceBusClient(config.FetchTranscriptionServiceBusConnectionString) .WithName(ServiceBusClientName.FetchTranscriptionServiceBusClient.ToString()); }); + + services.AddHttpClient(nameof(BatchClient), httpClient => + { + // timeouts are managed by BatchClient directly: + httpClient.Timeout = Timeout.InfiniteTimeSpan; + httpClient.DefaultRequestHeaders.UserAgent.ParseAdd($"Ingestion Client ({config.Version})"); + }); + + services.AddSingleton(); + services.Configure(configuration); }) .Build(); diff --git a/samples/ingestion/ingestion-client/StartTranscriptionByTimer/StartTranscriptionHelper.cs b/samples/ingestion/ingestion-client/StartTranscriptionByTimer/StartTranscriptionHelper.cs index b89da2d3b..15112ada4 100644 --- a/samples/ingestion/ingestion-client/StartTranscriptionByTimer/StartTranscriptionHelper.cs +++ b/samples/ingestion/ingestion-client/StartTranscriptionByTimer/StartTranscriptionHelper.cs @@ -41,16 +41,20 @@ public class StartTranscriptionHelper : IStartTranscriptionHelper private readonly IStorageConnector storageConnector; + private readonly BatchClient batchClient; + private readonly AppConfig appConfig; public StartTranscriptionHelper( ILogger logger, IStorageConnector storageConnector, IAzureClientFactory serviceBusClientFactory, + BatchClient batchClient, IOptions appConfig) { this.logger = logger; this.storageConnector = storageConnector; + this.batchClient = batchClient; this.appConfig = appConfig?.Value; this.locale = this.appConfig.Locale.Split('|')[0].Trim(); @@ -229,7 +233,7 @@ private async Task StartBatchTranscriptionJobAsync(IEnumerable