Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for settlement from the isolated worker extension #38865

Merged
merged 15 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion eng/Packages.Data.props
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,17 @@
<PackageReference Update="Apache.Avro" Version="1.11.0" />
<PackageReference Update="CloudNative.CloudEvents" Version="2.0.0" />
<PackageReference Update="CloudNative.CloudEvents.SystemTextJson" Version="2.0.0" />
<PackageReference Update="Google.Protobuf" Version="3.24.3" />
<PackageReference Update="Grpc.Tools" Version="2.51.0" PrivateAssets="all" />
<PackageReference Update="MessagePack" Version="1.9.11" />
<PackageReference Update="Microsoft.AspNetCore.SignalR.Protocols.MessagePack" Version="1.1.5" />
<PackageReference Update="Microsoft.Azure.SignalR" Version="1.21.6" />
<PackageReference Update="Microsoft.Azure.SignalR.Management" Version="1.21.6" />
<PackageReference Update="Microsoft.Azure.SignalR.Protocols" Version="1.21.6" />
<PackageReference Update="Microsoft.Azure.SignalR.Serverless.Protocols" Version="1.9.0" />
<PackageReference Update="Microsoft.Azure.WebJobs" Version="3.0.37" />
<PackageReference Update="Microsoft.Azure.WebJobs.Sources" Version="3.0.37" />
<PackageReference Update="Microsoft.Azure.WebJobs.Sources" Version="3.0.37" PrivateAssets="All"/>
<PackageReference Update="Microsoft.Azure.WebJobs.Extensions.Rpc" Version="3.0.37" />
<PackageReference Update="Microsoft.Azure.WebJobs.Host.Storage" Version="5.0.0" />
<PackageReference Update="Microsoft.Spatial" Version="7.5.3" />
<PackageReference Update="Newtonsoft.Json" Version="10.0.3" />
Expand Down
10 changes: 5 additions & 5 deletions sdk/core/Azure.Core/src/Shared/MessagingClientDiagnostics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,15 @@ public DiagnosticScope CreateScope(
/// <param name="traceparent">The trace parent of the message.</param>
/// <param name="tracestate">The trace state of the message.</param>
/// <returns><c>true</c> if the message properties contained the diagnostic id; otherwise, <c>false</c>.</returns>
public static bool TryExtractTraceContext(IReadOnlyDictionary<string, object> properties, out string? traceparent, out string? tracestate)
public static bool TryExtractTraceContext(IReadOnlyDictionary<string, object?> properties, out string? traceparent, out string? tracestate)
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
traceparent = null;
tracestate = null;

if (ActivityExtensions.SupportsActivitySource && properties.TryGetValue(TraceParent, out var traceParent) && traceParent is string traceParentString)
{
traceparent = traceParentString;
if (properties.TryGetValue(TraceState, out object state) && state is string stateString)
if (properties.TryGetValue(TraceState, out object? state) && state is string stateString)
{
tracestate = stateString;
}
Expand All @@ -126,15 +126,15 @@ public static bool TryExtractTraceContext(IReadOnlyDictionary<string, object> pr
/// <param name="traceparent">The trace parent of the message.</param>
/// <param name="tracestate">The trace state of the message.</param>
/// <returns><c>true</c> if the message properties contained the diagnostic id; otherwise, <c>false</c>.</returns>
public static bool TryExtractTraceContext(IDictionary<string, object> properties, out string? traceparent, out string? tracestate)
public static bool TryExtractTraceContext(IDictionary<string, object?> properties, out string? traceparent, out string? tracestate)
{
traceparent = null;
tracestate = null;

if (ActivityExtensions.SupportsActivitySource && properties.TryGetValue(TraceParent, out var traceParent) && traceParent is string traceParentString)
{
traceparent = traceParentString;
if (properties.TryGetValue(TraceState, out object state) && state is string stateString)
if (properties.TryGetValue(TraceState, out object? state) && state is string stateString)
{
tracestate = stateString;
}
Expand All @@ -158,7 +158,7 @@ public static bool TryExtractTraceContext(IDictionary<string, object> properties
/// <param name="activityName">The activity name to use for the diagnostic scope.</param>
/// <param name="traceparent">The traceparent that was either added, or that already existed in the message properties.</param>
/// <param name="tracestate">The tracestate that was either added, or that already existed in the message properties.</param>
public void InstrumentMessage(IDictionary<string, object> properties, string activityName, out string? traceparent, out string? tracestate)
public void InstrumentMessage(IDictionary<string, object?> properties, string activityName, out string? traceparent, out string? tracestate)
{
traceparent = null;
tracestate = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public bool Equals(MessagingDiagnosticOperation other)
return _operation == other._operation;
}

public override bool Equals(object obj)
public override bool Equals(object? obj)
{
return obj is MessagingDiagnosticOperation other && Equals(other);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
using System.Linq;
using System.Net;
using Microsoft.Azure.WebJobs;
#if NET6_0_OR_GREATER
using Microsoft.Azure.WebJobs.Extensions.Rpc;
using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Grpc;
#endif
using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config;
using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Listeners;
using Microsoft.Azure.WebJobs.Host.Scale;
Expand All @@ -27,7 +31,6 @@ public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder)
}

builder.AddServiceBus(p => { });

return builder;
}

Expand Down Expand Up @@ -99,11 +102,18 @@ public static IWebJobsBuilder AddServiceBus(this IWebJobsBuilder builder, Action
}

configure(options);
});
})
#if NET6_0_OR_GREATER
.MapWorkerGrpcService<SettlementService>()
#endif
;

