Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configuration/Connections: Allow HTTP tunneling #2274

Merged
merged 18 commits into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ The `ConfigurationOptions` object has a wide range of properties, all of which a
| asyncTimeout={int} | `AsyncTimeout` | `SyncTimeout` | Time (ms) to allow for asynchronous operations |
| tiebreaker={string} | `TieBreaker` | `__Booksleeve_TieBreak` | Key to use for selecting a server in an ambiguous primary scenario |
| version={string} | `DefaultVersion` | (`4.0` in Azure, else `2.0`) | Redis version level (useful when the server does not make this available) |

| tunnel={string} | `Tunnel` | `null` | Tunnel for connections (use `http:{proxy url}` for "connect"-based proxy server)

Additional code-only options:
- ReconnectRetryPolicy (`IReconnectRetryPolicy`) - Default: `ReconnectRetryPolicy = ExponentialRetry(ConnectTimeout / 2);`
Expand Down
1 change: 1 addition & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Current package versions:
## Unreleased

- Adds: `last-in` and `cur-in` (bytes) to timeout exceptions to help identify timeouts that were just-behind another large payload off the wire ([#2276 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2276))
- Adds: general-purpose tunnel support, with HTTP proxy "connect" support included ([#2274 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2274))

## 2.6.70

Expand Down
114 changes: 114 additions & 0 deletions src/StackExchange.Redis/Configuration/Tunnel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
using System;
using System.Buffers;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Pipelines.Sockets.Unofficial;

namespace StackExchange.Redis.Configuration
{
/// <summary>
/// Allows interception of the transport used to communicate with Redis.
/// </summary>
public abstract class Tunnel
{
/// <summary>
/// Gets the underlying socket endpoint to use when connecting to a logical endpoint.
/// </summary>
/// <remarks><c>null</c> should be returned if a socket is not required for this endpoint.</remarks>
public virtual ValueTask<EndPoint?> GetSocketConnectEndpointAsync(EndPoint endpoint, CancellationToken cancellationToken) => new(endpoint);

/// <summary>
/// Allows modification of a <see cref="Socket"/> between creation and connection.
/// Passed in is the endpoint we're connecting to, which type of connection it is, and the socket itself.
/// For example, a specific local IP endpoint could be bound, linger time altered, etc.
/// </summary>
public virtual ValueTask BeforeSocketConnectAsync(EndPoint endPoint, ConnectionType connectionType, Socket? socket, CancellationToken cancellationToken) => default;

/// <summary>
/// Invoked on a connected endpoint before server authentication and other handshakes occur, allowing pre-redis handshakes. By returning a custom <see cref="Stream"/>,
/// the entire data flow can be intercepted, providing entire custom transports.
/// </summary>
public virtual ValueTask<Stream?> BeforeAuthenticateAsync(EndPoint endpoint, ConnectionType connectionType, Socket? socket, CancellationToken cancellationToken) => default;
/// <inheritdoc/>
public abstract override string ToString();

private sealed class HttpProxyTunnel : Tunnel
{
public EndPoint Proxy { get; }
public HttpProxyTunnel(EndPoint proxy) => Proxy = proxy ?? throw new ArgumentNullException(nameof(proxy));

public override ValueTask<EndPoint?> GetSocketConnectEndpointAsync(EndPoint endpoint, CancellationToken cancellationToken) => new(Proxy);

public override async ValueTask<Stream?> BeforeAuthenticateAsync(EndPoint endpoint, ConnectionType connectionType, Socket? socket, CancellationToken cancellationToken)
{
if (socket is not null)
{
var encoding = Encoding.ASCII;
var ep = Format.ToString(endpoint);
const string Prefix = "CONNECT ", Suffix = " HTTP/1.1\r\n\r\n", ExpectedResponse = "HTTP/1.1 200 OK\r\n\r\n";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mgravell According to https://datatracker.ietf.org/doc/html/draft-luotonen-web-proxy-tunneling-01#section-3.2, the expected response should be HTTP/1.1 200 Connection established instead of HTTP/1.1 200 OK. Currently it's not possible to use a HTTP proxy that correctly implements this behavior (e.g. Squid).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a PR: #2448

byte[] chunk = ArrayPool<byte>.Shared.Rent(Math.Max(
encoding.GetByteCount(Prefix) + encoding.GetByteCount(ep) + encoding.GetByteCount(Suffix),
encoding.GetByteCount(ExpectedResponse)
));
var offset = 0;
offset += encoding.GetBytes(Prefix, 0, Prefix.Length, chunk, offset);
offset += encoding.GetBytes(ep, 0, ep.Length, chunk, offset);
offset += encoding.GetBytes(Suffix, 0, Suffix.Length, chunk, offset);

static void SafeAbort(object? obj)
{
try
{
(obj as SocketAwaitableEventArgs)?.Abort(SocketError.TimedOut);
}
catch { } // best effort only
}

using (var args = new SocketAwaitableEventArgs())
using (cancellationToken.Register(static s => SafeAbort(s), args))
{
args.SetBuffer(chunk, 0, offset);
if (!socket.SendAsync(args)) args.Complete();
await args;

// we expect to see: "HTTP/1.1 200 OK\n"; note our buffer is definitely big enough already
int toRead = encoding.GetByteCount(ExpectedResponse), read;
offset = 0;

while (toRead > 0)
{
args.SetBuffer(chunk, offset, toRead);
if (!socket.ReceiveAsync(args)) args.Complete();
read = await args;

if (read <= 0) break; // EOF (since we're never doing zero-length reads)
toRead -= read;
offset += read;
}
if (toRead != 0) throw new EndOfStreamException("EOF negotiating HTTP tunnel");
// lazy
var actualResponse = encoding.GetString(chunk, 0, offset);
if (ExpectedResponse != actualResponse)
{
throw new InvalidOperationException("Unexpected response negotiating HTTP tunnel");
}
ArrayPool<byte>.Shared.Return(chunk);
}
}
return default; // no need for custom stream wrapper here
}

public override string ToString() => "http:" + Format.ToString(Proxy);
}

/// <summary>
/// Create a tunnel via an HTTP proxy server.
/// </summary>
/// <param name="proxy">The endpoint to use as an HTTP proxy server.</param>
public static Tunnel HttpProxy(EndPoint proxy) => new HttpProxyTunnel(proxy);
}
}
32 changes: 31 additions & 1 deletion src/StackExchange.Redis/ConfigurationOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ internal const string
TieBreaker = "tiebreaker",
Version = "version",
WriteBuffer = "writeBuffer",
CheckCertificateRevocation = "checkCertificateRevocation";
CheckCertificateRevocation = "checkCertificateRevocation",
Tunnel = "tunnel";

private static readonly Dictionary<string, string> normalizedOptions = new[]
{
Expand Down Expand Up @@ -650,6 +651,7 @@ public static ConfigurationOptions Parse(string configuration, bool ignoreUnknow
#if NETCOREAPP3_1_OR_GREATER
SslClientAuthenticationOptions = SslClientAuthenticationOptions,
#endif
Tunnel = Tunnel,
};

/// <summary>
Expand Down Expand Up @@ -729,6 +731,10 @@ public string ToString(bool includePassword)
Append(sb, OptionKeys.ConfigCheckSeconds, configCheckSeconds);
Append(sb, OptionKeys.ResponseTimeout, responseTimeout);
Append(sb, OptionKeys.DefaultDatabase, DefaultDatabase);
if (Tunnel is Tunnel tunnel)
{
Append(sb, OptionKeys.Tunnel, tunnel.ToString());
}
commandMap?.AppendDeltas(sb);
return sb.ToString();
}
Expand Down Expand Up @@ -877,6 +883,25 @@ private ConfigurationOptions DoParse(string configuration, bool ignoreUnknown)
case OptionKeys.SslProtocols:
SslProtocols = OptionKeys.ParseSslProtocols(key, value);
break;
case OptionKeys.Tunnel:
if (value.IsNullOrWhiteSpace())
{
Tunnel = null;
}
else if (value.StartsWith("http:"))
{
value = value.Substring(5);
if (!Format.TryParseEndPoint(value, out var ep))
{
throw new ArgumentException("HTTP tunnel cannot be parsed: " + value);
}
Tunnel = Tunnel.HttpProxy(ep);
}
else
{
throw new ArgumentException("Tunnel cannot be parsed: " + value);
}
break;
// Deprecated options we ignore...
case OptionKeys.HighPrioritySocketThreads:
case OptionKeys.PreserveAsyncOrder:
Expand Down Expand Up @@ -914,5 +939,10 @@ private ConfigurationOptions DoParse(string configuration, bool ignoreUnknown)
}
return this;
}

/// <summary>
/// Allows custom transport implementations, such as http-tunneling via a proxy.
/// </summary>
public Tunnel? Tunnel { get; set; }
}
}
57 changes: 45 additions & 12 deletions src/StackExchange.Redis/PhysicalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,31 +103,48 @@ internal async Task BeginConnectAsync(LogProxy? log)
}

