From 9024e2b3ea2b55f48ffbf23b68f5e4d90efb0393 Mon Sep 17 00:00:00 2001 From: Sam Rawlins Date: Mon, 10 Jul 2023 22:40:59 +0000 Subject: [PATCH] [stable] [vm] Fix leakage of file system watcher related data structures Closes https://github.com/dart-lang/sdk/issues/52703 TEST=runtime/tests/vm/dart/regress_52703_test.dart Cherry-pick: https://dart-review.googlesource.com/c/sdk/+/309862 Bug: https://github.com/dart-lang/sdk/issues/52703 Change-Id: Idd4b153a9c7b19995f6c6c8bda99b878a21425fb Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/311521 Reviewed-by: Lasse Nielsen Reviewed-by: Jens Johansen Reviewed-by: Martin Kustermann Commit-Queue: Samuel Rawlins --- CHANGELOG.md | 2 + runtime/tests/vm/dart/regress_52703_test.dart | 60 +++++++++++++++++++ sdk/lib/_internal/vm/bin/file_patch.dart | 26 +++++--- 3 files changed, 80 insertions(+), 8 deletions(-) create mode 100644 runtime/tests/vm/dart/regress_52703_test.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index 8039136ac792..6b414a98a942 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,9 +5,11 @@ This is a patch release that: - Fixes a flow in flow analysis that causes it to sometimes ignore destructuring assignments (issue [#52767]). - Fixes a memory leak in Dart analyzer's file-watching (issue [#52791]). +- Fixes a memory leak of file system watcher related data structures (issue [#52793]). [#52767]: https://github.com/dart-lang/sdk/issues/52767 [#52791]: https://github.com/dart-lang/sdk/issues/52791 +[#52793]: https://github.com/dart-lang/sdk/issues/52793 ## 3.0.5 - 2023-06-14 diff --git a/runtime/tests/vm/dart/regress_52703_test.dart b/runtime/tests/vm/dart/regress_52703_test.dart new file mode 100644 index 000000000000..aabc3af8752f --- /dev/null +++ b/runtime/tests/vm/dart/regress_52703_test.dart @@ -0,0 +1,60 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:developer'; +import 'dart:io'; + +import 'package:expect/expect.dart'; +import 'package:path/path.dart' as path; +import 'package:vm_service/vm_service.dart'; + +import 'use_flag_test_helper.dart'; +import '../../../tools/heapsnapshot/lib/src/analysis.dart'; + +void main() async { + if (buildDir.contains('Product')) return; + + final dir = Directory('sdk'); + + StreamSubscription? current; + Future iterate() async { + final temp = dir.watch().listen((event) {}); + await current?.cancel(); + current = dir.watch().listen((event) {}); + await temp.cancel(); + } + + Future finish() async { + await current?.cancel(); + } + + await withTempDir('ama-test', (String tempDir) async { + await iterate(); + + final before = countInstances(tempDir, '_BroadcastSubscription'); + print('Before = $before'); + for (int i = 0; i < 100; i++) { + await iterate(); + } + final after = countInstances(tempDir, '_BroadcastSubscription'); + print('After = $after'); + + await finish(); + + Expect.equals(0, (after - before)); + }); +} + +int countInstances(String tempDir, String className) { + final snapshotFile = path.join(tempDir, 'foo.heapsnapshot'); + NativeRuntime.writeHeapSnapshotToFile(snapshotFile); + final bytes = File(snapshotFile).readAsBytesSync(); + File(snapshotFile).deleteSync(); + final graph = HeapSnapshotGraph.fromChunks( + [bytes.buffer.asByteData(bytes.offsetInBytes, bytes.length)]); + final analysis = Analysis(graph); + return analysis + .filterByClassPatterns(analysis.reachableObjects, [className]).length; +} diff --git a/sdk/lib/_internal/vm/bin/file_patch.dart b/sdk/lib/_internal/vm/bin/file_patch.dart index d60b8f95da11..1c2b653bd063 100644 --- a/sdk/lib/_internal/vm/bin/file_patch.dart +++ b/sdk/lib/_internal/vm/bin/file_patch.dart @@ -137,6 +137,12 @@ abstract class _FileSystemWatcher { final StreamController _broadcastController = new StreamController.broadcast(); + /// Subscription on the stream returned by [_watchPath]. + /// + /// Stored while piping events from that stream into [_broadcastController], + /// so it can be cancelled when [_broadcastController] is cancelled. + StreamSubscription? _sourceSubscription; + @patch static Stream _watch( String path, int events, bool recursive) { @@ -193,7 +199,9 @@ abstract class _FileSystemWatcher { } _watcherPath = _idMap[pathId]; _watcherPath!.count++; - _pathWatched().pipe(_broadcastController); + _sourceSubscription = _pathWatched().listen(_broadcastController.add, + onError: _broadcastController.addError, + onDone: _broadcastController.close); } void _cancel() { @@ -222,6 +230,8 @@ abstract class _FileSystemWatcher { _doneWatcher(); _id = null; } + _sourceSubscription?.cancel(); + _sourceSubscription = null; } // Called when (and after) a new watcher instance is created and available. @@ -229,7 +239,7 @@ abstract class _FileSystemWatcher { // Called when a watcher is no longer needed. void _doneWatcher() {} // Called when a new path is being watched. - Stream _pathWatched(); + Stream _pathWatched(); // Called when a path is no longer being watched. void _donePathWatched() {} @@ -387,7 +397,7 @@ abstract class _FileSystemWatcher { } class _InotifyFileSystemWatcher extends _FileSystemWatcher { - static final Map _idMap = {}; + static final Map> _idMap = {}; static late StreamSubscription _subscription; _InotifyFileSystemWatcher(path, events, recursive) @@ -411,7 +421,7 @@ class _InotifyFileSystemWatcher extends _FileSystemWatcher { _subscription.cancel(); } - Stream _pathWatched() { + Stream _pathWatched() { var pathId = _watcherPath!.pathId; if (!_idMap.containsKey(pathId)) { _idMap[pathId] = new StreamController.broadcast(); @@ -429,12 +439,12 @@ class _InotifyFileSystemWatcher extends _FileSystemWatcher { class _Win32FileSystemWatcher extends _FileSystemWatcher { late StreamSubscription _subscription; - late StreamController _controller; + late StreamController _controller; _Win32FileSystemWatcher(path, events, recursive) : super._(path, events, recursive); - Stream _pathWatched() { + Stream _pathWatched() { var pathId = _watcherPath!.pathId; _controller = new StreamController(); _subscription = @@ -457,12 +467,12 @@ class _Win32FileSystemWatcher extends _FileSystemWatcher { class _FSEventStreamFileSystemWatcher extends _FileSystemWatcher { late StreamSubscription _subscription; - late StreamController _controller; + late StreamController _controller; _FSEventStreamFileSystemWatcher(path, events, recursive) : super._(path, events, recursive); - Stream _pathWatched() { + Stream _pathWatched() { var pathId = _watcherPath!.pathId; var socketId = _FileSystemWatcher._getSocketId(0, pathId); _controller = new StreamController();