diff --git a/src/Nethermind/Directory.Packages.props b/src/Nethermind/Directory.Packages.props index 8e45df73b9f..c5ea08ce0dd 100644 --- a/src/Nethermind/Directory.Packages.props +++ b/src/Nethermind/Directory.Packages.props @@ -44,6 +44,7 @@ + diff --git a/src/Nethermind/Nethermind.Core/Nethermind.Core.csproj b/src/Nethermind/Nethermind.Core/Nethermind.Core.csproj index 9c993255683..6ddeabf9513 100644 --- a/src/Nethermind/Nethermind.Core/Nethermind.Core.csproj +++ b/src/Nethermind/Nethermind.Core/Nethermind.Core.csproj @@ -11,6 +11,7 @@ + diff --git a/src/Nethermind/Nethermind.Core/Resettables/RecyclableStream.cs b/src/Nethermind/Nethermind.Core/Resettables/RecyclableStream.cs new file mode 100644 index 00000000000..4fff709ea63 --- /dev/null +++ b/src/Nethermind/Nethermind.Core/Resettables/RecyclableStream.cs @@ -0,0 +1,12 @@ +// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using Microsoft.IO; + +namespace Nethermind.Core.Resettables; + +public class RecyclableStream +{ + private static readonly RecyclableMemoryStreamManager _manager = new RecyclableMemoryStreamManager(); + public static RecyclableMemoryStream GetStream(string tag) => _manager.GetStream(tag); +} diff --git a/src/Nethermind/Nethermind.JsonRpc.TraceStore/ParityLikeTraceSerializer.cs b/src/Nethermind/Nethermind.JsonRpc.TraceStore/ParityLikeTraceSerializer.cs index 77f22416f83..9cc6f03a2d1 100644 --- a/src/Nethermind/Nethermind.JsonRpc.TraceStore/ParityLikeTraceSerializer.cs +++ b/src/Nethermind/Nethermind.JsonRpc.TraceStore/ParityLikeTraceSerializer.cs @@ -2,6 +2,8 @@ // SPDX-License-Identifier: LGPL-3.0-only using System.IO.Compression; + +using Nethermind.Core.Resettables; using Nethermind.Evm.Tracing.ParityStyle; using Nethermind.Logging; using Nethermind.Serialization.Json; @@ -49,8 +51,8 @@ public byte[] Serialize(IReadOnlyCollection traces) CheckDepth(traces); - using MemoryStream output = new(); - using (GZipStream compressionStream = new(output, CompressionMode.Compress)) + using MemoryStream output = RecyclableStream.GetStream("Parity"); + using (GZipStream compressionStream = new(output, CompressionMode.Compress, leaveOpen: true)) { _jsonSerializer.Serialize(compressionStream, traces); } diff --git a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs index c9faf4d8833..7739a1006b8 100644 --- a/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs +++ b/src/Nethermind/Nethermind.JsonRpc/JsonRpcProcessor.cs @@ -15,6 +15,7 @@ using Microsoft.AspNetCore.Http; using Nethermind.Core.Collections; using Nethermind.Core.Extensions; +using Nethermind.Core.Resettables; using Nethermind.Logging; using Nethermind.Serialization.Json; @@ -116,161 +117,169 @@ private ArrayPoolList DeserializeArray(JsonElement element) => public async IAsyncEnumerable ProcessAsync(PipeReader reader, JsonRpcContext context) { - reader = await RecordRequest(reader); + Stream? stream = null; + (reader, stream) = await RecordRequest(reader); Stopwatch stopwatch = Stopwatch.StartNew(); - - // Initializes a buffer to store the data read from the reader. - ReadOnlySequence buffer = default; try { - // Continuously read data from the PipeReader in a loop. - // Can read multiple requests, ends when there is no more requests to read or there is an error in deserialization. - while (true) + // Initializes a buffer to store the data read from the reader. + ReadOnlySequence buffer = default; + try { - // Asynchronously reads data from the PipeReader. - ReadResult readResult = await reader.ReadAsync(); - buffer = readResult.Buffer; - // Placeholder for a result in case of deserialization failure. - JsonRpcResult? deserializationFailureResult = null; - - // Processes the buffer while it's not empty; before going out to outer loop to get more data. - while (!buffer.IsEmpty) + // Continuously read data from the PipeReader in a loop. + // Can read multiple requests, ends when there is no more requests to read or there is an error in deserialization. + while (true) { - JsonDocument? jsonDocument = null; - JsonRpcRequest? model = null; - ArrayPoolList? collection = null; - try + // Asynchronously reads data from the PipeReader. + ReadResult readResult = await reader.ReadAsync(); + buffer = readResult.Buffer; + // Placeholder for a result in case of deserialization failure. + JsonRpcResult? deserializationFailureResult = null; + + // Processes the buffer while it's not empty; before going out to outer loop to get more data. + while (!buffer.IsEmpty) { - // Tries to parse the JSON from the buffer. - if (!TryParseJson(ref buffer, out jsonDocument)) + JsonDocument? jsonDocument = null; + JsonRpcRequest? model = null; + ArrayPoolList? collection = null; + try { - // More data needs to be read to complete a document - break; - } - - // Deserializes the JSON document into a request object or a collection of requests. - (model, collection) = DeserializeObjectOrArray(jsonDocument); - } - catch (BadHttpRequestException e) - { - // Increments failure metric and logs the exception, then stops processing. - Metrics.JsonRpcRequestDeserializationFailures++; - if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}"); - yield break; - } - catch (Exception ex) - { - // Handles general exceptions during parsing and validation. - // Sends an error response and stops the stopwatch. - Metrics.JsonRpcRequestDeserializationFailures++; - if (_logger.IsError) _logger.Error($"Error during parsing/validation.", ex); - JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message"); - TraceResult(response); - stopwatch.Stop(); - deserializationFailureResult = JsonRpcResult.Single( - RecordResponse(response, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false))); - } + // Tries to parse the JSON from the buffer. + if (!TryParseJson(ref buffer, out jsonDocument)) + { + // More data needs to be read to complete a document + break; + } - // Checks for deserialization failure and yields the result. - if (deserializationFailureResult.HasValue) - { - yield return deserializationFailureResult.Value; - break; - } - else - { - // Handles a single JSON RPC request. - if (model is not null) + // Deserializes the JSON document into a request object or a collection of requests. + (model, collection) = DeserializeObjectOrArray(jsonDocument); + } + catch (BadHttpRequestException e) { - if (_logger.IsDebug) _logger.Debug($"JSON RPC request {model}"); - - // Processes the individual request. - JsonRpcResult.Entry result = await HandleSingleRequest(model, context); - result.Response.AddDisposable(() => jsonDocument.Dispose()); - - // Returns the result of the processed request. - yield return JsonRpcResult.Single(RecordResponse(result)); + // Increments failure metric and logs the exception, then stops processing. + Metrics.JsonRpcRequestDeserializationFailures++; + if (_logger.IsDebug) _logger.Debug($"Couldn't read request.{Environment.NewLine}{e}"); + yield break; + } + catch (Exception ex) + { + // Handles general exceptions during parsing and validation. + // Sends an error response and stops the stopwatch. + Metrics.JsonRpcRequestDeserializationFailures++; + if (_logger.IsError) _logger.Error($"Error during parsing/validation.", ex); + JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message"); + TraceResult(response); + stopwatch.Stop(); + deserializationFailureResult = JsonRpcResult.Single( + RecordResponse(response, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false))); } - // Processes a collection of JSON RPC requests. - if (collection is not null) + // Checks for deserialization failure and yields the result. + if (deserializationFailureResult.HasValue) { - if (_logger.IsDebug) _logger.Debug($"{collection.Count} JSON RPC requests"); + yield return deserializationFailureResult.Value; + break; + } + else + { + // Handles a single JSON RPC request. + if (model is not null) + { + if (_logger.IsDebug) _logger.Debug($"JSON RPC request {model}"); + + // Processes the individual request. + JsonRpcResult.Entry result = await HandleSingleRequest(model, context); + result.Response.AddDisposable(() => jsonDocument.Dispose()); - // Checks for authentication and batch size limit. - if (!context.IsAuthenticated && collection.Count > _jsonRpcConfig.MaxBatchSize) + // Returns the result of the processed request. + yield return JsonRpcResult.Single(RecordResponse(result)); + } + + // Processes a collection of JSON RPC requests. + if (collection is not null) { - if (_logger.IsWarn) _logger.Warn($"The batch size limit was exceeded. The requested batch size {collection.Count}, and the current config setting is JsonRpc.{nameof(_jsonRpcConfig.MaxBatchSize)} = {_jsonRpcConfig.MaxBatchSize}."); - JsonRpcErrorResponse? response = _jsonRpcService.GetErrorResponse(ErrorCodes.LimitExceeded, "Batch size limit exceeded"); - response.AddDisposable(() => jsonDocument.Dispose()); + if (_logger.IsDebug) _logger.Debug($"{collection.Count} JSON RPC requests"); + + // Checks for authentication and batch size limit. + if (!context.IsAuthenticated && collection.Count > _jsonRpcConfig.MaxBatchSize) + { + if (_logger.IsWarn) _logger.Warn($"The batch size limit was exceeded. The requested batch size {collection.Count}, and the current config setting is JsonRpc.{nameof(_jsonRpcConfig.MaxBatchSize)} = {_jsonRpcConfig.MaxBatchSize}."); + JsonRpcErrorResponse? response = _jsonRpcService.GetErrorResponse(ErrorCodes.LimitExceeded, "Batch size limit exceeded"); + response.AddDisposable(() => jsonDocument.Dispose()); + + deserializationFailureResult = JsonRpcResult.Single(RecordResponse(response, RpcReport.Error)); + yield return deserializationFailureResult.Value; + break; + } + + // Stops the stopwatch and yields the batch processing result. + stopwatch.Stop(); + yield return JsonRpcResult.Collection(new JsonRpcBatchResult((e, c) => IterateRequest(collection, context, e).GetAsyncEnumerator(c))); + } - deserializationFailureResult = JsonRpcResult.Single(RecordResponse(response, RpcReport.Error)); + // Handles invalid requests. + if (model is null && collection is null) + { + Metrics.JsonRpcInvalidRequests++; + JsonRpcErrorResponse errorResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Invalid request"); + errorResponse.AddDisposable(() => jsonDocument.Dispose()); + + TraceResult(errorResponse); + stopwatch.Stop(); + if (_logger.IsDebug) _logger.Debug($" Failed request handled in {stopwatch.Elapsed.TotalMilliseconds}ms"); + deserializationFailureResult = JsonRpcResult.Single(RecordResponse(errorResponse, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false))); yield return deserializationFailureResult.Value; break; } - - // Stops the stopwatch and yields the batch processing result. - stopwatch.Stop(); - yield return JsonRpcResult.Collection(new JsonRpcBatchResult((e, c) => IterateRequest(collection, context, e).GetAsyncEnumerator(c))); } + } - // Handles invalid requests. - if (model is null && collection is null) - { - Metrics.JsonRpcInvalidRequests++; - JsonRpcErrorResponse errorResponse = _jsonRpcService.GetErrorResponse(ErrorCodes.InvalidRequest, "Invalid request"); - errorResponse.AddDisposable(() => jsonDocument.Dispose()); + // Checks if the deserialization failed + if (deserializationFailureResult.HasValue) + { + break; + } - TraceResult(errorResponse); + // Checks if the read operation is completed. + if (readResult.IsCompleted) + { + if (buffer.Length > 0 && (buffer.IsSingleSegment ? buffer.FirstSpan : buffer.ToArray()).IndexOfAnyExcept(WhiteSpace()) >= 0) + { + Metrics.JsonRpcRequestDeserializationFailures++; + if (_logger.IsError) _logger.Error($"Error during parsing/validation. Incomplete request"); + JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message"); + TraceResult(response); stopwatch.Stop(); - if (_logger.IsDebug) _logger.Debug($" Failed request handled in {stopwatch.Elapsed.TotalMilliseconds}ms"); - deserializationFailureResult = JsonRpcResult.Single(RecordResponse(errorResponse, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false))); + deserializationFailureResult = JsonRpcResult.Single( + RecordResponse(response, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false))); yield return deserializationFailureResult.Value; - break; } + + break; } - } - // Checks if the deserialization failed - if (deserializationFailureResult.HasValue) - { - break; + // Advances the reader to the next segment of the buffer. + reader.AdvanceTo(buffer.Start, buffer.End); + buffer = default; } - - // Checks if the read operation is completed. - if (readResult.IsCompleted) + } + finally + { + // Advances the reader to the end of the buffer if not null. + if (!buffer.FirstSpan.IsNull()) { - if (buffer.Length > 0 && (buffer.IsSingleSegment ? buffer.FirstSpan : buffer.ToArray()).IndexOfAnyExcept(WhiteSpace()) >= 0) - { - Metrics.JsonRpcRequestDeserializationFailures++; - if (_logger.IsError) _logger.Error($"Error during parsing/validation. Incomplete request"); - JsonRpcErrorResponse response = _jsonRpcService.GetErrorResponse(ErrorCodes.ParseError, "Incorrect message"); - TraceResult(response); - stopwatch.Stop(); - deserializationFailureResult = JsonRpcResult.Single( - RecordResponse(response, new RpcReport("# parsing error #", stopwatch.ElapsedMicroseconds(), false))); - yield return deserializationFailureResult.Value; - } - - break; + reader.AdvanceTo(buffer.End); } - // Advances the reader to the next segment of the buffer. - reader.AdvanceTo(buffer.Start, buffer.End); - buffer = default; } + + // Completes the PipeReader's asynchronous reading operation. + await reader.CompleteAsync(); } finally { - // Advances the reader to the end of the buffer if not null. - if (!buffer.FirstSpan.IsNull()) - { - reader.AdvanceTo(buffer.End); - } + stream?.Dispose(); } - - // Completes the PipeReader's asynchronous reading operation. - await reader.CompleteAsync(); } private static ReadOnlySpan WhiteSpace() => " \n\r\t"u8; @@ -365,11 +374,11 @@ private JsonRpcResult.Entry RecordResponse(JsonRpcResult.Entry result) return result; } - private async ValueTask RecordRequest(PipeReader reader) + private async ValueTask<(PipeReader, Stream?)> RecordRequest(PipeReader reader) { if ((_jsonRpcConfig.RpcRecorderState & RpcRecorderState.Request) != 0) { - Stream memoryStream = new MemoryStream(); + Stream memoryStream = RecyclableStream.GetStream("recorder"); await reader.CopyToAsync(memoryStream); memoryStream.Seek(0, SeekOrigin.Begin); @@ -379,10 +388,10 @@ private async ValueTask RecordRequest(PipeReader reader) _recorder.RecordRequest(requestString); memoryStream.Seek(0, SeekOrigin.Begin); - return PipeReader.Create(memoryStream); + return (PipeReader.Create(memoryStream), memoryStream); } - return reader; + return (reader, null); } private void TraceResult(JsonRpcResult.Entry response) diff --git a/src/Nethermind/Nethermind.KeyStore/AesEncrypter.cs b/src/Nethermind/Nethermind.KeyStore/AesEncrypter.cs index 40984d11355..f0a0147641f 100644 --- a/src/Nethermind/Nethermind.KeyStore/AesEncrypter.cs +++ b/src/Nethermind/Nethermind.KeyStore/AesEncrypter.cs @@ -5,6 +5,8 @@ using System.Collections.Generic; using System.IO; using System.Security.Cryptography; + +using Nethermind.Core.Resettables; using Nethermind.KeyStore.Config; using Nethermind.Logging; @@ -41,7 +43,7 @@ public byte[] Encrypt(byte[] content, byte[] key, byte[] iv, string cipherType) } case "aes-128-ctr": { - using var outputEncryptedStream = new MemoryStream(); + using var outputEncryptedStream = RecyclableStream.GetStream("aes-128-ctr-encrypt"); using var inputStream = new MemoryStream(content); AesCtr(key, iv, inputStream, outputEncryptedStream); outputEncryptedStream.Position = 0; @@ -78,7 +80,7 @@ public byte[] Decrypt(byte[] cipher, byte[] key, byte[] iv, string cipherType) case "aes-128-ctr": { using var outputEncryptedStream = new MemoryStream(cipher); - using var outputDecryptedStream = new MemoryStream(); + using var outputDecryptedStream = RecyclableStream.GetStream("aes-128-ctr-decrypt"); AesCtr(key, iv, outputEncryptedStream, outputDecryptedStream); outputDecryptedStream.Position = 0; return outputDecryptedStream.ToArray(); @@ -96,9 +98,9 @@ public byte[] Decrypt(byte[] cipher, byte[] key, byte[] iv, string cipherType) private byte[] Execute(ICryptoTransform cryptoTransform, byte[] data) { - using (var memoryStream = new MemoryStream()) + using (var memoryStream = RecyclableStream.GetStream(nameof(AesEncrypter))) { - using (var cryptoStream = new CryptoStream(memoryStream, cryptoTransform, CryptoStreamMode.Write)) + using (var cryptoStream = new CryptoStream(memoryStream, cryptoTransform, CryptoStreamMode.Write, leaveOpen: true)) { cryptoStream.Write(data, 0, data.Length); cryptoStream.FlushFinalBlock(); diff --git a/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs b/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs index 7113c723299..41ea95377d1 100644 --- a/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs +++ b/src/Nethermind/Nethermind.Runner/JsonRpc/Startup.cs @@ -26,6 +26,7 @@ using Nethermind.Config; using Nethermind.Core.Authentication; using Nethermind.Core.Extensions; +using Nethermind.Core.Resettables; using Nethermind.HealthChecks; using Nethermind.JsonRpc; using Nethermind.JsonRpc.Modules; @@ -167,7 +168,7 @@ void SerializeTimeoutException(IJsonRpcService service, IBufferWriter resu JsonRpcContext jsonRpcContext = JsonRpcContext.Http(jsonRpcUrl); await foreach (JsonRpcResult result in jsonRpcProcessor.ProcessAsync(request, jsonRpcContext)) { - Stream stream = jsonRpcConfig.BufferResponses ? new MemoryStream() : null; + using Stream stream = jsonRpcConfig.BufferResponses ? RecyclableStream.GetStream("http") : null; ICountingBufferWriter resultWriter = stream is not null ? new CountingStreamPipeWriter(stream) : new CountingPipeWriter(ctx.Response.BodyWriter); try {