Trace("Connecting...");
_socket = SocketManager.CreateSocket(endpoint);
bridge.Multiplexer.RawConfig.BeforeSocketConnect?.Invoke(endpoint, bridge.ConnectionType, _socket);
var tunnel = bridge.Multiplexer.RawConfig.Tunnel;
var connectTo = endpoint;
if (tunnel is not null)
{
connectTo = await tunnel.GetSocketConnectEndpointAsync(endpoint, CancellationToken.None).ForAwait();
}
if (connectTo is not null)
{
_socket = SocketManager.CreateSocket(connectTo);
}

if (_socket is not null)
{
bridge.Multiplexer.RawConfig.BeforeSocketConnect?.Invoke(endpoint, bridge.ConnectionType, _socket);
if (tunnel is not null)
{ // same functionality as part of a tunnel
await tunnel.BeforeSocketConnectAsync(endpoint, bridge.ConnectionType, _socket, CancellationToken.None).ForAwait();
}
}
bridge.Multiplexer.OnConnecting(endpoint, bridge.ConnectionType);
log?.WriteLine($"{Format.ToString(endpoint)}: BeginConnectAsync");

CancellationTokenSource? timeoutSource = null;
try
{
using (var args = new SocketAwaitableEventArgs
using (var args = connectTo is null ? null : new SocketAwaitableEventArgs
{
RemoteEndPoint = endpoint,
RemoteEndPoint = connectTo,
})
{
var x = VolatileSocket;
if (x == null)
{
args.Abort();
args?.Abort();
}
else if (x.ConnectAsync(args))
else if (args is not null && x.ConnectAsync(args))
{ // asynchronous operation is pending
timeoutSource = ConfigureTimeout(args, bridge.Multiplexer.RawConfig.ConnectTimeout);
}
else
{ // completed synchronously
args.Complete();
args?.Complete();
}

// Complete connection
Expand All @@ -136,15 +153,18 @@ internal async Task BeginConnectAsync(LogProxy? log)
// If we're told to ignore connect, abort here
if (BridgeCouldBeNull?.Multiplexer?.IgnoreConnect ?? false) return;

await args; // wait for the connect to complete or fail (will throw)
if (args is not null)
{
await args; // wait for the connect to complete or fail (will throw)
}
if (timeoutSource != null)
{
timeoutSource.Cancel();
timeoutSource.Dispose();
}

x = VolatileSocket;
if (x == null)
if (x == null && args is not null)
{
ConnectionMultiplexer.TraceWithoutContext("Socket was already aborted");
}
Expand Down Expand Up @@ -1413,7 +1433,7 @@ public ConnectionStatus GetStatus()
return null;
}

internal async ValueTask<bool> ConnectedAsync(Socket socket, LogProxy? log, SocketManager manager)
internal async ValueTask<bool> ConnectedAsync(Socket? socket, LogProxy? log, SocketManager manager)
{
var bridge = BridgeCouldBeNull;
if (bridge == null) return false;
Expand All @@ -1430,6 +1450,13 @@ internal async ValueTask<bool> ConnectedAsync(Socket socket, LogProxy? log, Sock

var config = bridge.Multiplexer.RawConfig;

var tunnel = config.Tunnel;
Stream? stream = null;
if (tunnel is not null)
{
stream = await tunnel.BeforeAuthenticateAsync(bridge.ServerEndPoint.EndPoint, bridge.ConnectionType, socket, CancellationToken.None).ForAwait();
}

if (config.Ssl)
{
log?.WriteLine("Configuring TLS");
Expand All @@ -1439,7 +1466,8 @@ internal async ValueTask<bool> ConnectedAsync(Socket socket, LogProxy? log, Sock
host = Format.ToStringHostOnly(bridge.ServerEndPoint.EndPoint);
}

var ssl = new SslStream(new NetworkStream(socket), false,
stream ??= new NetworkStream(socket ?? throw new InvalidOperationException("No socket or stream available - possibly a tunnel error"));
var ssl = new SslStream(stream, false,
config.CertificateValidationCallback ?? GetAmbientIssuerCertificateCallback(),
config.CertificateSelectionCallback ?? GetAmbientClientCertificateCallback(),
EncryptionPolicy.RequireEncryption);
Expand Down Expand Up @@ -1475,7 +1503,12 @@ internal async ValueTask<bool> ConnectedAsync(Socket socket, LogProxy? log, Sock
bridge.Multiplexer.Trace("Encryption failure");
return false;
}
pipe = StreamConnection.GetDuplex(ssl, manager.SendPipeOptions, manager.ReceivePipeOptions, name: bridge.Name);
stream = ssl;
}

if (stream is not null)
{
pipe = StreamConnection.GetDuplex(stream, manager.SendPipeOptions, manager.ReceivePipeOptions, name: bridge.Name);
}
else
{
Expand Down
9 changes: 9 additions & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ StackExchange.Redis.Configuration.AzureOptionsProvider.AzureOptionsProvider() ->
StackExchange.Redis.Configuration.DefaultOptionsProvider
StackExchange.Redis.Configuration.DefaultOptionsProvider.ClientName.get -> string!
StackExchange.Redis.Configuration.DefaultOptionsProvider.DefaultOptionsProvider() -> void
StackExchange.Redis.Configuration.Tunnel
StackExchange.Redis.Configuration.Tunnel.Tunnel() -> void
override abstract StackExchange.Redis.Configuration.Tunnel.ToString() -> string!
static StackExchange.Redis.Configuration.Tunnel.HttpProxy(System.Net.EndPoint! proxy) -> StackExchange.Redis.Configuration.Tunnel!
virtual StackExchange.Redis.Configuration.Tunnel.BeforeAuthenticateAsync(System.Net.EndPoint! endpoint, StackExchange.Redis.ConnectionType connectionType, System.Net.Sockets.Socket? socket, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask<System.IO.Stream?>
virtual StackExchange.Redis.Configuration.Tunnel.BeforeSocketConnectAsync(System.Net.EndPoint! endPoint, StackExchange.Redis.ConnectionType connectionType, System.Net.Sockets.Socket? socket, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask
virtual StackExchange.Redis.Configuration.Tunnel.GetSocketConnectEndpointAsync(System.Net.EndPoint! endpoint, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask<System.Net.EndPoint?>
StackExchange.Redis.ConfigurationOptions
StackExchange.Redis.ConfigurationOptions.AbortOnConnectFail.get -> bool
StackExchange.Redis.ConfigurationOptions.AbortOnConnectFail.set -> void
Expand Down Expand Up @@ -260,6 +267,8 @@ StackExchange.Redis.ConfigurationOptions.TieBreaker.set -> void
StackExchange.Redis.ConfigurationOptions.ToString(bool includePassword) -> string!
StackExchange.Redis.ConfigurationOptions.TrustIssuer(string! issuerCertificatePath) -> void
StackExchange.Redis.ConfigurationOptions.TrustIssuer(System.Security.Cryptography.X509Certificates.X509Certificate2! issuer) -> void
StackExchange.Redis.ConfigurationOptions.Tunnel.get -> StackExchange.Redis.Configuration.Tunnel?
StackExchange.Redis.ConfigurationOptions.Tunnel.set -> void
StackExchange.Redis.ConfigurationOptions.User.get -> string?
StackExchange.Redis.ConfigurationOptions.User.set -> void
StackExchange.Redis.ConfigurationOptions.UseSsl.get -> bool
Expand Down
15 changes: 15 additions & 0 deletions tests/StackExchange.Redis.Tests/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -611,4 +611,19 @@ public async Task MutableOptions()
var newPass = options.Password = "newPassword";
Assert.Equal(newPass, conn.RawConfig.Password);
}

[Fact]
public void HttpTunnel()
{
var config = ConfigurationOptions.Parse("127.0.0.1:6380,tunnel=http:somewhere:22");
var ip = Assert.IsType<IPEndPoint>(Assert.Single(config.EndPoints));
Assert.Equal(6380, ip.Port);
Assert.Equal("127.0.0.1", ip.Address.ToString());

Assert.NotNull(config.Tunnel);
Assert.Equal("http:somewhere:22", config.Tunnel.ToString());

var cs = config.ToString();
Assert.Equal("127.0.0.1:6380,tunnel=http:somewhere:22", cs);
}
}
Loading