Skip to content

Commit

Permalink
Refactor message pipeline (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
ShortDevelopment committed Mar 15, 2024
1 parent be987ad commit 7e22d60
Show file tree
Hide file tree
Showing 8 changed files with 404 additions and 350 deletions.
16 changes: 7 additions & 9 deletions lib/ShortDev.Microsoft.ConnectedDevices/CdpChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
using ShortDev.Microsoft.ConnectedDevices.Messages.Control;
using ShortDev.Microsoft.ConnectedDevices.Messages.Session;
using ShortDev.Microsoft.ConnectedDevices.Transports;
using System;
using System.Collections.Generic;

namespace ShortDev.Microsoft.ConnectedDevices;

Expand Down Expand Up @@ -58,14 +56,14 @@ public void SendBinaryMessage(BodyCallback bodyCallback, uint msgId, List<Additi
if (headers != null)
header.AdditionalHeaders = headers;

Session.SendMessage(Socket, header, writer =>
EndianWriter writer = new(Endianness.BigEndian);
new BinaryMsgHeader()
{
new BinaryMsgHeader()
{
MessageId = msgId
}.Write(writer);
bodyCallback(writer);
});
MessageId = msgId
}.Write(writer);
bodyCallback(writer);

Session.SendMessage(Socket, header, writer);
}

void IDisposable.Dispose()
Expand Down
365 changes: 166 additions & 199 deletions lib/ShortDev.Microsoft.ConnectedDevices/CdpSession.cs

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion lib/ShortDev.Microsoft.ConnectedDevices/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ public static class Constants
public const string RfcommServiceId = "c7f94713-891e-496a-a0e7-983a0946126e";
public const string RfcommServiceName = "CDP Proximal Transport";

public const int DefaultMessageFragmentSize = 16384;
public const int HMacSize = 32;
public const int IVSize = 16;

Expand Down
125 changes: 65 additions & 60 deletions lib/ShortDev.Microsoft.ConnectedDevices/Internal/UpgradeHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ void HandleTransportRequest(CdpSocket socket, ref EndianReader reader)
Type = MessageType.Connect
};

_session.SendMessage(socket, header, (writer) =>
EndianWriter writer = new(Endianness.BigEndian);
new ConnectionHeader()
{
new ConnectionHeader()
{
ConnectionMode = ConnectionMode.Proximal,
MessageType = allowed ? ConnectionType.TransportConfirmation : ConnectionType.UpgradeFailure
}.Write(writer);
msg.Write(writer);
});
ConnectionMode = ConnectionMode.Proximal,
MessageType = allowed ? ConnectionType.TransportConfirmation : ConnectionType.UpgradeFailure
}.Write(writer);
msg.Write(writer);

_session.SendMessage(socket, header, writer);
}

void HandleUpgradeRequest(CdpSocket socket, ref EndianReader reader)
Expand All @@ -127,26 +127,25 @@ void HandleUpgradeRequest(CdpSocket socket, ref EndianReader reader)
var localIp = _session.Platform.TryGetTransport<NetworkTransport>()?.Handler.TryGetLocalIp();
if (localIp == null)
{
_session.SendMessage(socket, header, (writer) =>
EndianWriter writer = new(Endianness.BigEndian);
new ConnectionHeader()
{
new ConnectionHeader()
{
ConnectionMode = ConnectionMode.Proximal,
MessageType = ConnectionType.UpgradeFailure
}.Write(writer);
new HResultPayload()
{
HResult = -1
}.Write(writer);
});
ConnectionMode = ConnectionMode.Proximal,
MessageType = ConnectionType.UpgradeFailure
}.Write(writer);
new HResultPayload()
{
HResult = -1
}.Write(writer);

_session.SendMessage(socket, header, writer);
return;
}

_upgradeIds.Add(msg.UpgradeId);

_session.SendMessage(socket, header, (writer) =>
{
EndianWriter writer = new(Endianness.BigEndian);
new ConnectionHeader()
{
ConnectionMode = ConnectionMode.Proximal,
Expand All @@ -163,7 +162,9 @@ void HandleUpgradeRequest(CdpSocket socket, ref EndianReader reader)
EndpointMetadata.Tcp
]
}.Write(writer);
});

_session.SendMessage(socket, header, writer);
}
}

void HandleUpgradeFinalization(CdpSocket socket, ref EndianReader reader)
Expand All @@ -178,14 +179,14 @@ void HandleUpgradeFinalization(CdpSocket socket, ref EndianReader reader)
Type = MessageType.Connect
};

_session.SendMessage(socket, header, (writer) =>
EndianWriter writer = new(Endianness.BigEndian);
new ConnectionHeader()
{
new ConnectionHeader()
{
ConnectionMode = ConnectionMode.Proximal,
MessageType = ConnectionType.UpgradeFinalizationResponse
}.Write(writer);
});
ConnectionMode = ConnectionMode.Proximal,
MessageType = ConnectionType.UpgradeFinalizationResponse
}.Write(writer);

_session.SendMessage(socket, header, writer);
}
#endregion

Expand Down Expand Up @@ -223,23 +224,23 @@ void SendUpgradeRequest(CdpSocket socket, Guid upgradeId)
Type = MessageType.Connect
};

_session.SendMessage(socket, header, writer =>
EndianWriter writer = new(Endianness.BigEndian);
new ConnectionHeader()
{
new ConnectionHeader()
{
ConnectionMode = ConnectionMode.Proximal,
MessageType = ConnectionType.UpgradeRequest
}.Write(writer);
ConnectionMode = ConnectionMode.Proximal,
MessageType = ConnectionType.UpgradeRequest
}.Write(writer);

new UpgradeRequest()
{
UpgradeId = upgradeId,
Endpoints =
[
EndpointMetadata.Tcp
]
}.Write(writer);
});
new UpgradeRequest()
{
UpgradeId = upgradeId,
Endpoints =
[
EndpointMetadata.Tcp
]
}.Write(writer);

