Skip to content

Commit

Permalink
Split subscriptions per max items per subscription limits (#2336)
Browse files Browse the repository at this point in the history
* Re-use subscription when single subscriber
* Split subscription
* #2332
* #2333 
* #2337
  • Loading branch information
marcschier authored Sep 5, 2024
1 parent 7e528da commit c61c861
Show file tree
Hide file tree
Showing 13 changed files with 1,096 additions and 644 deletions.
8 changes: 7 additions & 1 deletion docs/opc-publisher/commandline.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ Messaging configuration
subscription setup. Set to `0` to block until
metadata is loaded.
Only used if meta data is supported and enabled.
Default: `not set` (block forever).
Default: `5000` milliseconds.
--ps, --publishschemas, --PublishMessageSchema[=VALUE]
Publish the Avro or Json message schemas to schema
registry or subtopics.
Expand Down Expand Up @@ -693,6 +693,12 @@ Subscription settings
client reads instead of subscriptions services,
unless otherwise configured.
Default: `false`.
--xmi, --maxmonitoreditems, --MaxMonitoredItemPerSubscription=VALUE
Max monitored items per subscription until the
subscription is split.
This is used if the server does not provide
limits in its server capabilities.
Default: `not set`.
--da, --deferredacks, --UseDeferredAcknoledgements[=VALUE]
(Experimental) Acknoledge subscription
notifications only when the data has been
Expand Down
14 changes: 11 additions & 3 deletions docs/opc-publisher/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -714,11 +714,19 @@ The OPC UA subscription/monitored items service due to its async model (server s

### Overcoming server limits and interop limitations

OPC UA servers can limit the amount of sessions, subscriptions or publishing requests allowed at any point in time. OPC Publisher has several options to overcome these limits that can be used to tune and overcome interoperability issues.
OPC UA servers can be limited with regards to the amount of sessions, subscriptions or publishing requests they support. By default OPC Publisher tries to bundle as many writers with the same subscription configuration (including the publishing interval) into a OPC UA subscription inside a (writer group) session. It uses the `MaxMonitoredItemsPerSubscription` limit provided by in the Server capabilities object read by OPC Publisher when the session is created to create the right number of subscriptions that hold as many monitored items as possible. If the limit a not provided by the server or is 0, OPC Publisher uses a default value of `65536`. This value can be overridden using the `--xmi --maxmonitoreditems` [command line](./commandline.md) option.

- To minimize the number of sessions against a server, the default behavior of creating a session per writer group can be overridden using the `--dsg, --disablesessionpergroup` [command line](./commandline.md) option which results in a *session per endpoint* spanning multiple writer groups with the same endpoint url and configuration.
OPC Publisher has several options to overcome additional server limitations and that can be used to tune and overcome interoperability issues.

- To further limit the number of subscriptions avoid specifying different publishing intervals for items as each publishing interval will result in its own subscription. You can use the `--ipi, --ignorepublishingintervals` command line option to *ignore publishing interval configuration* in the JSON configuration and use the publishing interval configured using the `--op` command line option (default: 1 second). In addition you can set the `--op=0` to let the server decide the smallest publishing interval it offers. You can also use the `--aq, --autosetqueuesize` option to let OPC Publisher calculate the best queue size for monitored items in the subscription to limit data loss. Note that the `--npd` command line option (default 1000) will still split the data set writer into multiple subscriptions if more nodes than the configured amount are specified.
- To minimize the number of sessions against a server, the default behavior of creating a session per writer group can be overridden using the `--dsg, --disablesessionpergroup` command line option which results in a *session per endpoint* spanning multiple writer groups with the same endpoint url and configuration.

- To further limit the number of subscriptions avoid specifying different publishing intervals for the `OpcNodes` items in the OPC Publisher [configuration](#configuration-schema). Each publishing interval will result in a subscription with the server inside the (writer group) session.

- You can use the `--ipi, --ignorepublishingintervals` command line option to *ignore publishing interval configuration* in the JSON configuration and use the publishing interval configured using the `--op` command line option (default: 1 second).

- In addition you can set the `--op=0` to let the server decide the smallest publishing interval it offers.

- You can also use the `--aq, --autosetqueuesize` option to let OPC Publisher calculate the best queue size for monitored items in the subscription to limit data loss.

- By default OPC Publisher tries to dispatch as many publishing requests to a server session as there are subscriptions in the session up to a maximum of `10`. The OPC UA stack tries to gradually lower the number based on feedback from the server (`BadTooManyPublishRequests`). This behavior is not tolerated by some servers. To set a lower maximum that OPC Publisher should never exceed use the `--xpr` command line option.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ public CommandLine(string[] args, CommandLineLogger? logger = null)
{ $"ucr|usecyclicreads:|{OpcUaSubscriptionConfig.DefaultSamplingUsingCyclicReadKey}:",
"All nodes should be sampled using periodical client reads instead of subscriptions services, unless otherwise configured.\nDefault: `false`.\n",
(bool? b) => this[OpcUaSubscriptionConfig.DefaultSamplingUsingCyclicReadKey] = b?.ToString() ?? "True" },
{ $"xmi|maxmonitoreditems=|{OpcUaSubscriptionConfig.MaxMonitoredItemPerSubscriptionKey}=",
"Max monitored items per subscription until the subscription is split.\nThis is used if the server does not provide limits in its server capabilities.\nDefault: `not set`.\n",
(uint u) => this[OpcUaSubscriptionConfig.MaxMonitoredItemPerSubscriptionKey] = u.ToString(CultureInfo.CurrentCulture) },
{ $"da|deferredacks:|{OpcUaSubscriptionConfig.UseDeferredAcknoledgementsKey}:",
"(Experimental) Acknoledge subscription notifications only when the data has been successfully published.\nDefault: `false`.\n",
(bool? b) => this[OpcUaSubscriptionConfig.UseDeferredAcknoledgementsKey] = b?.ToString() ?? "True" },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,12 @@ public async Task SwitchServerWithDifferentWriterGroupTest()
}

[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task AddNodeToDataSetWriterGroupWithNodeUsingDeviceMethod(bool differentPublishingInterval)
[InlineData(false, 100)]
[InlineData(true, 100)]
[InlineData(false, 1)]
[InlineData(true, 1)]
public async Task AddNodeToDataSetWriterGroupWithNodeUsingDeviceMethod(bool differentPublishingInterval,
int maxMonitoredItems)
{
var server = new ReferenceServer();
EndpointUrl = server.EndpointUrl;
Expand All @@ -170,7 +173,7 @@ public async Task AddNodeToDataSetWriterGroupWithNodeUsingDeviceMethod(bool diff
// Set both to the same so that there is a single writer instead of 2
testInput2[0].OpcNodes[0].OpcPublishingInterval = testInput1[0].OpcNodes[0].OpcPublishingInterval;
}
StartPublisher(name, arguments: new string[] { "--mm=PubSub", "--dm=false" });
StartPublisher(name, arguments: new string[] { "--mm=PubSub", "--dm=false", "--xmi=" + maxMonitoredItems });
try
{
var endpoints = await PublisherApi.GetConfiguredEndpointsAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,14 +585,16 @@ public async Task PeriodicHeartbeatTest()
#endif
}

[Fact]
public async Task CanSendKeyFramesToIoTHubTest()
[Theory]
[InlineData(100)]
[InlineData(1)]
public async Task CanSendKeyFramesToIoTHubTest(int maxMonitoredItems)
{
// Arrange
// Act
var (metadata, messages) = await ProcessMessagesAndMetadataAsync(
nameof(CanSendDataItemToIoTHubTest), "./Resources/KeyFrames.json", TimeSpan.FromMinutes(2), 11,
messageType: "ua-data", arguments: new[] { "--dm=false" });
nameof(CanSendKeyFramesToIoTHubTest), "./Resources/KeyFrames.json", TimeSpan.FromMinutes(2), 11,
messageType: "ua-data", arguments: new[] { "--dm=false", "--xmi=" + maxMonitoredItems });

// Assert
var allDataSetMessages = messages.Select(m => m.Message.GetProperty("Messages")).SelectMany(m => m.EnumerateArray()).ToList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,14 +427,16 @@ public async Task CanSendPendingConditionsToIoTHubTestWithDeviceMethod()
}
}

[Fact]
public async Task CanSendDataItemToIoTHubTestWithDeviceMethod2()
[Theory]
[InlineData(100)]
[InlineData(1)]
public async Task CanSendDataItemToIoTHubTestWithDeviceMethod2(int maxMonitoredItems)
{
const string name = nameof(CanSendDataItemToIoTHubTestWithDeviceMethod2);
var testInput1 = GetEndpointsFromFile(name, "./Resources/DataItems.json");
var testInput2 = GetEndpointsFromFile(name, "./Resources/SimpleEvents.json");
var testInput3 = GetEndpointsFromFile(name, "./Resources/PendingAlarms.json");
StartPublisher(name);
StartPublisher(name, arguments: new[] { "--xmi=" + maxMonitoredItems });
try
{
var endpoints = await PublisherApi.GetConfiguredEndpointsAsync();
Expand All @@ -453,8 +455,9 @@ public async Task CanSendDataItemToIoTHubTestWithDeviceMethod2()
endpoints = await PublisherApi.GetConfiguredEndpointsAsync();
Assert.Empty(endpoints.Endpoints);

await PublisherApi.AddOrUpdateEndpointsAsync(new List<PublishedNodesEntryModel> {
new()
await PublisherApi.AddOrUpdateEndpointsAsync(new List<PublishedNodesEntryModel>
{
new ()
{
OpcNodes = nodes.OpcNodes.ToList(),
EndpointUrl = e.EndpointUrl,
Expand All @@ -468,6 +471,12 @@ await PublisherApi.AddOrUpdateEndpointsAsync(new List<PublishedNodesEntryModel>
nodes = await PublisherApi.GetConfiguredNodesOnEndpointAsync(e);
Assert.Equal(3, nodes.OpcNodes.Count);

var messages1 = await WaitForMessagesAsync(GetDataFrame);
var message1 = Assert.Single(messages1).Message;
Assert.Equal("ns=23;i=1259", message1.GetProperty("NodeId").GetString());
Assert.InRange(message1.GetProperty("Value").GetProperty("Value").GetDouble(),
double.MinValue, double.MaxValue);

_output.WriteLine("Removing items...");
await PublisherApi.UnpublishNodesAsync(testInput3[0]);
nodes = await PublisherApi.GetConfiguredNodesOnEndpointAsync(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public sealed class OpcUaSubscriptionConfig : PostConfigureOptionBase<OpcUaSubsc
public const string AutoSetQueueSizesKey = "AutoSetQueueSizes";
public const string DefaultLifetimeCountKey = "DefaultLifetimeCount";
public const string DefaultKeepAliveCountKey = "DefaultKeepAliveCount";
public const string MaxMonitoredItemPerSubscriptionKey = "MaxMonitoredItemPerSubscription";
public const string UseDeferredAcknoledgementsKey = "UseDeferredAcknoledgements";
public const string DefaultSamplingUsingCyclicReadKey = "DefaultSamplingUsingCyclicRead";
public const string EnableImmediatePublishingKey = "EnableImmediatePublishing";
Expand Down Expand Up @@ -184,6 +185,8 @@ public override void PostConfigure(string? name, OpcUaSubscriptionOptions option
options.DefaultQueueSize ??= (uint?)GetIntOrNull(DefaultQueueSizeKey);
options.AutoSetQueueSizes ??= GetBoolOrNull(AutoSetQueueSizesKey);

options.MaxMonitoredItemPerSubscription ??= (uint?)GetIntOrNull(MaxMonitoredItemPerSubscriptionKey);

var unsMode = _options.Value.DefaultDataSetRouting ?? DataSetRoutingMode.None;
options.FetchOpcBrowsePathFromRoot ??= unsMode != DataSetRoutingMode.None
? true : GetBoolOrNull(FetchOpcBrowsePathFromRootKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public sealed class OpcUaSubscriptionOptions
/// </summary>
public TimeSpan? DefaultPublishingInterval { get; set; }

/// <summary>
/// Allow max monitored item per subscription. If the server
/// supports less, this value takes no effect.
/// </summary>
public uint? MaxMonitoredItemPerSubscription { get; set; }

/// <summary>
/// Default subscription keep alive counter
/// </summary>
Expand Down
Loading

0 comments on commit c61c861

Please sign in to comment.