From c61c861b0ac7fb7f0222f1448038c747342bd014 Mon Sep 17 00:00:00 2001 From: Marc Schier Date: Thu, 5 Sep 2024 15:44:57 +0200 Subject: [PATCH] Split subscriptions per max items per subscription limits (#2336) * Re-use subscription when single subscriber * Split subscription * #2332 * #2333 * #2337 --- docs/opc-publisher/commandline.md | 8 +- docs/opc-publisher/readme.md | 14 +- .../src/Runtime/CommandLine.cs | 3 + .../AdvancedPubSubIntegrationTests.cs | 11 +- .../BasicPubSubIntegrationTests.cs | 10 +- .../BasicSamplesIntegrationTests.cs | 19 +- .../Stack/Runtime/OpcUaSubscriptionConfig.cs | 3 + .../Stack/Runtime/OpcUaSubscriptionOptions.cs | 6 + .../Services/OpcUaClient.Subscription.cs | 186 ++- .../src/Stack/Services/OpcUaClient.cs | 68 +- .../src/Stack/Services/OpcUaClientManager.cs | 30 +- .../src/Stack/Services/OpcUaSession.cs | 6 +- .../src/Stack/Services/OpcUaSubscription.cs | 1376 ++++++++++------- 13 files changed, 1096 insertions(+), 644 deletions(-) diff --git a/docs/opc-publisher/commandline.md b/docs/opc-publisher/commandline.md index 7b593833cd..c1b8e8ab2c 100644 --- a/docs/opc-publisher/commandline.md +++ b/docs/opc-publisher/commandline.md @@ -261,7 +261,7 @@ Messaging configuration subscription setup. Set to `0` to block until metadata is loaded. Only used if meta data is supported and enabled. - Default: `not set` (block forever). + Default: `5000` milliseconds. --ps, --publishschemas, --PublishMessageSchema[=VALUE] Publish the Avro or Json message schemas to schema registry or subtopics. @@ -693,6 +693,12 @@ Subscription settings client reads instead of subscriptions services, unless otherwise configured. Default: `false`. + --xmi, --maxmonitoreditems, --MaxMonitoredItemPerSubscription=VALUE + Max monitored items per subscription until the + subscription is split. + This is used if the server does not provide + limits in its server capabilities. + Default: `not set`. --da, --deferredacks, --UseDeferredAcknoledgements[=VALUE] (Experimental) Acknoledge subscription notifications only when the data has been diff --git a/docs/opc-publisher/readme.md b/docs/opc-publisher/readme.md index c1a400b592..10539758b7 100644 --- a/docs/opc-publisher/readme.md +++ b/docs/opc-publisher/readme.md @@ -714,11 +714,19 @@ The OPC UA subscription/monitored items service due to its async model (server s ### Overcoming server limits and interop limitations -OPC UA servers can limit the amount of sessions, subscriptions or publishing requests allowed at any point in time. OPC Publisher has several options to overcome these limits that can be used to tune and overcome interoperability issues. +OPC UA servers can be limited with regards to the amount of sessions, subscriptions or publishing requests they support. By default OPC Publisher tries to bundle as many writers with the same subscription configuration (including the publishing interval) into a OPC UA subscription inside a (writer group) session. It uses the `MaxMonitoredItemsPerSubscription` limit provided by in the Server capabilities object read by OPC Publisher when the session is created to create the right number of subscriptions that hold as many monitored items as possible. If the limit a not provided by the server or is 0, OPC Publisher uses a default value of `65536`. This value can be overridden using the `--xmi --maxmonitoreditems` [command line](./commandline.md) option. -- To minimize the number of sessions against a server, the default behavior of creating a session per writer group can be overridden using the `--dsg, --disablesessionpergroup` [command line](./commandline.md) option which results in a *session per endpoint* spanning multiple writer groups with the same endpoint url and configuration. +OPC Publisher has several options to overcome additional server limitations and that can be used to tune and overcome interoperability issues. -- To further limit the number of subscriptions avoid specifying different publishing intervals for items as each publishing interval will result in its own subscription. You can use the `--ipi, --ignorepublishingintervals` command line option to *ignore publishing interval configuration* in the JSON configuration and use the publishing interval configured using the `--op` command line option (default: 1 second). In addition you can set the `--op=0` to let the server decide the smallest publishing interval it offers. You can also use the `--aq, --autosetqueuesize` option to let OPC Publisher calculate the best queue size for monitored items in the subscription to limit data loss. Note that the `--npd` command line option (default 1000) will still split the data set writer into multiple subscriptions if more nodes than the configured amount are specified. +- To minimize the number of sessions against a server, the default behavior of creating a session per writer group can be overridden using the `--dsg, --disablesessionpergroup` command line option which results in a *session per endpoint* spanning multiple writer groups with the same endpoint url and configuration. + +- To further limit the number of subscriptions avoid specifying different publishing intervals for the `OpcNodes` items in the OPC Publisher [configuration](#configuration-schema). Each publishing interval will result in a subscription with the server inside the (writer group) session. + + - You can use the `--ipi, --ignorepublishingintervals` command line option to *ignore publishing interval configuration* in the JSON configuration and use the publishing interval configured using the `--op` command line option (default: 1 second). + + - In addition you can set the `--op=0` to let the server decide the smallest publishing interval it offers. + + - You can also use the `--aq, --autosetqueuesize` option to let OPC Publisher calculate the best queue size for monitored items in the subscription to limit data loss. - By default OPC Publisher tries to dispatch as many publishing requests to a server session as there are subscriptions in the session up to a maximum of `10`. The OPC UA stack tries to gradually lower the number based on feedback from the server (`BadTooManyPublishRequests`). This behavior is not tolerated by some servers. To set a lower maximum that OPC Publisher should never exceed use the `--xpr` command line option. diff --git a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/CommandLine.cs b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/CommandLine.cs index 3cab429799..1b4227728b 100644 --- a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/CommandLine.cs +++ b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/CommandLine.cs @@ -336,6 +336,9 @@ public CommandLine(string[] args, CommandLineLogger? logger = null) { $"ucr|usecyclicreads:|{OpcUaSubscriptionConfig.DefaultSamplingUsingCyclicReadKey}:", "All nodes should be sampled using periodical client reads instead of subscriptions services, unless otherwise configured.\nDefault: `false`.\n", (bool? b) => this[OpcUaSubscriptionConfig.DefaultSamplingUsingCyclicReadKey] = b?.ToString() ?? "True" }, + { $"xmi|maxmonitoreditems=|{OpcUaSubscriptionConfig.MaxMonitoredItemPerSubscriptionKey}=", + "Max monitored items per subscription until the subscription is split.\nThis is used if the server does not provide limits in its server capabilities.\nDefault: `not set`.\n", + (uint u) => this[OpcUaSubscriptionConfig.MaxMonitoredItemPerSubscriptionKey] = u.ToString(CultureInfo.CurrentCulture) }, { $"da|deferredacks:|{OpcUaSubscriptionConfig.UseDeferredAcknoledgementsKey}:", "(Experimental) Acknoledge subscription notifications only when the data has been successfully published.\nDefault: `false`.\n", (bool? b) => this[OpcUaSubscriptionConfig.UseDeferredAcknoledgementsKey] = b?.ToString() ?? "True" }, diff --git a/src/Azure.IIoT.OpcUa.Publisher.Module/tests/Sdk/ReferenceServer/AdvancedPubSubIntegrationTests.cs b/src/Azure.IIoT.OpcUa.Publisher.Module/tests/Sdk/ReferenceServer/AdvancedPubSubIntegrationTests.cs index 3c477c059b..443fb7bca2 100644 --- a/src/Azure.IIoT.OpcUa.Publisher.Module/tests/Sdk/ReferenceServer/AdvancedPubSubIntegrationTests.cs +++ b/src/Azure.IIoT.OpcUa.Publisher.Module/tests/Sdk/ReferenceServer/AdvancedPubSubIntegrationTests.cs @@ -156,9 +156,12 @@ public async Task SwitchServerWithDifferentWriterGroupTest() } [Theory] - [InlineData(false)] - [InlineData(true)] - public async Task AddNodeToDataSetWriterGroupWithNodeUsingDeviceMethod(bool differentPublishingInterval) + [InlineData(false, 100)] + [InlineData(true, 100)] + [InlineData(false, 1)] + [InlineData(true, 1)] + public async Task AddNodeToDataSetWriterGroupWithNodeUsingDeviceMethod(bool differentPublishingInterval, + int maxMonitoredItems) { var server = new ReferenceServer(); EndpointUrl = server.EndpointUrl; @@ -170,7 +173,7 @@ public async Task AddNodeToDataSetWriterGroupWithNodeUsingDeviceMethod(bool diff // Set both to the same so that there is a single writer instead of 2 testInput2[0].OpcNodes[0].OpcPublishingInterval = testInput1[0].OpcNodes[0].OpcPublishingInterval; } - StartPublisher(name, arguments: new string[] { "--mm=PubSub", "--dm=false" }); + StartPublisher(name, arguments: new string[] { "--mm=PubSub", "--dm=false", "--xmi=" + maxMonitoredItems }); try { var endpoints = await PublisherApi.GetConfiguredEndpointsAsync(); diff --git a/src/Azure.IIoT.OpcUa.Publisher.Module/tests/Sdk/ReferenceServer/BasicPubSubIntegrationTests.cs b/src/Azure.IIoT.OpcUa.Publisher.Module/tests/Sdk/ReferenceServer/BasicPubSubIntegrationTests.cs index 0585625875..b469b547c5 100644 --- a/src/Azure.IIoT.OpcUa.Publisher.Module/tests/Sdk/ReferenceServer/BasicPubSubIntegrationTests.cs +++ b/src/Azure.IIoT.OpcUa.Publisher.Module/tests/Sdk/ReferenceServer/BasicPubSubIntegrationTests.cs @@ -585,14 +585,16 @@ public async Task PeriodicHeartbeatTest() #endif } - [Fact] - public async Task CanSendKeyFramesToIoTHubTest() + [Theory] + [InlineData(100)] + [InlineData(1)] + public async Task CanSendKeyFramesToIoTHubTest(int maxMonitoredItems) { // Arrange // Act var (metadata, messages) = await ProcessMessagesAndMetadataAsync( - nameof(CanSendDataItemToIoTHubTest), "./Resources/KeyFrames.json", TimeSpan.FromMinutes(2), 11, - messageType: "ua-data", arguments: new[] { "--dm=false" }); + nameof(CanSendKeyFramesToIoTHubTest), "./Resources/KeyFrames.json", TimeSpan.FromMinutes(2), 11, + messageType: "ua-data", arguments: new[] { "--dm=false", "--xmi=" + maxMonitoredItems }); // Assert var allDataSetMessages = messages.Select(m => m.Message.GetProperty("Messages")).SelectMany(m => m.EnumerateArray()).ToList(); diff --git a/src/Azure.IIoT.OpcUa.Publisher.Module/tests/Sdk/ReferenceServer/BasicSamplesIntegrationTests.cs b/src/Azure.IIoT.OpcUa.Publisher.Module/tests/Sdk/ReferenceServer/BasicSamplesIntegrationTests.cs index 08b4d690d6..b01a949c90 100644 --- a/src/Azure.IIoT.OpcUa.Publisher.Module/tests/Sdk/ReferenceServer/BasicSamplesIntegrationTests.cs +++ b/src/Azure.IIoT.OpcUa.Publisher.Module/tests/Sdk/ReferenceServer/BasicSamplesIntegrationTests.cs @@ -427,14 +427,16 @@ public async Task CanSendPendingConditionsToIoTHubTestWithDeviceMethod() } } - [Fact] - public async Task CanSendDataItemToIoTHubTestWithDeviceMethod2() + [Theory] + [InlineData(100)] + [InlineData(1)] + public async Task CanSendDataItemToIoTHubTestWithDeviceMethod2(int maxMonitoredItems) { const string name = nameof(CanSendDataItemToIoTHubTestWithDeviceMethod2); var testInput1 = GetEndpointsFromFile(name, "./Resources/DataItems.json"); var testInput2 = GetEndpointsFromFile(name, "./Resources/SimpleEvents.json"); var testInput3 = GetEndpointsFromFile(name, "./Resources/PendingAlarms.json"); - StartPublisher(name); + StartPublisher(name, arguments: new[] { "--xmi=" + maxMonitoredItems }); try { var endpoints = await PublisherApi.GetConfiguredEndpointsAsync(); @@ -453,8 +455,9 @@ public async Task CanSendDataItemToIoTHubTestWithDeviceMethod2() endpoints = await PublisherApi.GetConfiguredEndpointsAsync(); Assert.Empty(endpoints.Endpoints); - await PublisherApi.AddOrUpdateEndpointsAsync(new List { - new() + await PublisherApi.AddOrUpdateEndpointsAsync(new List + { + new () { OpcNodes = nodes.OpcNodes.ToList(), EndpointUrl = e.EndpointUrl, @@ -468,6 +471,12 @@ await PublisherApi.AddOrUpdateEndpointsAsync(new List nodes = await PublisherApi.GetConfiguredNodesOnEndpointAsync(e); Assert.Equal(3, nodes.OpcNodes.Count); + var messages1 = await WaitForMessagesAsync(GetDataFrame); + var message1 = Assert.Single(messages1).Message; + Assert.Equal("ns=23;i=1259", message1.GetProperty("NodeId").GetString()); + Assert.InRange(message1.GetProperty("Value").GetProperty("Value").GetDouble(), + double.MinValue, double.MaxValue); + _output.WriteLine("Removing items..."); await PublisherApi.UnpublishNodesAsync(testInput3[0]); nodes = await PublisherApi.GetConfiguredNodesOnEndpointAsync(e); diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Runtime/OpcUaSubscriptionConfig.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Runtime/OpcUaSubscriptionConfig.cs index 252171c450..3df62e921e 100644 --- a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Runtime/OpcUaSubscriptionConfig.cs +++ b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Runtime/OpcUaSubscriptionConfig.cs @@ -34,6 +34,7 @@ public sealed class OpcUaSubscriptionConfig : PostConfigureOptionBase public TimeSpan? DefaultPublishingInterval { get; set; } + /// + /// Allow max monitored item per subscription. If the server + /// supports less, this value takes no effect. + /// + public uint? MaxMonitoredItemPerSubscription { get; set; } + /// /// Default subscription keep alive counter /// diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.Subscription.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.Subscription.cs index 34671cc5d0..46d987fb3c 100644 --- a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.Subscription.cs +++ b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.Subscription.cs @@ -18,6 +18,7 @@ namespace Azure.IIoT.OpcUa.Publisher.Stack.Services using System.Linq; using System.Threading; using System.Threading.Tasks; + using Opc.Ua.Client; internal sealed partial class OpcUaClient { @@ -37,31 +38,42 @@ internal async ValueTask RegisterAsync( try { await _subscriptionLock.WaitAsync(ct).ConfigureAwait(false); + AddRef(); try { + OpcUaSubscription? existingSub = null; + // - // If callback is registered with a different subscription - // dispose first and release the reference count to the client. - // - // TODO: Here we want to check if there is only one subscriber - // for the subscription. If there is - we want to update the - // subscription (safely) with the new template configuration. - // Essentially original behavior before 2.9.12. + // If subscriber is registered with a different subscription we either + // update the subscription or dispose the old one and create a new one. // if (_registrations.TryGetValue(subscriber, out var existing) && existing.Subscription != subscription) { - existing.RemoveFromRegistration(); + existing.RemoveAndReleaseNoLockInternal(); Debug.Assert(!_registrations.ContainsKey(subscriber)); + + // + // We check if there are any other subscribers registered with the + // same subscription configuration that we want to apply. If there + // arenot - we update the subscription (safely) with the new + // desired template configuration. Essentially original behavior + // before 2.9.12. + // + if ((!_s2r.TryGetValue(subscription, out var c) || c.Count == 0) && + TryGetSubscription(existing.Subscription, out existingSub)) + { + existingSub.Update(subscription); + } } var registration = new Registration(this, subscription, subscriber); - _registrations.Add(subscriber, registration); - TriggerSubscriptionSynchronization(null); + TriggerSubscriptionSynchronization(existingSub); return registration; } finally { + Release(); _subscriptionLock.Release(); } } @@ -72,23 +84,22 @@ internal async ValueTask RegisterAsync( } /// - /// Called by subscription to obtain the monitored items that - /// should be part of itself. This is called under the subscription - /// lock from the management thread so no need to lock here. + /// Get subscribers for a subscription template to get at the monitored + /// items that should be created in the subscription or subscriptions. + /// Called under the subscription lock as a result of synchronization. /// /// /// - internal IEnumerable<(ISubscriber, BaseMonitoredItemModel)> GetItems( - SubscriptionModel template) + internal IEnumerable GetSubscribers(SubscriptionModel template) { Debug.Assert(_subscriptionLock.CurrentCount == 0, "Must be locked"); - // Consider having an index for template to subscribers - // This will be needed anyway as we must support partitioning + if (_s2r.TryGetValue(template, out var registrations)) + { + return registrations.Select(r => r.Owner); + } - return _registrations - .Where(s => s.Value.Subscription == template) - .SelectMany(s => s.Key.MonitoredItems.Select(i => (s.Key, i))); + return Enumerable.Empty(); } /// @@ -138,8 +149,8 @@ private bool TryGetSubscription(SubscriptionModel template, { return true; } - subscription = _session?.SubscriptionHandles - .Find(s => s.Template == template); + subscription = _session?.SubscriptionHandles.Values + .FirstOrDefault(s => s.IsRoot && s.Template == template); return subscription != null; } @@ -154,10 +165,20 @@ private bool TryGetSubscription(SubscriptionModel template, internal async Task SyncAsync(OpcUaSubscription subscription, CancellationToken ct = default) { + var session = _session; + if (session == null) + { + return; + } + await _subscriptionLock.WaitAsync(ct).ConfigureAwait(false); try { - await subscription.SyncAsync(ct).ConfigureAwait(false); + // Get the max item per subscription as well as max + var caps = await session.GetServerCapabilitiesAsync( + NamespaceFormat.Uri, ct).ConfigureAwait(false); + await subscription.SyncAsync(caps.MaxMonitoredItemsPerSubscription, + caps.OperationLimits, ct).ConfigureAwait(false); } catch (Exception ex) { @@ -189,14 +210,24 @@ internal async Task SyncAsync(CancellationToken ct = default) var removals = 0; var additions = 0; var updates = 0; - var existing = session.SubscriptionHandles.ToDictionary(k => k.Template); - _logger.LogInformation( + var existing = session.SubscriptionHandles + .Where(s => s.Value.IsRoot) + .ToDictionary(k => k.Value.Template, k => k.Value); + + _logger.LogDebug( "{Client}: Perform synchronization of subscriptions (total: {Total})", this, session.SubscriptionHandles.Count); await EnsureSessionIsReadyForSubscriptionsAsync(session, ct).ConfigureAwait(false); + + // Get the max item per subscription as well as max + var caps = await session.GetServerCapabilitiesAsync( + NamespaceFormat.Uri, ct).ConfigureAwait(false); + var maxMonitoredItems = caps.MaxMonitoredItemsPerSubscription; + var limits = caps.OperationLimits; + // // Take the subscription lock here! - we hold it all the way until we // have updated all subscription states. The subscriptions will access @@ -209,19 +240,23 @@ await EnsureSessionIsReadyForSubscriptionsAsync(session, await _subscriptionLock.WaitAsync(ct).ConfigureAwait(false); try { - var registered = _registrations - .GroupBy(v => v.Value.Subscription) - .ToDictionary(kv => kv.Key, kv => kv.ToList()); + var s2r = _s2r.ToDictionary(kv => kv.Key, kv => kv.Value.ToList()); // Close and remove items that have no subscribers await Task.WhenAll(existing.Keys - .Except(registered.Keys) + .Except(s2r.Keys) .Select(k => existing[k]) .Select(async close => { try { _cache.TryRemove(close.Template, out _); + if (_s2r.TryRemove(close.Template, out var r)) + { + Debug.Assert(r.Count == 0, + $"count of registrations {r.Count} > 0"); + } + // Removes the item from the session and dispose await close.DisposeAsync().ConfigureAwait(false); @@ -239,7 +274,7 @@ await Task.WhenAll(existing.Keys })).ConfigureAwait(false); // Add new subscription for items with subscribers - await Task.WhenAll(registered.Keys + await Task.WhenAll(s2r.Keys .Except(existing.Keys) .Select(async add => { @@ -247,7 +282,7 @@ await Task.WhenAll(registered.Keys { // // Create a new subscription with the subscription - // configuration template that as yet has no + // configuration template that as of yet has no // representation and add it to the session. // #pragma warning disable CA2000 // Dispose objects before losing scope @@ -255,18 +290,19 @@ await Task.WhenAll(registered.Keys add, _subscriptionOptions, CreateSessionTimeout, _loggerFactory, new OpcUaClientTagList(_connection, _metrics), - _timeProvider); + null, _timeProvider); #pragma warning restore CA2000 // Dispose objects before losing scope // Add the subscription to the session session.AddSubscription(subscription); // Sync the subscription which will get it to go live. - await subscription.SyncAsync(ct).ConfigureAwait(false); + await subscription.SyncAsync(maxMonitoredItems, + caps.OperationLimits, ct).ConfigureAwait(false); Interlocked.Increment(ref additions); Debug.Assert(session == subscription.Session); - registered[add].ForEach(r => r.Value.Dirty = false); + s2r[add].ForEach(r => r.Dirty = false); } catch (OperationCanceledException) { } catch (Exception ex) @@ -278,17 +314,18 @@ await Task.WhenAll(registered.Keys })).ConfigureAwait(false); // Update any items where subscriber signalled the item was updated - await Task.WhenAll(registered.Keys.Intersect(existing.Keys) - .Where(u => registered[u].Any(b => b.Value.Dirty)) + await Task.WhenAll(s2r.Keys.Intersect(existing.Keys) + .Where(u => s2r[u].Any(b => b.Dirty)) .Select(async update => { try { var subscription = existing[update]; - await subscription.SyncAsync(ct).ConfigureAwait(false); + await subscription.SyncAsync(maxMonitoredItems, + caps.OperationLimits, ct).ConfigureAwait(false); Interlocked.Increment(ref updates); Debug.Assert(session == subscription.Session); - registered[update].ForEach(r => r.Value.Dirty = false); + s2r[update].ForEach(r => r.Dirty = false); } catch (OperationCanceledException) { } catch (Exception ex) @@ -317,8 +354,8 @@ await Task.WhenAll(registered.Keys.Intersect(existing.Keys) return; } - _logger.LogInformation("{Client}: Removed {Removals}, added {Additions}, and " + - "updated {Updates} subscriptions (total: {Total}) took {Duration}ms.", + _logger.LogInformation("{Client}: Removed {Removals}, added {Additions}, " + + "and updated {Updates} subscriptions (total: {Total}) took {Duration} ms.", this, removals, additions, updates, session.SubscriptionHandles.Count, sw.ElapsedMilliseconds); } @@ -363,10 +400,15 @@ private sealed record Registration : ISubscription, ISubscriptionDiagnostics /// public SubscriptionModel Subscription { get; } + /// + /// Monitored items on the subscriber + /// + internal ISubscriber Owner { get; } + /// /// Mark the registration as dirty /// - public bool Dirty { get; internal set; } + internal bool Dirty { get; set; } /// public IOpcUaClientDiagnostics ClientDiagnostics => _outer; @@ -377,23 +419,23 @@ private sealed record Registration : ISubscription, ISubscriptionDiagnostics /// public int GoodMonitoredItems => _outer.TryGetSubscription(Subscription, out var subscription) - ? subscription.GetGoodMonitoredItems(_owner) : 0; + ? subscription.GetGoodMonitoredItems(Owner) : 0; /// public int BadMonitoredItems => _outer.TryGetSubscription(Subscription, out var subscription) - ? subscription.GetBadMonitoredItems(_owner) : 0; + ? subscription.GetBadMonitoredItems(Owner) : 0; /// public int LateMonitoredItems => _outer.TryGetSubscription(Subscription, out var subscription) - ? subscription.GetLateMonitoredItems(_owner) : 0; + ? subscription.GetLateMonitoredItems(Owner) : 0; /// public int HeartbeatsEnabled => _outer.TryGetSubscription(Subscription, out var subscription) - ? subscription.GetHeartbeatsEnabled(_owner) : 0; + ? subscription.GetHeartbeatsEnabled(Owner) : 0; /// public int ConditionsEnabled => _outer.TryGetSubscription(Subscription, out var subscription) - ? subscription.GetConditionsEnabled(_owner) : 0; + ? subscription.GetConditionsEnabled(Owner) : 0; /// /// Create subscription @@ -405,10 +447,11 @@ public Registration(OpcUaClient outer, SubscriptionModel subscription, ISubscriber owner) { Subscription = subscription; - _owner = owner; + Owner = owner; _outer = outer; _outer.AddRef(); + AddNoLockInternal(); } /// @@ -429,8 +472,7 @@ public async ValueTask DisposeAsync() await _outer._subscriptionLock.WaitAsync().ConfigureAwait(false); try { - RemoveFromRegistration(); - + RemoveNoLockInternal(); _outer.TriggerSubscriptionSynchronization(null); } finally @@ -440,11 +482,6 @@ public async ValueTask DisposeAsync() } } - public void RemoveFromRegistration() - { - _outer._registrations.Remove(_owner); - } - /// public OpcUaSubscriptionNotification? CreateKeepAlive() { @@ -478,15 +515,52 @@ public async ValueTask CollectMetaDataAsync( dataSetMetaData, minorVersion, ct).ConfigureAwait(false); } + /// + /// Remove registration and release reference but not under + /// lock (like user of the registration handle) and without + /// triggering an update. + /// + internal void RemoveAndReleaseNoLockInternal() + { + RemoveNoLockInternal(); + _outer.Release(); + } + + private void AddNoLockInternal() + { + _outer._registrations.Add(Owner, this); + _outer._s2r.AddOrUpdate(Subscription, _ + => new List { this }, + (_, c) => + { + c.Add(this); + return c; + }); + } + + private void RemoveNoLockInternal() + { + _outer._s2r.AddOrUpdate(Subscription, _ => + { + Debug.Fail("Unexpected"); + return new List(); + }, (_, c) => + { + c.Remove(this); + return c; + }); + _outer._registrations.Remove(Owner); + } + private readonly OpcUaClient _outer; - private readonly ISubscriber _owner; } #pragma warning disable CA2213 // Disposable fields should be disposed private readonly SemaphoreSlim _subscriptionLock = new(1, 1); #pragma warning restore CA2213 // Disposable fields should be disposed private readonly Dictionary _registrations = new(); - private readonly IOptions _subscriptionOptions; + private readonly ConcurrentDictionary> _s2r = new(); private readonly ConcurrentDictionary _cache = new(); + private readonly IOptions _subscriptionOptions; } } diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.cs index bb04c51a2f..3ea2db3511 100644 --- a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.cs +++ b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.cs @@ -819,13 +819,11 @@ internal void Release(string? token = null) private async Task ManageSessionStateMachineAsync(CancellationToken ct) { var currentSessionState = SessionState.Disconnected; - IReadOnlyList currentSubscriptions; var reconnectPeriod = 0; var reconnectTimer = _timeProvider.CreateTimer( _ => TriggerConnectionEvent(ConnectionEvent.ConnectRetry), null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); - currentSubscriptions = Array.Empty(); try { await using (reconnectTimer.ConfigureAwait(false)) @@ -896,7 +894,7 @@ private async Task ManageSessionStateMachineAsync(CancellationToken ct) _disconnectLock = null; currentSessionState = SessionState.Connected; - currentSubscriptions.ForEach(h => h.NotifySessionConnectionState(false)); + NotifySubscriptions(_session, false); break; case SessionState.Disconnected: case SessionState.Connected: @@ -946,8 +944,7 @@ private async Task ManageSessionStateMachineAsync(CancellationToken ct) _session = null; NotifyConnectivityStateChange(EndpointConnectivityState.Connecting); currentSessionState = SessionState.Reconnecting; - _reconnectingSession?.SubscriptionHandles - .ForEach(h => h.NotifySessionConnectionState(true)); + NotifySubscriptions(_reconnectingSession, true); (context as TaskCompletionSource)?.TrySetResult(); break; case SessionState.Connecting: @@ -1007,7 +1004,7 @@ private async Task ManageSessionStateMachineAsync(CancellationToken ct) _reconnectRequired = 0; reconnectPeriod = GetMinReconnectPeriod(); currentSessionState = SessionState.Connected; - currentSubscriptions.ForEach(h => h.NotifySessionConnectionState(false)); + NotifySubscriptions(_session, false); break; case SessionState.Connected: @@ -1073,17 +1070,24 @@ async ValueTask HandleDisconnectEvent(CancellationToken cancellationToken) reconnectPeriod = 0; NotifyConnectivityStateChange(EndpointConnectivityState.Disconnected); - - if (_session?.Connected == true) - { - _session.SubscriptionHandles.ForEach(h => - h.NotifySessionConnectionState(true)); - } + NotifySubscriptions(_session, true); await CloseSessionAsync().ConfigureAwait(false); Debug.Assert(_session == null); } + static void NotifySubscriptions(OpcUaSession? session, bool disconnected) + { + if (session == null) + { + return; + } + foreach (var h in session.SubscriptionHandles.Values) + { + h.NotifySessionConnectionState(disconnected); + } + } + int GetMinReconnectPeriod() { var reconnectPeriod = MinReconnectDelay ?? TimeSpan.Zero; @@ -1239,9 +1243,9 @@ private async ValueTask TryConnectAsync(CancellationToken ct) // var securityMode = _connection.Endpoint.SecurityMode ?? SecurityMode.NotNone; var securityProfile = _connection.Endpoint.SecurityPolicy; - - var endpointDescription = await SelectEndpointAsync(endpointUrl, - connection, securityMode, securityProfile, ct: ct).ConfigureAwait(false); + var endpointDescription = await SelectEndpointAsync(_configuration, + endpointUrl, connection, securityMode, securityProfile, _logger, + this, ct: ct).ConfigureAwait(false); if (endpointDescription == null) { _logger.LogWarning( @@ -1381,7 +1385,7 @@ internal void Session_PublishSequenceNumbersToAcknowledge(ISession session, e.AcknowledgementsToSend.Clear(); e.DeferredAcknowledgementsToSend.Clear(); - foreach (var subscription in ((OpcUaSession)session).SubscriptionHandles) + foreach (var subscription in ((OpcUaSession)session).SubscriptionHandles.Values) { if (!subscription.TryGetCurrentPosition(out var sid, out var seq)) { @@ -1764,16 +1768,21 @@ private void NotifyConnectivityStateChange(EndpointConnectivityState state) /// /// Select the endpoint to use /// + /// /// /// /// /// + /// + /// /// /// /// - internal async Task SelectEndpointAsync(Uri? discoveryUrl, + internal static async Task SelectEndpointAsync( + ApplicationConfiguration configuration, Uri? discoveryUrl, ITransportWaitingConnection? connection, SecurityMode securityMode, - string? securityPolicy, string? endpointUrl = null, CancellationToken ct = default) + string? securityPolicy, ILogger logger, object? context, + string? endpointUrl = null, CancellationToken ct = default) { var endpointConfiguration = EndpointConfiguration.Create(); endpointConfiguration.OperationTimeout = @@ -1798,25 +1807,30 @@ private void NotifyConnectivityStateChange(EndpointConnectivityState state) } using (var client = connection != null ? - DiscoveryClient.Create(_configuration, connection, endpointConfiguration) : - DiscoveryClient.Create(_configuration, discoveryUrl, endpointConfiguration)) + DiscoveryClient.Create(configuration, connection, endpointConfiguration) : + DiscoveryClient.Create(configuration, discoveryUrl, endpointConfiguration)) { var uri = new Uri(endpointUrl ?? client.Endpoint.EndpointUrl); var endpoints = await client.GetEndpointsAsync(null, ct).ConfigureAwait(false); discoveryUrl ??= uri; - _logger.LogInformation("{Client}: Discovery endpoint {DiscoveryUrl} returned endpoints. " + + logger.LogInformation("{Client}: Discovery endpoint {DiscoveryUrl} returned endpoints. " + "Selecting endpoint {EndpointUri} with SecurityMode " + "{SecurityMode} and {SecurityPolicy} SecurityPolicyUri from:\n{Endpoints}", - this, discoveryUrl, uri, securityMode, securityPolicy ?? "any", endpoints.Select( + context, discoveryUrl, uri, securityMode, securityPolicy ?? "any", endpoints.Select( ep => " " + ToString(ep)).Aggregate((a, b) => $"{a}\n{b}")); var filtered = endpoints .Where(ep => SecurityPolicies.GetDisplayName(ep.SecurityPolicyUri) != null && ep.SecurityMode.IsSame(securityMode) && - (securityPolicy == null || string.Equals(ep.SecurityPolicyUri, - securityPolicy, StringComparison.OrdinalIgnoreCase))) + (securityPolicy == null || + string.Equals(ep.SecurityPolicyUri, + securityPolicy, + StringComparison.OrdinalIgnoreCase) || + string.Equals(ep.SecurityPolicyUri, + "http://opcfoundation.org/UA/SecurityPolicy#" + securityPolicy, + StringComparison.OrdinalIgnoreCase))) // // The security level is a relative measure assigned by the server // to the endpoints that it returns. Clients should always pick the @@ -1843,9 +1857,9 @@ private void NotifyConnectivityStateChange(EndpointConnectivityState state) // if (selected != null) { - _logger.LogInformation( + logger.LogInformation( "{Client}: Endpoint {Endpoint} selected via reverse connect!", - this, ToString(selected)); + context, ToString(selected)); } return selected; } @@ -1880,7 +1894,7 @@ private void NotifyConnectivityStateChange(EndpointConnectivityState state) }.ToString(); } - _logger.LogInformation("{Client}: Endpoint {Endpoint} selected!", this, + logger.LogInformation("{Client}: Endpoint {Endpoint} selected!", context, ToString(selected)); return selected; diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClientManager.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClientManager.cs index c33593991e..1997c43668 100644 --- a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClientManager.cs +++ b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClientManager.cs @@ -96,26 +96,26 @@ public async ValueTask CreateSubscriptionAsync( /// public async Task TestConnectionAsync( - ConnectionModel endpoint, TestConnectionRequestModel request, - CancellationToken ct) + ConnectionModel endpoint, TestConnectionRequestModel request, CancellationToken ct) { ArgumentNullException.ThrowIfNull(endpoint); - if (string.IsNullOrEmpty(endpoint.Endpoint?.Url)) - { - throw new ArgumentException("Endpoint url is missing.", nameof(endpoint)); - } + ArgumentNullException.ThrowIfNullOrWhiteSpace(endpoint.Endpoint?.Url); - var endpointUrl = endpoint.Endpoint.Url; - var endpointDescription = CoreClientUtils.SelectEndpoint( - _configuration.Value, endpointUrl, - endpoint.Endpoint.SecurityMode != SecurityMode.None); - var endpointConfiguration = EndpointConfiguration.Create(_configuration.Value); - var configuredEndpoint = new ConfiguredEndpoint(null, endpointDescription, - endpointConfiguration); - var userIdentity = await endpoint.User.ToUserIdentityAsync( - _configuration.Value).ConfigureAwait(false); + var endpointUrl = new Uri(endpoint.Endpoint.Url); try { + var endpointDescription = await OpcUaClient.SelectEndpointAsync( + _configuration.Value, endpointUrl, null, + endpoint.Endpoint.SecurityMode ?? SecurityMode.NotNone, + endpoint.Endpoint.SecurityPolicy, _logger, endpoint, + ct: ct).ConfigureAwait(false); + + var endpointConfiguration = EndpointConfiguration.Create( + _configuration.Value); + var configuredEndpoint = new ConfiguredEndpoint(null, + endpointDescription, endpointConfiguration); + var userIdentity = await endpoint.User.ToUserIdentityAsync( + _configuration.Value).ConfigureAwait(false); using var session = await DefaultSessionFactory.Instance.CreateAsync( _configuration.Value, reverseConnectManager: null, configuredEndpoint, updateBeforeConnect: true, // Update endpoint through discovery diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaSession.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaSession.cs index 53a5685c5f..2eec9c8b69 100644 --- a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaSession.cs +++ b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaSession.cs @@ -52,13 +52,15 @@ internal bool IsTypeSystemLoaded /// /// Get list of subscription handles registered in the session /// - internal List SubscriptionHandles + internal Dictionary SubscriptionHandles { get { lock (SyncRoot) { - return Subscriptions.OfType().ToList(); + return Subscriptions + .OfType() + .ToDictionary(k => k.SubscriptionId); } } } diff --git a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaSubscription.cs b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaSubscription.cs index 255f7f3ee0..efcd92e74f 100644 --- a/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaSubscription.cs +++ b/src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaSubscription.cs @@ -14,6 +14,7 @@ namespace Azure.IIoT.OpcUa.Publisher.Stack.Services using Microsoft.Extensions.Options; using Opc.Ua; using Opc.Ua.Client; + using Opc.Ua.Client.ComplexTypes; using Opc.Ua.Extensions; using System; using System.Collections.Frozen; @@ -25,6 +26,7 @@ namespace Azure.IIoT.OpcUa.Publisher.Stack.Services using System.Runtime.Serialization; using System.Threading; using System.Threading.Tasks; + using System.Text; /// /// Subscription implementation @@ -39,13 +41,23 @@ namespace Azure.IIoT.OpcUa.Publisher.Stack.Services [KnownType(typeof(OpcUaMonitoredItem.Condition))] [KnownType(typeof(OpcUaMonitoredItem.Field))] internal sealed class OpcUaSubscription : Subscription, IAsyncDisposable, - IEquatable, IEquatable + IEquatable { /// /// Template for subscription /// public SubscriptionModel Template { get; private set; } + /// + /// Is root subscription + /// + public bool IsRoot => _parentId == null; + + /// + /// Unique subscription identifier in the process + /// + public uint SubscriptionId { get; } + /// /// The name of the subscription /// @@ -145,26 +157,28 @@ public bool ResolveBrowsePathFromRoot /// /// /// + /// /// internal OpcUaSubscription(OpcUaClient client, SubscriptionModel template, IOptions options, TimeSpan? createSessionTimeout, - ILoggerFactory loggerFactory, IMetricsContext metrics, TimeProvider? timeProvider = null) + ILoggerFactory loggerFactory, IMetricsContext metrics, uint? parentId = null, + TimeProvider? timeProvider = null) { - _client = client ?? throw new ArgumentNullException(nameof(client)); - _options = options ?? throw new ArgumentNullException(nameof(options)); + _client = client; + _options = options; + _loggerFactory = loggerFactory; + _metrics = metrics; + _parentId = parentId; _createSessionTimeout = createSessionTimeout; - _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); - _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics)); _timeProvider = timeProvider ?? TimeProvider.System; Template = template; Name = Template.CreateSubscriptionId(); + SubscriptionId = Opc.Ua.SequenceNumber.Increment32(ref _lastIndex); _logger = _loggerFactory.CreateLogger(); _additionallyMonitored = FrozenDictionary.Empty; - _generation = Opc.Ua.SequenceNumber.Increment32(ref _lastIndex); - Initialize(); _timer = _timeProvider.CreateTimer(_ => TriggerManageSubscription(), null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); @@ -192,10 +206,13 @@ private OpcUaSubscription(OpcUaSubscription subscription, bool copyEventHandlers _timeProvider = subscription._timeProvider; _metrics = subscription._metrics; _firstDataChangeReceived = subscription._firstDataChangeReceived; + Template = subscription.Template; Name = subscription.Name; - _generation = subscription._generation; + SubscriptionId = subscription.SubscriptionId; + _parentId = subscription._parentId; + _client = subscription._client; _logger = subscription._logger; _sequenceNumber = subscription._sequenceNumber; @@ -225,6 +242,40 @@ private OpcUaSubscription(OpcUaSubscription subscription, bool copyEventHandlers _client.OnSubscriptionCreated(this); } + /// + /// Copy constructor + /// + /// + /// + private OpcUaSubscription(OpcUaSubscription subscription, uint parentId) + { + _options = subscription._options; + _loggerFactory = subscription._loggerFactory; + _timeProvider = subscription._timeProvider; + _client = subscription._client; + _metrics = subscription._metrics; + _parentId = parentId; + + Template = subscription.Template; + Name = subscription.Name; + + SubscriptionId = Opc.Ua.SequenceNumber.Increment32(ref _lastIndex); + _logger = _loggerFactory.CreateLogger(); + _additionallyMonitored = FrozenDictionary.Empty; + + Initialize(); + + _timer = _timeProvider.CreateTimer(_ => TriggerManageSubscription(), null, + Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); + _keepAliveWatcher = _timeProvider.CreateTimer(OnKeepAliveMissing, null, + Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); + _monitoredItemWatcher = _timeProvider.CreateTimer(OnMonitoredItemWatchdog, null, + Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); + + InitializeMetrics(); + ResetMonitoredItemWatchdogTimer(PublishingEnabled); + } + /// public override object Clone() { @@ -240,31 +291,36 @@ public override Subscription CloneSubscription(bool copyEventHandlers) /// public override string? ToString() { - return $"{Id}:{Name}"; - } - - /// - public override bool Equals(object? obj) - { - if (obj is OpcUaSubscription subscription) + var sb = new StringBuilder() + .Append(Id) + .Append(':') + .Append(SubscriptionId) + .Append(':'); + if (_parentId != null) { - return subscription.Template.Equals(Template); + sb = sb + .Append(_parentId.Value) + .Append("->"); } - if (obj is SubscriptionModel model) + sb = sb.Append(Name); + if (_childId != null) { - return model.Equals(Template); + sb = sb + .Append("->") + .Append(_childId.Value); } - return false; + return sb.ToString(); } /// - public bool Equals(SubscriptionModel? other) + public override bool Equals(object? obj) { - if (other is null) + if (obj is OpcUaSubscription subscription) { - return false; + return subscription.Template.Equals(Template) && + subscription.SubscriptionId == SubscriptionId; } - return other.Equals(Template); + return false; } /// @@ -274,7 +330,9 @@ public bool Equals(OpcUaSubscription? other) { return false; } - return other.Template.Equals(Template); + return + other.Template.Equals(Template) && + other.SubscriptionId == SubscriptionId; } /// @@ -286,73 +344,238 @@ public override int GetHashCode() /// protected override void Dispose(bool disposing) { + base.Dispose(disposing); + + if (!disposing || _disposed) + { + return; + } + _disposed = true; + try + { + ResetMonitoredItemWatchdogTimer(false); + _keepAliveWatcher.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); + + FastDataChangeCallback = null; + FastEventCallback = null; + FastKeepAliveCallback = null; + + PublishStatusChanged -= OnPublishStatusChange; + StateChanged -= OnStateChange; + + var items = CurrentlyMonitored.ToList(); + if (items.Count == 0) + { + _logger.LogInformation("Disposed Subscription {Subscription}.", this); + return; + } + + // + // When the entire session is disposed and recreated we must + // still dispose all monitored items that are remaining + // + items.ForEach(item => item.Dispose()); + RemoveItems(MonitoredItems); + _additionallyMonitored = FrozenDictionary.Empty; + Debug.Assert(!CurrentlyMonitored.Any()); + + _logger.LogInformation( + "Disposed Subscription {Subscription} with {Count)} items.", + this, items.Count); + } + catch (Exception ex) + { + _logger.LogError(ex, + "Disposing Subscription {Subscription} encountered error.", this); + + // Eat the error + } + finally + { + _keepAliveWatcher.Dispose(); + _monitoredItemWatcher.Dispose(); + _timer.Dispose(); + _meter.Dispose(); + + Handle = null; + } + } + + /// + public async ValueTask DisposeAsync() + { + // + // Called by the management thread to "close" the subscription and dispose it. + // Note that the session calls dispose again or when it is closed or + // reconnected. This her is called when the management thread determines + // to gracefully close the subscription. + // try { - if (disposing) + // first close the children + var child = GetChildSubscription(); + if (child != null) { - if (_disposed) - { - // Double dispose - Debug.Fail("Double dispose in subscription"); - return; - } - _disposed = true; - try - { - ResetMonitoredItemWatchdogTimer(false); - _keepAliveWatcher.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); - - FastDataChangeCallback = null; - FastEventCallback = null; - FastKeepAliveCallback = null; - - PublishStatusChanged -= OnPublishStatusChange; - StateChanged -= OnStateChange; - - // When the entire session is disposed and recreated we must still dispose - // all monitored items - var items = CurrentlyMonitored.ToList(); - items.ForEach(item => item.Dispose()); - RemoveItems(MonitoredItems); - - _additionallyMonitored = FrozenDictionary.Empty; - Debug.Assert(!CurrentlyMonitored.Any()); - _logger.LogInformation("Disposed Subscription {Subscription} (with {Count)} items).", - this, items.Count); - } - finally - { - _keepAliveWatcher.Dispose(); - _monitoredItemWatcher.Dispose(); - _timer.Dispose(); - _meter.Dispose(); + await child.DisposeAsync().ConfigureAwait(false); + } - Handle = null; - } + if (IsClosed) + { + return; } - Debug.Assert(!_disposed || FastDataChangeCallback == null); - Debug.Assert(!_disposed || FastKeepAliveCallback == null); - Debug.Assert(!_disposed || FastEventCallback == null); + Debug.Assert(Session != null); + + ResetKeepAliveTimer(); + ResetMonitoredItemWatchdogTimer(false); + + // Does not throw + await CloseCurrentSubscriptionAsync().ConfigureAwait(false); + + _logger.LogInformation("Closed Subscription {Subscription}.", this); + Debug.Assert(Session == null); } finally { - base.Dispose(disposing); + Dispose(); } } /// - /// Update subscription configuration + /// Try get the current position in the out stream. This is called + /// on all subscriptions in the session and takes child subscriptions + /// into account /// - /// - internal void Update(SubscriptionModel template) + /// + /// + /// + internal bool TryGetCurrentPosition(out uint subscriptionId, out uint sequenceNumber) { - Template = template; - Name = Template.CreateSubscriptionId(); + subscriptionId = Id; + sequenceNumber = _currentSequenceNumber; + return UseDeferredAcknoledgements; + } + + /// + /// Notify session disconnected/reconnecting. This is called + /// on all subscriptions in the session and takes child subscriptions + /// into account + /// + /// + /// + internal void NotifySessionConnectionState(bool disconnected) + { + foreach (var item in CurrentlyMonitored) + { + item.NotifySessionConnectionState(disconnected); + } } /// - /// Collect metadata + /// Create a keep alive message + /// + /// + internal OpcUaSubscriptionNotification? CreateKeepAlive() + { + Debug.Assert(IsRoot); + if (IsClosed) + { + _logger.LogError("Subscription {Subscription} closed!", this); + return null; + } + try + { + var session = Session; + if (session == null) + { + return null; + } + return new OpcUaSubscriptionNotification(this, session.MessageContext, + Array.Empty(), _timeProvider) + { + ApplicationUri = session.Endpoint.Server.ApplicationUri, + EndpointUrl = session.Endpoint.EndpointUrl, + SequenceNumber = Opc.Ua.SequenceNumber.Increment32(ref _sequenceNumber), + MessageType = MessageType.KeepAlive + }; + } + catch (Exception ex) + { + _logger.LogError(ex, + "Failed to create keep alive for subscription {Subscription}.", this); + return null; + } + } + + + /// + /// Get number of good monitored item for the subscriber across + /// this and all child subscriptions + /// + /// + /// + internal int GetGoodMonitoredItems(ISubscriber owner) + { + Debug.Assert(IsRoot); + return GetAllMonitoredItems().Count(r => r is OpcUaMonitoredItem h + && h.Owner == owner && h.IsGood); + } + + /// + /// Get number of bad monitored item for the subscriber across + /// this and all child subscriptions + /// + /// + /// + internal int GetBadMonitoredItems(ISubscriber owner) + { + Debug.Assert(IsRoot); + return GetAllMonitoredItems().Count(r => r is OpcUaMonitoredItem h + && h.Owner == owner && h.IsBad); + } + + /// + /// Get number of late monitored item for the subscriber across + /// this and all child subscriptions + /// + /// + /// + internal int GetLateMonitoredItems(ISubscriber owner) + { + Debug.Assert(IsRoot); + return GetAllMonitoredItems().Count(r => r is OpcUaMonitoredItem h + && h.Owner == owner && h.IsLate); + } + + /// + /// Get number of enabled heartbeats for the subscriber across + /// this and all child subscriptions + /// + /// + /// + internal int GetHeartbeatsEnabled(ISubscriber owner) + { + Debug.Assert(IsRoot); + return GetAllMonitoredItems().Count(r => r is OpcUaMonitoredItem.Heartbeat h + && h.Owner == owner && h.TimerEnabled); + } + + /// + /// Get number of conditions enabled for the subscriber across + /// this and all child subscriptions + /// + /// + /// + internal int GetConditionsEnabled(ISubscriber owner) + { + Debug.Assert(IsRoot); + return GetAllMonitoredItems().Count(r => r is OpcUaMonitoredItem.Condition h + && h.Owner == owner && h.TimerEnabled); + } + + /// + /// Collect metadata for the subscriber across this and all child + /// subscriptions /// /// /// @@ -360,12 +583,12 @@ internal void Update(SubscriptionModel template) /// /// /// - /// /// internal async ValueTask CollectMetaDataAsync( ISubscriber owner, DataSetFieldContentFlags? dataSetFieldContentMask, DataSetMetaDataModel dataSetMetaData, uint minorVersion, CancellationToken ct) { + Debug.Assert(IsRoot); if (Session is not OpcUaSession session) { throw ServiceResultException.Create(StatusCodes.BadSessionIdInvalid, @@ -375,11 +598,9 @@ internal async ValueTask CollectMetaDataAsync( var typeSystem = await session.GetComplexTypeSystemAsync(ct).ConfigureAwait(false); var dataTypes = new NodeIdDictionary(); var fields = new List(); - foreach (var monitoredItem in CurrentlyMonitored.Where(m => m.Owner == owner)) - { - await monitoredItem.GetMetaDataAsync(session, typeSystem, - fields, dataTypes, ct).ConfigureAwait(false); - } + + await CollectMetaDataAsync(owner, session, typeSystem, dataTypes, fields, + ct).ConfigureAwait(false); // // For full featured messages there are additional fields that are required @@ -430,349 +651,331 @@ static void AddExtraField(List fields, } /// - /// Create a keep alive message + /// Update subscription configuration and apply changes later during + /// synchronization. This is used when the subscription is owned by a + /// single subscriber and the configuration is updated. /// - /// - internal OpcUaSubscriptionNotification? CreateKeepAlive() - { - if (IsClosed) - { - _logger.LogError("Subscription {Subscription} closed!", this); - return null; - } - try - { - var session = Session; - if (session == null) - { - return null; - } - return new OpcUaSubscriptionNotification(this, session.MessageContext, - Array.Empty(), _timeProvider) - { - ApplicationUri = session.Endpoint.Server.ApplicationUri, - EndpointUrl = session.Endpoint.EndpointUrl, - SequenceNumber = Opc.Ua.SequenceNumber.Increment32(ref _sequenceNumber), - MessageType = MessageType.KeepAlive - }; - } - catch (Exception ex) - { - _logger.LogError(ex, - "Failed to create a subscription notification for subscription {Subscription}.", - this); - return null; - } - } - - /// - public async ValueTask DisposeAsync() + /// + internal void Update(SubscriptionModel template) { - try - { - if (!IsClosed) - { - Debug.Assert(Session != null); + // Debug.Assert(IsRoot); -- called recursively down to all children. - ResetKeepAliveTimer(); - ResetMonitoredItemWatchdogTimer(false); - - // Does not throw - await CloseCurrentSubscriptionAsync().ConfigureAwait(false); + Template = template; + Name = Template.CreateSubscriptionId(); - Debug.Assert(Session == null); - } - } - finally - { - Dispose(); - } + GetChildSubscription()?.Update(template); } /// - /// Create or update the subscription now using the - /// currently configured subscription configuration. + /// Create or update the subscription now using the currently configured + /// subscription configuration template. /// + /// + /// /// /// - internal async ValueTask SyncAsync(CancellationToken ct) + /// + internal async ValueTask SyncAsync(uint? maxMonitoredItemsPerSubscription, + OperationLimitsModel limits, CancellationToken ct) { + Debug.Assert(IsRoot); if (_disposed) { return; } - Debug.Assert(Session != null); - - TriggerSubscriptionManagementCallbackIn(Timeout.InfiniteTimeSpan); + var maxMonitoredItems = maxMonitoredItemsPerSubscription ?? 0u; + if (maxMonitoredItems <= 0) + { + maxMonitoredItems = _options.Value.MaxMonitoredItemPerSubscription + ?? kMaxMonitoredItemPerSubscriptionDefault; + } - if (!Session.Connected) + Debug.Assert(Session != null); + if (Session is not OpcUaSession session || !session.Connected) { _logger.LogError( "Session {Session} for {Subscription} not connected.", Session, this); - TriggerSubscriptionManagementCallbackIn( - _createSessionTimeout, TimeSpan.FromSeconds(10)); + _timer.Change(Delay(_createSessionTimeout, TimeSpan.FromSeconds(10)), + Timeout.InfiniteTimeSpan); return; } try { - await SyncInternalAsync(ct).ConfigureAwait(false); - } - catch (Exception e) - { - _logger.LogError(e, - "Failed to apply state to Subscription {Subscription} in session {Session}...", - this, Session); - // Retry in 1 minute if not automatically retried - TriggerSubscriptionManagementCallbackIn( - _options.Value.SubscriptionErrorRetryDelay, kDefaultErrorRetryDelay); - } - } + var retryDelay = Timeout.InfiniteTimeSpan; + _timer.Change(retryDelay, Timeout.InfiniteTimeSpan); - /// - /// Try get the current position in the out stream. - /// - /// - /// - /// - internal bool TryGetCurrentPosition(out uint subscriptionId, out uint sequenceNumber) - { - subscriptionId = Id; - sequenceNumber = _currentSequenceNumber; - return UseDeferredAcknoledgements; - } + // Force recreate all subscriptions in the chain if needed + await ForceRecreateIfNeededAsync(session).ConfigureAwait(false); - /// - /// Notify session disconnected/reconnecting - /// - /// - /// - internal void NotifySessionConnectionState(bool disconnected) - { - foreach (var item in CurrentlyMonitored) - { - item.NotifySessionConnectionState(disconnected); - } - } + // Parition the monitored items across subscriptions + var partitions = Partition.Create(_client.GetSubscribers(Template), + maxMonitoredItems, _options.Value); - /// - /// Send notification - /// - /// - /// - /// - /// - /// - /// - /// - internal void SendNotification(ISubscriber callback, MessageType messageType, - IList notifications, ISession? session, - string? eventTypeName, bool diagnosticsOnly, DateTimeOffset? timestamp) - { - var curSession = session ?? Session; - var messageContext = curSession?.MessageContext; - - if (messageContext == null) - { - if (session == null) + var subscriptionPartition = this; // The root is the default + for (var partitionIdx = 0; partitionIdx < partitions.Count; partitionIdx++) { - // Can only send with context - _logger.LogDebug("Failed to send notification since no session exists " + - "to use as context. Notification was dropped."); - return; - } - _logger.LogWarning("A session was passed to send notification with but without " + - "message context. Using thread context."); - messageContext = ServiceMessageContext.ThreadContext; - } + // Synchronize the subscription of this partition + await subscriptionPartition.SynchronizeSubscriptionAsync( + ct).ConfigureAwait(false); -#pragma warning disable CA2000 // Dispose objects before losing scope - var message = new OpcUaSubscriptionNotification(this, messageContext, notifications, - _timeProvider, createdTimestamp: timestamp) - { - ApplicationUri = curSession?.Endpoint?.Server?.ApplicationUri, - EndpointUrl = curSession?.Endpoint?.EndpointUrl, - EventTypeName = eventTypeName, - SequenceNumber = Opc.Ua.SequenceNumber.Increment32(ref _sequenceNumber), - MessageType = messageType - }; -#pragma warning restore CA2000 // Dispose objects before losing scope + // Add partitioned items + var partition = partitions[partitionIdx]; + var delay = await subscriptionPartition.SynchronizeMonitoredItemsAsync( + partition, limits, ct).ConfigureAwait(false); + if (retryDelay > delay) + { + retryDelay = delay; + } - var count = message.GetDiagnosticCounters(out var modelChanges, - out var heartbeats, out var overflows); - if (messageType == MessageType.Event || messageType == MessageType.Condition) - { - if (!diagnosticsOnly) - { - callback.OnSubscriptionEventReceived(message); + if (partitionIdx == partitions.Count - 1) + { + break; + } + + // Get or create a child subscription + subscriptionPartition = subscriptionPartition.GetChildSubscription(true); + if (subscriptionPartition == null) + { + throw ServiceResultException.Create(StatusCodes.BadUnexpectedError, + "Failed to create child subscription."); + } } - if (count > 0) + + // + // subscription now is the tail or head subscription. We remove + // all child subscriptions below it as they are not needed anymore. + // + var tail = subscriptionPartition; + while (tail != null) { - callback.OnSubscriptionEventDiagnosticsChange(false, - count, overflows, modelChanges == 0 ? 0 : 1); + tail = tail.GetChildSubscription(); + if (tail != null) + { + await tail.DisposeAsync().ConfigureAwait(false); + } } + // Snip off here + subscriptionPartition._childId = null; + + // Force finalize all subscriptions in the (new) chain if needed + await FinalizeSyncAsync(ct).ConfigureAwait(false); + + _timer.Change(retryDelay, Timeout.InfiniteTimeSpan); } - else + catch (Exception e) { - if (!diagnosticsOnly) - { - callback.OnSubscriptionDataChangeReceived(message); - } - if (count > 0) - { - callback.OnSubscriptionDataDiagnosticsChange(false, - count, overflows, heartbeats); - } + _logger.LogError(e, + "Failed to apply state to Subscription {Subscription} in session {Session}...", + this, Session); + // Retry in 1 minute if not automatically retried + _timer.Change(Delay(_options.Value.SubscriptionErrorRetryDelay, + kDefaultErrorRetryDelay), Timeout.InfiniteTimeSpan); } } /// - /// Get number of good monitored item for the subscriber + /// Force recreate all subscriptions in the chain if needed /// - /// - /// - internal int GetGoodMonitoredItems(ISubscriber owner) - { - return MonitoredItems.Count(r => r is OpcUaMonitoredItem h - && h.Owner == owner && h.IsGood); - } - - /// - /// Get number of bad monitored item for the subscriber - /// - /// - /// - internal int GetBadMonitoredItems(ISubscriber owner) - { - return MonitoredItems.Count(r => r is OpcUaMonitoredItem h - && h.Owner == owner && h.IsBad); - } - - /// - /// Get number of late monitored item for the subscriber - /// - /// + /// /// - internal int GetLateMonitoredItems(ISubscriber owner) + private async Task ForceRecreateIfNeededAsync(OpcUaSession session) { - return MonitoredItems.Count(r => r is OpcUaMonitoredItem h - && h.Owner == owner && h.IsLate); - } + var child = GetChildSubscription(); + if (child != null) + { + await child.ForceRecreateIfNeededAsync(session).ConfigureAwait(false); + } - /// - /// Get number of enabled heartbeats for the subscriber - /// - /// - /// - internal int GetHeartbeatsEnabled(ISubscriber owner) - { - return MonitoredItems.Count(r => r is OpcUaMonitoredItem.Heartbeat h - && h.Owner == owner && h.TimerEnabled); - } + if (!_forceRecreate) + { + return; + } - /// - /// Get number of conditions enabled for the subscriber - /// - /// - /// - internal int GetConditionsEnabled(ISubscriber owner) - { - return MonitoredItems.Count(r => r is OpcUaMonitoredItem.Condition h - && h.Owner == owner && h.TimerEnabled); - } + _forceRecreate = false; + _logger.LogInformation( + "======== Closing subscription {Subscription} and re-creating =========", + this); - /// - /// Initialize state - /// - private void Initialize() - { - FastKeepAliveCallback = OnSubscriptionKeepAliveNotification; - FastDataChangeCallback = OnSubscriptionDataChangeNotification; - FastEventCallback = OnSubscriptionEventNotificationList; - PublishStatusChanged += OnPublishStatusChange; - StateChanged += OnStateChange; + // Does not throw + await CloseCurrentSubscriptionAsync().ConfigureAwait(false); - TimestampsToReturn = Opc.Ua.TimestampsToReturn.Both; - DisableMonitoredItemCache = true; + Debug.Assert(Session == null); + session.AddSubscription(this); // Re-add the subscription now + Debug.Assert(Session == session); } /// - /// Close subscription + /// Finalize sync of all subscriptions in the chain if needed /// + /// /// - private async Task CloseCurrentSubscriptionAsync() + private async Task FinalizeSyncAsync(CancellationToken ct) { - ResetKeepAliveTimer(); - try + var child = GetChildSubscription(); + if (child != null) { - Handle = null; // Mark as closed + await child.FinalizeSyncAsync(ct).ConfigureAwait(false); + } - _logger.LogDebug("Closing subscription '{Subscription}'...", this); + if (ChangesPending) + { + await ApplyChangesAsync(ct).ConfigureAwait(false); + } - // Dispose all monitored items - var items = CurrentlyMonitored.ToList(); + var shouldEnable = MonitoredItems + .OfType() + .Any(m => m.Valid && m.MonitoringMode != Opc.Ua.MonitoringMode.Disabled); + if (PublishingEnabled ^ shouldEnable) + { + await SetPublishingModeAsync(shouldEnable, ct).ConfigureAwait(false); - _additionallyMonitored = FrozenDictionary.Empty; - RemoveItems(MonitoredItems); - _currentSequenceNumber = 0; - _goodMonitoredItems = 0; - _badMonitoredItems = 0; + _logger.LogInformation( + "{State} Subscription {Subscription} in session {Session}.", + shouldEnable ? "Enabled" : "Disabled", this, Session); - _reportingItems = 0; - _disabledItems = 0; - _samplingItems = 0; - _notAppliedItems = 0; + ResetMonitoredItemWatchdogTimer(shouldEnable); + } + } - ResetMonitoredItemWatchdogTimer(false); + /// + /// Get a subscription with the supplied configuration (no lock) + /// + /// + /// + /// + private async ValueTask SynchronizeSubscriptionAsync(CancellationToken ct) + { + Debug.Assert(Session.DefaultSubscription != null, "No default subscription template."); - await Try.Async(() => SetPublishingModeAsync(false)).ConfigureAwait(false); - await Try.Async(() => DeleteItemsAsync(default)).ConfigureAwait(false); - await Try.Async(() => ApplyChangesAsync()).ConfigureAwait(false); + if (Handle == null) + { + Handle = SubscriptionId; // Initialized for the first time + DisplayName = Name + SubscriptionId; + PublishingEnabled = EnableImmediatePublishing; + KeepAliveCount = DesiredKeepAliveCount; + PublishingInterval = (int)DesiredPublishingInterval.TotalMilliseconds; + MaxNotificationsPerPublish = DesiredMaxNotificationsPerPublish; + LifetimeCount = DesiredLifetimeCount; + Priority = DesiredPriority; - items.ForEach(item => item.Dispose()); - _logger.LogDebug("Deleted {Count} monitored items for '{Subscription}'.", - items.Count, this); + // TODO: use a channel and reorder task before calling OnMessage + // to order or else republish is called too often + RepublishAfterTransfer = DesiredRepublishAfterTransfer; + SequentialPublishing = EnableSequentialPublishing; - await Try.Async(() => DeleteAsync(true)).ConfigureAwait(false); + _logger.LogInformation( + "Creating new {State} subscription {Subscription} in session {Session}.", + PublishingEnabled ? "enabled" : "disabled", this, Session); - if (Session != null) + Debug.Assert(Session != null); + await CreateAsync(ct).ConfigureAwait(false); + if (!Created) { - await Session.RemoveSubscriptionAsync(this).ConfigureAwait(false); + Handle = null; + var session = Session; + await session.RemoveSubscriptionAsync(this, ct).ConfigureAwait(false); + Debug.Assert(Session == null); + throw new ServiceResultException(StatusCodes.BadSubscriptionIdInvalid, + $"Failed to create subscription {this} in session {session}"); } - Debug.Assert(Session == null, "Subscription should not be part of session"); - Debug.Assert(!CurrentlyMonitored.Any(), "Not all items removed."); - _logger.LogInformation("Subscription '{Subscription}' closed.", this); + + ResetMonitoredItemWatchdogTimer(PublishingEnabled); + LogRevisedValues(true); + Debug.Assert(Id != 0); + Debug.Assert(Created); + + _firstDataChangeReceived = false; } - catch (Exception e) + else { - _logger.LogError(e, "Failed to close subscription {Subscription}", this); + // + // Only needed when we reconfiguring a subscription with a single subscriber + // This is not yet implemented. + // TODO: Consider removing... + // + + // Apply new configuration on configuration on original subscription + var modifySubscription = false; + + if (DesiredKeepAliveCount != KeepAliveCount) + { + _logger.LogInformation( + "Change KeepAliveCount to {New} in Subscription {Subscription}...", + DesiredKeepAliveCount, this); + + KeepAliveCount = DesiredKeepAliveCount; + modifySubscription = true; + } + + if (PublishingInterval != (int)DesiredPublishingInterval.TotalMilliseconds) + { + _logger.LogInformation( + "Change publishing interval to {New} in Subscription {Subscription}...", + DesiredPublishingInterval, this); + PublishingInterval = (int)DesiredPublishingInterval.TotalMilliseconds; + modifySubscription = true; + } + + if (MaxNotificationsPerPublish != DesiredMaxNotificationsPerPublish) + { + _logger.LogInformation( + "Change MaxNotificationsPerPublish to {New} in Subscription {Subscription}", + DesiredMaxNotificationsPerPublish, this); + MaxNotificationsPerPublish = DesiredMaxNotificationsPerPublish; + modifySubscription = true; + } + + if (LifetimeCount != DesiredLifetimeCount) + { + _logger.LogInformation( + "Change LifetimeCount to {New} in Subscription {Subscription}...", + DesiredLifetimeCount, this); + LifetimeCount = DesiredLifetimeCount; + modifySubscription = true; + } + if (Priority != DesiredPriority) + { + _logger.LogInformation( + "Change Priority to {New} in Subscription {Subscription}...", + DesiredPriority, this); + Priority = DesiredPriority; + modifySubscription = true; + } + if (modifySubscription) + { + await ModifyAsync(ct).ConfigureAwait(false); + _logger.LogInformation( + "Subscription {Subscription} in session {Session} successfully modified.", + this, Session); + LogRevisedValues(false); + ResetMonitoredItemWatchdogTimer(PublishingEnabled); + } } + ResetKeepAliveTimer(); } /// - /// Synchronize monitored items in subscription (no lock) + /// Synchronize partition of monitored items into this subscription /// + /// + /// /// - private async Task SynchronizeMonitoredItemsAsync(CancellationToken ct) + /// + private async ValueTask SynchronizeMonitoredItemsAsync( + Partition partition, OperationLimitsModel operationLimits, CancellationToken ct) { - Debug.Assert(Session != null); if (Session is not OpcUaSession session) { - return false; + throw ServiceResultException.Create(StatusCodes.BadSessionIdInvalid, + "Session not connected."); } - // Get limits to batch requests during resolve - var operationLimits = await session.GetOperationLimitsAsync( - ct).ConfigureAwait(false); - // Get the items assigned to this subscription. #pragma warning disable CA2000 // Dispose objects before losing scope var desired = OpcUaMonitoredItem - .Create(_client, _client - .GetItems(Template) - .Select(i => (i.Item1, i.Item2.SetDefaults(_options.Value))), - _loggerFactory, _timeProvider) + .Create(_client, partition.Items, _loggerFactory, _timeProvider) .ToHashSet(); #pragma warning restore CA2000 // Dispose objects before losing scope @@ -780,6 +983,7 @@ private async Task SynchronizeMonitoredItemsAsync(CancellationToken ct) var remove = previouslyMonitored.Except(desired).ToHashSet(); var add = desired.Except(previouslyMonitored).ToHashSet(); var same = previouslyMonitored.ToHashSet(); + var errors = 0; same.IntersectWith(desired); // @@ -814,7 +1018,8 @@ private async Task SynchronizeMonitoredItemsAsync(CancellationToken ct) _logger.LogWarning( "Failed to resolve browse path in {Subscription} due to {ErrorInfo}...", this, results.ErrorInfo); - return false; + throw ServiceResultException.Create(results.ErrorInfo.StatusCode, + results.ErrorInfo.ErrorMessage ?? "Failed to resolve browse paths"); } foreach (var result in results) @@ -824,14 +1029,15 @@ private async Task SynchronizeMonitoredItemsAsync(CancellationToken ct) { resolvedId = result.Result.Targets[0].TargetId.ToNodeId( session.MessageContext.NamespaceUris); + result.Request!.Value.Update(resolvedId, session.MessageContext); } else { _logger.LogWarning("Failed to resolve browse path for {NodeId} " + "in {Subscription} due to '{ServiceResult}'", result.Request!.Value.NodeId, this, result.ErrorInfo); + errors++; } - result.Request!.Value.Update(resolvedId, session.MessageContext); } } @@ -901,7 +1107,6 @@ private async Task SynchronizeMonitoredItemsAsync(CancellationToken ct) var metadataChanged = new HashSet(); var applyChanges = false; var updated = 0; - var errors = 0; foreach (var toUpdate in same) { @@ -1008,7 +1213,6 @@ private async Task SynchronizeMonitoredItemsAsync(CancellationToken ct) } Debug.Assert(desired.Count == 0, "We should have processed all desired updates."); - var noErrorFound = errors == 0; if (applyChanges) { @@ -1072,7 +1276,7 @@ private async Task SynchronizeMonitoredItemsAsync(CancellationToken ct) this, results.ErrorInfo); // We will retry later. - noErrorFound = false; + errors++; } else { @@ -1183,7 +1387,8 @@ private async Task SynchronizeMonitoredItemsAsync(CancellationToken ct) itemsToChange[i].StartNodeId, this, results[i].StatusCode); } } - noErrorFound = false; + // Retry later + errors++; } } } @@ -1209,9 +1414,36 @@ private async Task SynchronizeMonitoredItemsAsync(CancellationToken ct) .Where(m => !m.AttachedToSubscription) .ToFrozenDictionary(m => m.ClientHandle, m => m); + // Notify semantic change now that we have update the monitored items + foreach (var owner in metadataChanged) + { + owner.OnMonitoredItemSemanticsChanged(); + } + + // Refresh condition + if (set.OfType().Any()) + { + _logger.LogInformation( + "Issuing ConditionRefresh on subscription {Subscription}", this); + try + { + await ConditionRefreshAsync(ct).ConfigureAwait(false); + _logger.LogInformation("ConditionRefresh on subscription " + + "{Subscription} has completed.", this); + } + catch (Exception e) + { + _logger.LogInformation("ConditionRefresh on subscription " + + "{Subscription} failed with an exception '{Message}'", + this, e.Message); + errors++; + } + } + set.ForEach(item => item.LogRevisedSamplingRateAndQueueSize()); _badMonitoredItems = invalidItems; + _errorsDuringSync = errors; _goodMonitoredItems = Math.Max(set.Count - invalidItems, 0); _reportingItems = set @@ -1233,6 +1465,7 @@ private async Task SynchronizeMonitoredItemsAsync(CancellationToken ct) _logger.LogInformation(@"{Subscription} - Now monitoring {Count} nodes: # Good/Bad: {Good}/{Bad} +# Errors: {Errors} # Reporting: {Reporting} # Sampling: {Sampling} # Heartbeat/ing: {Heartbeat}/{EnabledHeartbeats} @@ -1242,6 +1475,7 @@ private async Task SynchronizeMonitoredItemsAsync(CancellationToken ct) # Removed: {Disposed}", this, set.Count, _goodMonitoredItems, _badMonitoredItems, + _errorsDuringSync, _reportingItems, _samplingItems, _heartbeatItems, heartbeatsEnabled, @@ -1250,217 +1484,180 @@ private async Task SynchronizeMonitoredItemsAsync(CancellationToken ct) _notAppliedItems, dispose.Count); - // Notify semantic change now that we have update the monitored items - foreach (var owner in metadataChanged) - { - owner.OnMonitoredItemSemanticsChanged(); - } - - // Refresh condition - if (set.OfType().Any()) - { - _logger.LogInformation( - "Issuing ConditionRefresh on subscription {Subscription}", this); - try - { - await ConditionRefreshAsync(ct).ConfigureAwait(false); - _logger.LogInformation("ConditionRefresh on subscription " + - "{Subscription} has completed.", this); - } - catch (Exception e) - { - _logger.LogInformation("ConditionRefresh on subscription " + - "{Subscription} failed with an exception '{Message}'", - this, e.Message); - noErrorFound = false; - } - } - // Set up subscription management trigger - if (invalidItems != 0) + if (invalidItems != 0 || errors != 0) { // There were items that could not be added to subscription - TriggerSubscriptionManagementCallbackIn( - _options.Value.InvalidMonitoredItemRetryDelayDuration, TimeSpan.FromMinutes(5)); + return Delay(_options.Value.InvalidMonitoredItemRetryDelayDuration, + TimeSpan.FromMinutes(5)); } else if (desiredMonitoredItems.Count != set.Count) { // There were items !Valid but desired. - TriggerSubscriptionManagementCallbackIn( - _options.Value.BadMonitoredItemRetryDelayDuration, TimeSpan.FromMinutes(30)); + return Delay(_options.Value.BadMonitoredItemRetryDelayDuration, + TimeSpan.FromMinutes(30)); } else { // Nothing to do - TriggerSubscriptionManagementCallbackIn( - _options.Value.SubscriptionManagementIntervalDuration, Timeout.InfiniteTimeSpan); + return Delay(_options.Value.SubscriptionManagementIntervalDuration, + Timeout.InfiniteTimeSpan); + } + } + + /// + /// Initialize state + /// + private void Initialize() + { + FastKeepAliveCallback = OnSubscriptionKeepAliveNotification; + FastDataChangeCallback = OnSubscriptionDataChangeNotification; + FastEventCallback = OnSubscriptionEventNotificationList; + PublishStatusChanged += OnPublishStatusChange; + StateChanged += OnStateChange; + + TimestampsToReturn = Opc.Ua.TimestampsToReturn.Both; + DisableMonitoredItemCache = true; + } + + /// + /// Get child subscription + /// + /// + /// + private OpcUaSubscription? GetChildSubscription(bool createIfNotExist = false) + { + if (Session is OpcUaSession session) + { + if (_childId.HasValue && + session.SubscriptionHandles.TryGetValue(_childId.Value, + out var subscription)) + { + // Found entry + return subscription; + } + + if (createIfNotExist) + { + subscription = new OpcUaSubscription(this, parentId: SubscriptionId); + _childId = subscription.SubscriptionId; + session.AddSubscription(subscription); + return subscription; + } + + if (_childId != null) + { + _logger.LogError( + "Child subscription {ChildId} not found in session {Session}.", + _childId, session); + } + _childId = null; + } + return null; + } + + /// + /// Get all monitored items încluding all child subscriptions. + /// This call is used to collect all items recursively. + /// + /// + /// + private IEnumerable GetAllMonitoredItems( + IEnumerable? parent = null) + { + parent ??= Enumerable.Empty(); + + parent = parent.Concat(CurrentlyMonitored); + + var child = GetChildSubscription(); + if (child != null) + { + // Recursively concat the items of all children + parent = child.GetAllMonitoredItems(parent); } - - return noErrorFound; + return parent; } /// - /// Apply state to session + /// Collect metadata across this and all child subscriptions + /// recursively from parent to child to child and so on. /// + /// + /// + /// + /// + /// /// /// - private async ValueTask SyncInternalAsync(CancellationToken ct) + private async Task CollectMetaDataAsync(ISubscriber owner, OpcUaSession session, + ComplexTypeSystem? typeSystem, NodeIdDictionary dataTypes, + List fields, CancellationToken ct) { - if (_forceRecreate) - { - var session = Session; - _forceRecreate = false; - - _logger.LogInformation( - "======== Closing subscription {Subscription} and re-creating =========", this); - - // Does not throw - await CloseCurrentSubscriptionAsync().ConfigureAwait(false); - Debug.Assert(Session == null); - session.AddSubscription(this); // Re-add - Debug.Assert(Session == session); - } - - // Synchronize subscription through the session. - await SynchronizeSubscriptionAsync(ct).ConfigureAwait(false); - - await SynchronizeMonitoredItemsAsync(ct).ConfigureAwait(false); - - if (ChangesPending) + foreach (var monitoredItem in CurrentlyMonitored.Where(m => m.Owner == owner)) { - await ApplyChangesAsync(ct).ConfigureAwait(false); + await monitoredItem.GetMetaDataAsync(session, typeSystem, + fields, dataTypes, ct).ConfigureAwait(false); } - var shouldEnable = MonitoredItems - .OfType() - .Any(m => m.Valid && m.MonitoringMode != Opc.Ua.MonitoringMode.Disabled); - if (PublishingEnabled ^ shouldEnable) + var child = GetChildSubscription(); + if (child != null) { - await SetPublishingModeAsync(shouldEnable, ct).ConfigureAwait(false); - - _logger.LogInformation( - "{State} Subscription {Subscription} in session {Session}.", - shouldEnable ? "Enabled" : "Disabled", this, Session); - - ResetMonitoredItemWatchdogTimer(shouldEnable); + await child.CollectMetaDataAsync(owner, + session, typeSystem, dataTypes, fields, ct).ConfigureAwait(false); } } /// - /// Get a subscription with the supplied configuration (no lock) + /// Close subscription /// - /// /// - /// - private async ValueTask SynchronizeSubscriptionAsync(CancellationToken ct) + private async Task CloseCurrentSubscriptionAsync() { - Debug.Assert(Session.DefaultSubscription != null, "No default subscription template."); - - if (Handle == null) + ResetKeepAliveTimer(); + try { - Handle = _generation; // Initialized for the first time - DisplayName = Name; - PublishingEnabled = EnableImmediatePublishing; - KeepAliveCount = DesiredKeepAliveCount; - PublishingInterval = (int)DesiredPublishingInterval.TotalMilliseconds; - MaxNotificationsPerPublish = DesiredMaxNotificationsPerPublish; - LifetimeCount = DesiredLifetimeCount; - Priority = DesiredPriority; - - // TODO: use a channel and reorder task before calling OnMessage - // to order or else republish is called too often - RepublishAfterTransfer = DesiredRepublishAfterTransfer; - SequentialPublishing = EnableSequentialPublishing; - - _logger.LogInformation( - "Creating new {State} subscription {Subscription} in session {Session}.", - PublishingEnabled ? "enabled" : "disabled", this, Session); + Handle = null; // Mark as closed - Debug.Assert(Session != null); - await CreateAsync(ct).ConfigureAwait(false); - if (!Created) - { - Handle = null; - var session = Session; - await session.RemoveSubscriptionAsync(this, ct).ConfigureAwait(false); - Debug.Assert(Session == null); - throw new ServiceResultException(StatusCodes.BadSubscriptionIdInvalid, - $"Failed to create subscription {this} in session {session}"); - } + _logger.LogDebug("Closing subscription '{Subscription}'...", this); - ResetMonitoredItemWatchdogTimer(PublishingEnabled); - LogRevisedValues(true); - Debug.Assert(Id != 0); - Debug.Assert(Created); + // Dispose all monitored items + var items = CurrentlyMonitored.ToList(); - _firstDataChangeReceived = false; - } - else - { - // - // Only needed when we reconfiguring a subscription with a single subscriber - // This is not yet implemented. - // TODO: Consider removing... - // + _additionallyMonitored = FrozenDictionary.Empty; + RemoveItems(MonitoredItems); + _currentSequenceNumber = 0; + _goodMonitoredItems = 0; + _badMonitoredItems = 0; - // Apply new configuration on configuration on original subscription - var modifySubscription = false; + _reportingItems = 0; + _disabledItems = 0; + _samplingItems = 0; + _notAppliedItems = 0; - if (DesiredKeepAliveCount != KeepAliveCount) - { - _logger.LogInformation( - "Change KeepAliveCount to {New} in Subscription {Subscription}...", - DesiredKeepAliveCount, this); + ResetMonitoredItemWatchdogTimer(false); - KeepAliveCount = DesiredKeepAliveCount; - modifySubscription = true; - } + await Try.Async(() => SetPublishingModeAsync(false)).ConfigureAwait(false); + await Try.Async(() => DeleteItemsAsync(default)).ConfigureAwait(false); + await Try.Async(() => ApplyChangesAsync()).ConfigureAwait(false); - if (PublishingInterval != (int)DesiredPublishingInterval.TotalMilliseconds) - { - _logger.LogInformation( - "Change publishing interval to {New} in Subscription {Subscription}...", - DesiredPublishingInterval, this); - PublishingInterval = (int)DesiredPublishingInterval.TotalMilliseconds; - modifySubscription = true; - } + items.ForEach(item => item.Dispose()); + _logger.LogDebug("Deleted {Count} monitored items for '{Subscription}'.", + items.Count, this); - if (MaxNotificationsPerPublish != DesiredMaxNotificationsPerPublish) - { - _logger.LogInformation( - "Change MaxNotificationsPerPublish to {New} in Subscription {Subscription}", - DesiredMaxNotificationsPerPublish, this); - MaxNotificationsPerPublish = DesiredMaxNotificationsPerPublish; - modifySubscription = true; - } + await Try.Async(() => DeleteAsync(true)).ConfigureAwait(false); - if (LifetimeCount != DesiredLifetimeCount) - { - _logger.LogInformation( - "Change LifetimeCount to {New} in Subscription {Subscription}...", - DesiredLifetimeCount, this); - LifetimeCount = DesiredLifetimeCount; - modifySubscription = true; - } - if (Priority != DesiredPriority) - { - _logger.LogInformation( - "Change Priority to {New} in Subscription {Subscription}...", - DesiredPriority, this); - Priority = DesiredPriority; - modifySubscription = true; - } - if (modifySubscription) + if (Session != null) { - await ModifyAsync(ct).ConfigureAwait(false); - _logger.LogInformation( - "Subscription {Subscription} in session {Session} successfully modified.", - this, Session); - LogRevisedValues(false); - ResetMonitoredItemWatchdogTimer(PublishingEnabled); + await Session.RemoveSubscriptionAsync(this).ConfigureAwait(false); } + Debug.Assert(Session == null, "Subscription should not be part of session"); + Debug.Assert(!CurrentlyMonitored.Any(), "Not all items removed."); + _logger.LogInformation("Subscription '{Subscription}' closed.", this); + } + catch (Exception e) + { + _logger.LogError(e, "Failed to close subscription {Subscription}", this); } - ResetKeepAliveTimer(); } /// @@ -1483,12 +1680,12 @@ private void LogRevisedValues(bool created) } /// - /// Trigger subscription management callback + /// Calculate delay /// /// /// - private void TriggerSubscriptionManagementCallbackIn(TimeSpan? delay, - TimeSpan defaultDelay = default) + /// + private static TimeSpan Delay(TimeSpan? delay, TimeSpan defaultDelay) { if (delay == null) { @@ -1498,13 +1695,7 @@ private void TriggerSubscriptionManagementCallbackIn(TimeSpan? delay, { delay = Timeout.InfiniteTimeSpan; } - if (delay != Timeout.InfiniteTimeSpan) - { - _logger.LogInformation( - "Setting up trigger to reapply state to {Subscription} in {Timeout}...", - this, delay); - } - _timer.Change(delay.Value, Timeout.InfiniteTimeSpan); + return delay.Value; } /// @@ -1512,6 +1703,8 @@ private void TriggerSubscriptionManagementCallbackIn(TimeSpan? delay, /// private void TriggerManageSubscription() { + Debug.Assert(IsRoot); + if (IsClosed) { return; @@ -1533,6 +1726,77 @@ private void TriggerManageSubscription() _client.TriggerSubscriptionSynchronization(this); } + /// + /// Send notification + /// + /// + /// + /// + /// + /// + /// + /// + private void SendNotification(ISubscriber callback, MessageType messageType, + IList notifications, ISession? session, + string? eventTypeName, bool diagnosticsOnly, DateTimeOffset? timestamp) + { + var curSession = session ?? Session; + var messageContext = curSession?.MessageContext; + + if (messageContext == null) + { + if (session == null) + { + // Can only send with context + _logger.LogDebug("Failed to send notification since no session exists " + + "to use as context. Notification was dropped."); + return; + } + _logger.LogWarning("A session was passed to send notification with but without " + + "message context. Using thread context."); + messageContext = ServiceMessageContext.ThreadContext; + } + +#pragma warning disable CA2000 // Dispose objects before losing scope + var message = new OpcUaSubscriptionNotification(this, messageContext, notifications, + _timeProvider, createdTimestamp: timestamp) + { + ApplicationUri = curSession?.Endpoint?.Server?.ApplicationUri, + EndpointUrl = curSession?.Endpoint?.EndpointUrl, + EventTypeName = eventTypeName, + SequenceNumber = Opc.Ua.SequenceNumber.Increment32(ref _sequenceNumber), + MessageType = messageType + }; +#pragma warning restore CA2000 // Dispose objects before losing scope + + var count = message.GetDiagnosticCounters(out var modelChanges, + out var heartbeats, out var overflows); + if (messageType == MessageType.Event || messageType == MessageType.Condition) + { + if (!diagnosticsOnly) + { + callback.OnSubscriptionEventReceived(message); + } + if (count > 0) + { + callback.OnSubscriptionEventDiagnosticsChange(false, + count, overflows, modelChanges == 0 ? 0 : 1); + } + } + else + { + if (!diagnosticsOnly) + { + callback.OnSubscriptionDataChangeReceived(message); + } + if (count > 0) + { + callback.OnSubscriptionDataDiagnosticsChange(false, + count, overflows, heartbeats); + } + } + } + /// /// Handle event notification. Depending on the sequential publishing setting /// this will be called in order and thread safe or from different threads. @@ -2297,6 +2561,61 @@ private void OnStateChange(Subscription subscription, SubscriptionStateChangedEv } } + /// + /// Helper to partition subscribers across subscriptions. Uses a bag packing + /// algorithm. + /// + private sealed class Partition + { + /// + /// Monitored items that should be in the subscription partition + /// + public List<(ISubscriber, BaseMonitoredItemModel)> Items { get; } = new(); + + /// + /// Create + /// + /// + /// + /// + /// + public static List Create(IEnumerable subscribers, + uint maxMonitoredItemsInPartition, OpcUaSubscriptionOptions options) + { + var partitions = new List(); + foreach (var subscriberItems in subscribers + .Select(s => s.MonitoredItems + .Select(m => (s, m.SetDefaults(options))) + .ToList()) + .OrderByDescending(tl => tl.Count)) + { + bool placed = false; + foreach (var partition in partitions) + { + if (partition.Items.Count + + subscriberItems.Count <= maxMonitoredItemsInPartition) + { + partition.Items.AddRange(subscriberItems); + placed = true; + break; + } + } + if (!placed) + { + // Break items into batches of max here and add partition each + foreach (var batch in subscriberItems.Batch( + (int)maxMonitoredItemsInPartition)) + { + var newPartition = new Partition(); + newPartition.Items.AddRange(batch); + partitions.Add(newPartition); + } + } + } + return partitions; + } + } + /// /// Helper to advance the sequence number when all notifications are /// completed. @@ -2422,6 +2741,7 @@ public void InitializeMetrics() static double Ratio(int value, int count) => count == 0 ? 0.0 : (double)value / count; } + private const int kMaxMonitoredItemPerSubscriptionDefault = 64 * 1024; private static readonly TimeSpan kDefaultErrorRetryDelay = TimeSpan.FromMinutes(1); private FrozenDictionary _additionallyMonitored; private uint _previousSequenceNumber; @@ -2429,7 +2749,8 @@ public void InitializeMetrics() private uint _currentSequenceNumber; private bool _firstDataChangeReceived; private bool _forceRecreate; - private readonly uint _generation; + private uint? _childId; + private readonly uint? _parentId; private readonly OpcUaClient _client; private readonly IOptions _options; private readonly TimeSpan? _createSessionTimeout; @@ -2452,6 +2773,7 @@ public void InitializeMetrics() private int _conditionItems; private int _lateMonitoredItems; private int _badMonitoredItems; + private int _errorsDuringSync; private int _missingKeepAlives; private int _continuouslyMissingKeepAlives; private long _unassignedNotifications;