_session.SendMessage(socket, header, writer);
}
}

Expand Down Expand Up @@ -274,11 +275,10 @@ async void FindNewEndpoint()
return;
}

_session.SendMessage(oldSocket, new()
{
Type = MessageType.Connect,
}, writer =>
SendUpgradFinalization();
void SendUpgradFinalization()
{
EndianWriter writer = new(Endianness.BigEndian);
new ConnectionHeader()
{
ConnectionMode = ConnectionMode.Proximal,
Expand All @@ -288,7 +288,12 @@ async void FindNewEndpoint()
[
EndpointMetadata.Tcp
]);
});

_session.SendMessage(oldSocket, new()
{
Type = MessageType.Connect,
}, writer);
}

// Cancel after timeout if upgrade has not finished yet
await Task.Delay(UpgradeInstance.Timeout);
Expand All @@ -310,21 +315,21 @@ void HandleUpgradeFinalizationResponse()
_allowedAddresses.Add(_currentUpgrade.NewSocket.Endpoint.Address);

// Request transport permission for new socket
EndianWriter writer = new(Endianness.BigEndian);
new ConnectionHeader()
{
ConnectionMode = ConnectionMode.Proximal,
MessageType = ConnectionType.TransportRequest
}.Write(writer);
new UpgradeIdPayload()
{
UpgradeId = _currentUpgrade.Id
}.Write(writer);

_session.SendMessage(_currentUpgrade.NewSocket, new()
{
Type = MessageType.Connect,
}, writer =>
{
new ConnectionHeader()
{
ConnectionMode = ConnectionMode.Proximal,
MessageType = ConnectionType.TransportRequest
}.Write(writer);
new UpgradeIdPayload()
{
UpgradeId = _currentUpgrade.Id
}.Write(writer);
});
}, writer);
}

void HandleTransportConfirmation(CdpSocket socket, ref EndianReader reader)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using ShortDev.Microsoft.ConnectedDevices.Messages.Session;
using ShortDev.Microsoft.ConnectedDevices.Serialization;
using ShortDev.Microsoft.ConnectedDevices.Transports;

namespace ShortDev.Microsoft.ConnectedDevices.Messages;

Expand All @@ -10,7 +11,7 @@ public sealed class CdpMessage
public CdpMessage(CommonHeader header)
{
Header = header;
_buffer = new(header.FragmentCount * Constants.DefaultMessageFragmentSize);
_buffer = new(header.FragmentCount * MessageFragmenter.DefaultMessageFragmentSize);
}

public CdpMessage(CommonHeader header, byte[] payload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@
/// <summary>
/// Provides direct low-level access to inter-device communication.
/// </summary>
public sealed class CdpSocket : IDisposable
public sealed class CdpSocket : IFragmentSender, IDisposable
{
public CdpTransportType TransportType => Endpoint.TransportType;
public required EndpointInfo Endpoint { get; init; }
public required Stream InputStream { get; init; }
public required Stream OutputStream { get; init; }

public void SendData(EndianWriter writer)
public void SendFragment(ReadOnlySpan<byte> fragment)
{
lock (OutputStream)
writer.CopyTo(OutputStream);
{
OutputStream.Write(fragment);
OutputStream.Flush();
}
}

public bool IsClosed { get; private set; }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using ShortDev.Microsoft.ConnectedDevices.Encryption;
using ShortDev.Microsoft.ConnectedDevices.Messages;
using System.Diagnostics;

namespace ShortDev.Microsoft.ConnectedDevices.Transports;
public static class MessageFragmenter
{
public const int DefaultMessageFragmentSize = 16384;

public static void SendMessage(this IFragmentSender sender, CommonHeader header, ReadOnlySpan<byte> payload, CdpCryptor? cryptor = null)
{
if (payload.Length <= DefaultMessageFragmentSize)
{
sender.SendFragment(header, payload, cryptor);
return;
}

header.FragmentCount = (ushort)(payload.Length / DefaultMessageFragmentSize);

var leftover = payload.Length % DefaultMessageFragmentSize;
if (leftover != 0)
header.FragmentCount++;

for (ushort fragmentIndex = 0; fragmentIndex < header.FragmentCount; fragmentIndex++)
{
int start = fragmentIndex * DefaultMessageFragmentSize;
int length = Math.Min(payload.Length - start, DefaultMessageFragmentSize);

header.FragmentIndex = fragmentIndex;
sender.SendFragment(header, payload.Slice(start, length), cryptor);
}
}

static void SendFragment(this IFragmentSender sender, CommonHeader header, ReadOnlySpan<byte> payload, CdpCryptor? cryptor)
{
Debug.Assert(payload.Length <= DefaultMessageFragmentSize);

EndianWriter writer = new(Endianness.BigEndian);
if (cryptor != null)
{
cryptor.EncryptMessage(writer, header, payload);
}
else
{
header.SetPayloadLength(payload.Length);
header.Write(writer);
writer.Write(payload);
}

sender.SendFragment(writer.Buffer.AsSpan());
}
}

public interface IFragmentSender
{
/// <summary>
/// Sends a binary fragment.
/// </summary>
void SendFragment(ReadOnlySpan<byte> fragment);
}
Loading

0 comments on commit 7e22d60

Please sign in to comment.