Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deserialize empty payloads #455

Merged
merged 12 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions src/NATS.Client.Core/INatsSerialize.cs
Original file line number Diff line number Diff line change
Expand Up @@ -367,16 +367,22 @@

/// <inheritdoc />
public T? Deserialize(in ReadOnlySequence<byte> buffer)
{

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / test (main)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / test (main)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / test (v2.9)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / test (v2.9)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / test (latest)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / test (latest)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / check

An opening brace should not be followed by a blank line

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / check

An opening brace should not be followed by a blank line

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / memory test (latest)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / memory test (v2.9)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / memory test (v2.9)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / memory test (v2.9)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / memory test (v2.9)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (main)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.9)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.9)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.9)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.9)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.9)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (v2.9)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / memory test (main)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)

Check warning on line 370 in src/NATS.Client.Core/INatsSerialize.cs

View workflow job for this annotation

GitHub Actions / dotnet (latest)


if (typeof(T) == typeof(string))
{
if (buffer.Length == 0)
return default;
return (T)(object)Encoding.UTF8.GetString(buffer);
}

var span = buffer.IsSingleSegment ? buffer.FirstSpan : buffer.ToArray();

if (typeof(T) == typeof(DateTime) || typeof(T) == typeof(DateTime?))
{
if (buffer.Length == 0)
return default;

if (Utf8Parser.TryParse(span, out DateTime value, out _))
{
return (T)(object)value;
Expand All @@ -387,6 +393,9 @@

if (typeof(T) == typeof(DateTimeOffset) || typeof(T) == typeof(DateTimeOffset?))
{
if (buffer.Length == 0)
return default;

if (Utf8Parser.TryParse(span, out DateTimeOffset value, out _))
{
return (T)(object)value;
Expand All @@ -397,6 +406,9 @@

if (typeof(T) == typeof(Guid) || typeof(T) == typeof(Guid?))
{
if (buffer.Length == 0)
return default;

if (Utf8Parser.TryParse(span, out Guid value, out _))
{
return (T)(object)value;
Expand All @@ -407,6 +419,9 @@

if (typeof(T) == typeof(TimeSpan) || typeof(T) == typeof(TimeSpan?))
{
if (buffer.Length == 0)
return default;

if (Utf8Parser.TryParse(span, out TimeSpan value, out _))
{
return (T)(object)value;
Expand All @@ -417,6 +432,9 @@

if (typeof(T) == typeof(bool) || typeof(T) == typeof(bool?))
{
if (buffer.Length == 0)
return default;

if (Utf8Parser.TryParse(span, out bool value, out _))
{
return (T)(object)value;
Expand All @@ -427,6 +445,9 @@

if (typeof(T) == typeof(byte) || typeof(T) == typeof(byte?))
{
if (buffer.Length == 0)
return default;

if (Utf8Parser.TryParse(span, out byte value, out _))
{
return (T)(object)value;
Expand All @@ -437,6 +458,9 @@

if (typeof(T) == typeof(decimal) || typeof(T) == typeof(decimal?))
{
if (buffer.Length == 0)
return default;

if (Utf8Parser.TryParse(span, out decimal value, out _))
{
return (T)(object)value;
Expand All @@ -447,6 +471,9 @@

if (typeof(T) == typeof(double) || typeof(T) == typeof(double?))
{
if (buffer.Length == 0)
return default;

if (Utf8Parser.TryParse(span, out double value, out _))
{
return (T)(object)value;
Expand All @@ -457,6 +484,9 @@

if (typeof(T) == typeof(float) || typeof(T) == typeof(float?))
{
if (buffer.Length == 0)
return default;

if (Utf8Parser.TryParse(span, out float value, out _))
{
return (T)(object)value;
Expand All @@ -467,6 +497,9 @@

if (typeof(T) == typeof(int) || typeof(T) == typeof(int?))
{
if (buffer.Length == 0)
return default;

if (Utf8Parser.TryParse(span, out int value, out _))
{
return (T)(object)value;
Expand All @@ -477,6 +510,9 @@

if (typeof(T) == typeof(long) || typeof(T) == typeof(long?))
{
if (buffer.Length == 0)
return default;

if (Utf8Parser.TryParse(span, out long value, out _))
{
return (T)(object)value;
Expand All @@ -487,6 +523,9 @@

if (typeof(T) == typeof(sbyte) || typeof(T) == typeof(sbyte?))
{
if (buffer.Length == 0)
return default;

if (Utf8Parser.TryParse(span, out sbyte value, out _))
{
return (T)(object)value;
Expand All @@ -497,6 +536,9 @@

if (typeof(T) == typeof(short) || typeof(T) == typeof(short?))
{
if (buffer.Length == 0)
return default;

if (Utf8Parser.TryParse(span, out short value, out _))
{
return (T)(object)value;
Expand All @@ -507,6 +549,9 @@

if (typeof(T) == typeof(uint) || typeof(T) == typeof(uint?))
{
if (buffer.Length == 0)
return default;

if (Utf8Parser.TryParse(span, out uint value, out _))
{
return (T)(object)value;
Expand All @@ -517,6 +562,9 @@

if (typeof(T) == typeof(ulong) || typeof(T) == typeof(ulong?))
{
if (buffer.Length == 0)
return default;

if (Utf8Parser.TryParse(span, out ulong value, out _))
{
return (T)(object)value;
Expand Down Expand Up @@ -615,26 +663,36 @@
{
if (typeof(T) == typeof(byte[]))
{
if (buffer.Length == 0)
return default;
return (T)(object)buffer.ToArray();
}

if (typeof(T) == typeof(Memory<byte>))
{
if (buffer.Length == 0)
return default;
return (T)(object)new Memory<byte>(buffer.ToArray());
}

if (typeof(T) == typeof(ReadOnlyMemory<byte>))
{
if (buffer.Length == 0)
return default;
return (T)(object)new ReadOnlyMemory<byte>(buffer.ToArray());
}

if (typeof(T) == typeof(ReadOnlySequence<byte>))
{
if (buffer.Length == 0)
return default;
return (T)(object)new ReadOnlySequence<byte>(buffer.ToArray());
}

if (typeof(T) == typeof(IMemoryOwner<byte>) || typeof(T) == typeof(NatsMemoryOwner<byte>))
{
if (buffer.Length == 0)
return default;
var memoryOwner = NatsMemoryOwner<byte>.Allocate((int)buffer.Length);
buffer.CopyTo(memoryOwner.Memory.Span);
return (T)(object)memoryOwner;
Expand Down Expand Up @@ -725,6 +783,11 @@
/// <inheritdoc />
public T? Deserialize(in ReadOnlySequence<byte> buffer)
{
if (buffer.Length == 0)
{
return default;
}

foreach (var context in _contexts)
{
if (context.GetTypeInfo(typeof(T)) is JsonTypeInfo<T> jsonTypeInfo)
Expand Down
5 changes: 1 addition & 4 deletions src/NATS.Client.Core/NatsMsg.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,8 @@ internal static NatsMsg<T> Build(

headers?.SetReadOnly();

// Consider an empty payload as null or default value for value types. This way we are able to
// receive sentinels as nulls or default values. This might cause an issue with where we are not
// able to differentiate between an empty sentinel and actual default value of a struct e.g. 0 (zero).
T? data;
if (headers?.Error == null && payloadBuffer.Length > 0)
if (headers?.Error == null)
{
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ internal sealed class NatsJSErrorAwareJsonSerializer<T> : INatsDeserialize<T>

public T? Deserialize(in ReadOnlySequence<byte> buffer)
{
if (buffer.Length == 0)
{
return default;
}

// We need to determine what type we're deserializing into
// .NET 6 new APIs to the rescue: we can read the buffer once
// by deserializing into a document, inspect and using the new
Expand Down
5 changes: 5 additions & 0 deletions src/NATS.Client.Serializers.Json/NatsJsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public void Serialize(IBufferWriter<byte> bufferWriter, T? value)
/// <inheritdoc />
public T? Deserialize(in ReadOnlySequence<byte> buffer)
{
if (buffer.Length == 0)
{
return default;
}

var reader = new Utf8JsonReader(buffer); // Utf8JsonReader is ref struct, no allocate.
return JsonSerializer.Deserialize<T>(ref reader, _opts);
}
Expand Down
74 changes: 74 additions & 0 deletions tests/NATS.Client.Core.Tests/SerializerTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Buffers;
using System.Text;

namespace NATS.Client.Core.Tests;

Expand Down Expand Up @@ -58,6 +59,68 @@ public async Task NatsMemoryOwner_empty_payload_should_not_throw()
Assert.Equal(0, msg.Data.Span.Length);
}
}

[Fact]
public async Task Deserialize_with_empty()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection();

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

await nats.ConnectAsync();

var serializer = new TestSerializerWithEmpty<TestData>();
var sub = await nats.SubscribeCoreAsync("foo", serializer: serializer, cancellationToken: cancellationToken);

await nats.PublishAsync("foo", cancellationToken: cancellationToken);
await nats.PublishAsync("foo", "something", cancellationToken: cancellationToken);

var result1 = await sub.Msgs.ReadAsync(cancellationToken);
Assert.NotNull(result1.Data);
Assert.Equal("__EMPTY__", result1.Data.Name);

var result2 = await sub.Msgs.ReadAsync(cancellationToken);
Assert.NotNull(result2.Data);
Assert.Equal("something", result2.Data.Name);
}

[Fact]
public async Task Deserialize_chained_with_empty()
{
await using var server = NatsServer.Start();
await using var nats = server.CreateClientConnection(new NatsOpts
{
SerializerRegistry = new TestSerializerRegistry(),
});

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

await nats.ConnectAsync();

var serializer = new TestSerializerWithEmpty<string>();
var sub = await nats.SubscribeCoreAsync<TestData>("foo", cancellationToken: cancellationToken);

await nats.PublishAsync("foo", cancellationToken: cancellationToken);
await nats.PublishAsync("foo", "something", cancellationToken: cancellationToken);

var result1 = await sub.Msgs.ReadAsync(cancellationToken);
Assert.NotNull(result1.Data);
Assert.Equal("__EMPTY__", result1.Data.Name);

var result2 = await sub.Msgs.ReadAsync(cancellationToken);
Assert.NotNull(result2.Data);
Assert.Equal("something", result2.Data.Name);
}
}

public class TestSerializerRegistry : INatsSerializerRegistry
{
public INatsSerialize<T> GetSerializer<T>() => new NatsUtf8PrimitivesSerializer<T>(new TestSerializerWithEmpty<T>());

public INatsDeserialize<T> GetDeserializer<T>() => new NatsUtf8PrimitivesSerializer<T>(new TestSerializerWithEmpty<T>());
}

public class TestSerializer<T> : INatsSerialize<T>, INatsDeserialize<T>
Expand All @@ -70,3 +133,14 @@ public class TestSerializer<T> : INatsSerialize<T>, INatsDeserialize<T>
public class TestSerializerException : Exception
{
}

public class TestSerializerWithEmpty<T> : INatsSerializer<T>
{
public T? Deserialize(in ReadOnlySequence<byte> buffer) => (T)(object)(buffer.IsEmpty
? new TestData("__EMPTY__")
: new TestData(Encoding.ASCII.GetString(buffer)));

public void Serialize(IBufferWriter<byte> bufferWriter, T value) => throw new Exception("not used");
}

public record TestData(string Name);
Loading