builder.Services.AddAzureClientsCore();
builder.Services.TryAddSingleton<MessagingProvider>();
builder.Services.AddSingleton<ServiceBusClientFactory>();
#if NET6_0_OR_GREATER
builder.Services.AddSingleton<SettlementService>();
#endif
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

#if NET6_0_OR_GREATER
using Microsoft.Azure.ServiceBus.Grpc;

namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Grpc
{
internal static class SettlementExtensions
{
internal static object GetPropertyValue(this SettlementProperties properties)
{
return properties.ValuesCase switch
{
SettlementProperties.ValuesOneofCase.LongValue => properties.LongValue,
SettlementProperties.ValuesOneofCase.UlongValue => properties.UlongValue,
SettlementProperties.ValuesOneofCase.DoubleValue => properties.DoubleValue,
SettlementProperties.ValuesOneofCase.FloatValue => properties.FloatValue,
SettlementProperties.ValuesOneofCase.IntValue => properties.IntValue,
SettlementProperties.ValuesOneofCase.UintValue => properties.UintValue,
SettlementProperties.ValuesOneofCase.BoolValue => properties.BoolValue,
SettlementProperties.ValuesOneofCase.StringValue => properties.StringValue,
_ => null
};
}
}
}
#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
syntax = "proto3";

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

// this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic
option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc";

// The settlement service definition.
service Settlement {
// Completes a message
rpc Complete (CompleteRequest) returns (google.protobuf.Empty) {}

// Abandons a message
rpc Abandon (AbandonRequest) returns (google.protobuf.Empty) {}

// Deadletters a message
rpc Deadletter (DeadletterRequest) returns (google.protobuf.Empty) {}

// Defers a message
rpc Defer (DeferRequest) returns (google.protobuf.Empty) {}
}

// The complete message request containing the locktoken.
message CompleteRequest {
string locktoken = 1;
}

// The abandon message request containing the locktoken and properties to modify.
message AbandonRequest {
string locktoken = 1;
map<string, SettlementProperties> propertiesToModify = 2;
}

// The deadletter message request containing the locktoken and properties to modify along with the reason/description.
message DeadletterRequest {
string locktoken = 1;
map<string, SettlementProperties> propertiesToModify = 2;
string deadletterReason = 3;
string deadletterErrorDescription = 4;
}

// The defer message request containing the locktoken and properties to modify.
message DeferRequest {
string locktoken = 1;
map<string, SettlementProperties> propertiesToModify = 2;
}

// The settlement property can be of any type listed below, which
// corresponds to the types specified in
// https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.servicebus.servicebusmessage.applicationproperties?view=azure-dotnet#remarks
// Note: this list doesn't match 1:1 with the supported Service Bus types, so compatible types are used in some cases - e.g.
// short uses int, TimeSpan uses string, etc. The full list of transforms can be found in the isolated worker extension source code.
message SettlementProperties {
oneof values {
string stringValue = 1;
int32 intValue = 2;
uint32 uintValue = 3;
int64 longValue = 4;
uint64 ulongValue = 5;
bool boolValue = 6;
float floatValue = 7;
double doubleValue = 8;
google.protobuf.Timestamp timestampValue = 9;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

#if NET6_0_OR_GREATER
using System;
using System.Linq;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Azure.ServiceBus.Grpc;
using Microsoft.Azure.WebJobs.ServiceBus;

namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Grpc
{
internal class SettlementService : Settlement.SettlementBase
{
private readonly MessagingProvider _provider;

public SettlementService(MessagingProvider provider)
{
_provider = provider;
}

public SettlementService()
{
_provider = null;
}

public override async Task<Empty> Complete(CompleteRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
await tuple.Actions.CompleteMessageAsync(
tuple.Message,
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

public override async Task<Empty> Abandon(AbandonRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
await tuple.Actions.AbandonMessageAsync(
tuple.Message,
request.PropertiesToModify.ToDictionary(
pair => pair.Key,
pair => pair.Value.GetPropertyValue()),
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

public override async Task<Empty> Defer(DeferRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
await tuple.Actions.DeferMessageAsync(
tuple.Message,
request.PropertiesToModify.ToDictionary(
pair => pair.Key,
pair => pair.Value.GetPropertyValue()),
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}

public override async Task<Empty> Deadletter(DeadletterRequest request, ServerCallContext context)
{
if (_provider.ActionsCache.TryGetValue(request.Locktoken, out var tuple))
{
await tuple.Actions.DeadLetterMessageAsync(
tuple.Message,
request.PropertiesToModify.ToDictionary(
pair => pair.Key,
pair => pair.Value.GetPropertyValue()),
request.DeadletterReason,
request.DeadletterErrorDescription,
context.CancellationToken).ConfigureAwait(false);
return new Empty();
}
throw new RpcException (new Status(StatusCode.FailedPrecondition, $"LockToken {request.Locktoken} not found."));
}
}
}
#endif
Loading