Leaking _Closures et al when using overlapping Directory.watch streams #52703

Closed
jensjoha opened this issue Jun 14, 2023 · 6 comments
Labels
area-sdk Use area-sdk for general purpose SDK issues (packaging, distribution, …). library-io type-bug Incorrect behavior (everything from a crash to more subtle misbehavior)


@jensjoha
Contributor

TL;DR: I can leak as much RAM as I want. And the analyzer does.

Consider this example:

import 'dart:async';
import 'dart:developer';
import 'dart:io';

final List<StreamSubscription<FileSystemEvent>> watcherSubscriptions = [];

void main() async {
  print('pause...');
  debugger();
  for (int i = 0; i < 10; i++) {
    await create(Uri.base);
    print('$i');

    print('pause...');
    debugger();
  }
  print('Now created 10 ones...');
}

Future<void> create(Uri uri) async {
  // Create temporary one.
  List<StreamSubscription<FileSystemEvent>> tmpSubscriptions =
      <StreamSubscription<FileSystemEvent>>[];
  Stream<FileSystemEvent> stream = Directory.fromUri(uri).watch();
  tmpSubscriptions.add(stream.listen((event) {}));

  // Cancel previous one(s) if any.
  for (StreamSubscription<FileSystemEvent> prev in watcherSubscriptions) {
    await prev.cancel();
  }
  watcherSubscriptions.clear();

  // Create new one.
  stream = Directory.fromUri(uri).watch();
  watcherSubscriptions.add(stream.listen((event) {}));

  // Cancel temporary one.
  for (StreamSubscription<FileSystemEvent> prev in tmpSubscriptions) {
    await prev.cancel();
  }
}

which could be run, for instance (with the file saved as leaking_closures.dart), like this:

out/ReleaseX64/dart --enable-vm-service --serve-observatory leaking_closures.dart

This will start the observatory.

Opening it, I can go to the allocation profile, hit the GC button at the top, search for _Closure, and see that there are 25 of them.

I can click it, scroll down, and click "all direct instances" to get a list of those 25.

Clicking that list, I evaluate this expression:

print(map((dynamic e) => identityHashCode(e)).join(","))

and copy the data from the terminal. In my case it's

1957801911,2799861565,295749294,3607033467,2001722021,1819156124,880687672,2476592613,3065591878,1366402745,3020542853,331678246,4083017078,1514761460,1124323878,764042262,4207241379,3838757159,4215517583,219788470,556663193,3410728899,4031482779,2704432541,2323354312

I then go to the debugger and continue, go back to the allocation profile, and hit GC again. I now have 70 _Closure objects. Again I click it, click all direct instances, go to the list, and this time evaluate

() {
var set = this.map((dynamic e) => identityHashCode(e)).toSet().difference({1957801911,2799861565,295749294,3607033467,2001722021,1819156124,880687672,2476592613,3065591878,1366402745,3020542853,331678246,4083017078,1514761460,1124323878,764042262,4207241379,3838757159,4215517583,219788470,556663193,3410728899,4031482779,2704432541,2323354312});

return this.where((e) => set.contains(identityHashCode(e))).toList();
}()

(Replace the data in the hardcoded set with what you got before.)
I get a list of 45 new ones. I pick one called Closure (_cancel), click it, and evaluate

identityHashCode(this)

In my case it gives 1365245867.

I'll also click Retaining path and I get

Closure (_cancel) {  ⊞  }
retained by onCancel of _AsyncBroadcastStreamController {  ⊞  }
retained by _controller@4048458 of _BroadcastSubscription {  ⊞  }
retained by 0 of _GrowableList (1) {  ⊞  }
retained by offset -1 of top-level final List<StreamSubscription<FileSystemEvent>> watcherSubscriptions
retained by a GC root (user global)

I go to the debugger and continue, go back to the allocation profile, and hit GC again. I now have 79 _Closure objects. Again I click it, click all direct instances, go to the list, and evaluate

where((e) => identityHashCode(e) == 1365245867).toList()

(substitute with the identity hash code from above).

I get a list with one entry and click that entry.

The retaining path is

Closure (_cancel) {  ⊞  }
retained by onCancel of _AsyncBroadcastStreamController {  ⊞  }
retained by 0 of Context (1) {  ⊞  }
retained by _context@0150898 of Closure (_close) {  ⊞  }
retained by _onDone@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by addSubscription of _AddStreamState {  ⊞  }
retained by _addStreamState@4048458 of _AsyncBroadcastStreamController {  ⊞  }
retained by _controller@4048458 of _BroadcastSubscription {  ⊞  }
retained by 0 of _GrowableList (1) {  ⊞  }
retained by offset -1 of top-level final List<StreamSubscription<FileSystemEvent>> watcherSubscriptions
retained by a GC root (user global)

I do the same thing again; there are now 91, and the retaining path for the chosen one is

Closure (_cancel) {  ⊞  }
retained by onCancel of _AsyncBroadcastStreamController {  ⊞  }
retained by 0 of Context (1) {  ⊞  }
retained by _context@0150898 of Closure (_close) {  ⊞  }
retained by _onDone@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by addSubscription of _AddStreamState {  ⊞  }
retained by _addStreamState@4048458 of _AsyncBroadcastStreamController {  ⊞  }
retained by _controller@4048458 of _BroadcastSubscription {  ⊞  }
retained by 0 of _GrowableList (1) {  ⊞  }
retained by offset -1 of top-level final List<StreamSubscription<FileSystemEvent>> watcherSubscriptions
retained by a GC root (user global)

and again: 103 and

Closure (_cancel) {  ⊞  }
retained by onCancel of _AsyncBroadcastStreamController {  ⊞  }
retained by 0 of Context (1) {  ⊞  }
retained by _context@0150898 of Closure (_close) {  ⊞  }
retained by _onDone@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by addSubscription of _AddStreamState {  ⊞  }
retained by _addStreamState@4048458 of _AsyncBroadcastStreamController {  ⊞  }
retained by _controller@4048458 of _BroadcastSubscription {  ⊞  }
retained by 0 of _GrowableList (1) {  ⊞  }
retained by offset -1 of top-level final List<StreamSubscription<FileSystemEvent>> watcherSubscriptions
retained by a GC root (user global)

and for good measure once more: 115 and

Closure (_cancel) {  ⊞  }
retained by onCancel of _AsyncBroadcastStreamController {  ⊞  }
retained by 0 of Context (1) {  ⊞  }
retained by _context@0150898 of Closure (_close) {  ⊞  }
retained by _onDone@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by addSubscription of _AddStreamState {  ⊞  }
retained by _addStreamState@4048458 of _AsyncBroadcastStreamController {  ⊞  }
retained by _controller@4048458 of _BroadcastSubscription {  ⊞  }
retained by 0 of _GrowableList (1) {  ⊞  }
retained by offset -1 of top-level final List<StreamSubscription<FileSystemEvent>> watcherSubscriptions
retained by a GC root (user global)

So we're leaking 12 _Closure objects per iteration and they're seemingly leaked via the _previous pointer on _BroadcastSubscription.
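The per-iteration figure can be read off the _Closure counts reported after each GC above (25, 70, 79, 91, 103, 115). The small helper below is hypothetical and only for illustration; it computes the differences between consecutive pauses:

```dart
/// Differences between consecutive _Closure counts, as reported in the
/// allocation profile after each GC in the walkthrough above.
List<int> deltas(List<int> counts) => [
      for (var i = 1; i < counts.length; i++) counts[i] - counts[i - 1],
    ];

void main() {
  const closureCounts = [25, 70, 79, 91, 103, 115];
  print(deltas(closureCounts)); // [45, 9, 12, 12, 12]
}
```

The first two steps differ, but from the third pause on the count grows by a steady 12 per iteration.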

This might not sound that bad, but the analyzer does this (through package:watcher) for the opened directories, and on Linux (seemingly only on Linux) package:watcher then adds watchers for all subdirectories. As I understand it, it has to, because inotify doesn't work recursively, whereas the Windows and macOS equivalents do.
When a pubspec.yaml file is edited, the analyzer does essentially what the script above does and leaks similarly, but because far more directories are watched, it leaks a lot more.

After opening pkg and editing a pubspec.yaml file inside it 25 times (waiting a bit between edits), I have 937,835 _Closures (directly using 57.2MB of RAM), 937,057 Contexts (28.7MB), 157,623 _BroadcastSubscriptions (16.8MB), etc.

Somewhere between 70 and 80 iterations, it's at:
_Closure: 2,998,725 (183.0MB),
Context: 2,997,881 (91.5MB),
_BroadcastSubscription: 501,262 (53.5MB)

/cc @lrhn

@jensjoha
Contributor Author

The many iterations aren't that far-fetched, by the way: in #52447 it has likely occurred several hundred times (though that instance is on a Mac, which doesn't suffer from this significantly).

@jensjoha
Contributor Author

And for completeness' sake: after 250 such iterations we're at

_Closure: 8,868,011 (541.3MB)
Context: 8,867,233 (270.7MB)
_BroadcastSubscription: 1,479,319 (158.0MB)
_AsyncBroadcastStreamController: 1,477,520 (112.7MB)
_FutureListener: 1,475,922 (90.1MB)
_InotifyFileSystemWatcher: 1,475,918 (67.6MB)

(and the heap is now 2GB+)

@lrhn
Member

lrhn commented Jun 14, 2023

_BroadcastSubscription._previous should only hold on to other subscriptions that are currently active on the same controller. If they are actually active, it's fair that they are not GC'ed.

So, do we have a bug in the cancellation, or is the analyzer keeping subscriptions alive longer than necessary?
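The invariant described above can be checked in isolation. The following is a minimal sketch, not taken from the thread, that uses only a plain broadcast `StreamController` (no `dart:io`, no `pipe()`); the function name `listenerStateAroundCancel` is hypothetical. It assumes the documented behavior that cancelling a broadcast subscription removes it from its controller, so `hasListener` should drop back to `false` once every subscription is cancelled.

```dart
import 'dart:async';

/// Returns [hasListener] before and after cancelling every subscription
/// on a plain broadcast controller (no pipe()/addStream() involved).
Future<List<bool>> listenerStateAroundCancel() async {
  final controller = StreamController<int>.broadcast();
  final a = controller.stream.listen((_) {});
  final b = controller.stream.listen((_) {});
  final before = controller.hasListener;

  // Cancelling unlinks each subscription from the controller's list.
  await a.cancel();
  await b.cancel();
  final after = controller.hasListener;
  return [before, after];
}

Future<void> main() async {
  print(await listenerStateAroundCancel()); // [true, false]
}
```

If this holds, _previous links that survive a GC belong to subscriptions the controller still considers active, which narrows the question to who keeps them active.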

@jensjoha
Contributor Author

If we keep the analyzer out of it for the moment and just look at my example code: you tell me.
I'm not great with streams et al., but I'd say the old subscriptions have been cancelled.

@mkustermann
Member

mkustermann commented Jun 14, 2023

If we simplify the example a bit and dump a heapsnapshot:

import 'dart:async';
import 'dart:developer';
import 'dart:io';

void main() async {
  final dir = Directory.fromUri(Uri.parse('pkg/'));

  StreamSubscription? current;
  for (int i = 0; i < 15; i++) {
    final temp = dir.watch().listen((event) {});
    await current?.cancel();
    current = dir.watch().listen((event) {});
    await temp.cancel();
  }
  NativeRuntime.writeHeapSnapshotToFile('watch.heapsnapshot');

  current?.cancel();
}

then examine watch.heapsnapshot using the tool in <sdk>/runtime/tools/heapsnapshot:

(hsa) load ../../../watch.heapsnapshot
Loaded heapsnapshot from "../../../watch.heapsnapshot".

(hsa) all = closure roots
 all {#63864}

(hsa) retainers -n1 -d100 sample (filter all _Closure)
There are 1 retaining paths of
_Closure (dart:core)
⮑ ・_AsyncBroadcastStreamController.onCancel (dart:async)
    ⮑ ﹢Context ()
        ⮑ ・_Closure.context_ (dart:core)
            ⮑ ・_BroadcastSubscription._onData (dart:async)
                ⮑ ﹢_BroadcastSubscription._next (dart:async)
                    ⮑ ﹢_BroadcastSubscription._next (dart:async)
                        ⮑ ﹢_BroadcastSubscription._next (dart:async)
                            ⮑ ﹢_BroadcastSubscription._next (dart:async)
                                ⮑ ﹢_BroadcastSubscription._next (dart:async)
                                    ⮑ ﹢_BroadcastSubscription._next (dart:async)
                                        ⮑ ﹢_BroadcastSubscription._next (dart:async)
                                            ⮑ ﹢_AsyncBroadcastStreamController._firstSubscription (dart:async)
                                                ⮑ ﹢_List (dart:core)
                                                    ⮑ ・_Map.data_ (dart:collection)
                                                        ⮑ ・Isolate._idMap ()
                                                            ⮑ ・Root ()

That indicates that a static _idMap holds on to all of them. Indeed, we have:

class _InotifyFileSystemWatcher {
  static final Map<int, StreamController> _idMap = {};
}

Whenever we start watching a path for the first time, we put an entry in there, and whenever the last watch of a path gets cancelled, we clear it out (so that multiple watches of the same path share one entry).
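To make the sharing scheme concrete, here is a hypothetical model: the class `SharedWatchRegistry`, its `acquire`/`release` methods, and `overlappingRounds` are illustrative names, not the SDK's `_InotifyFileSystemWatcher` code. It assumes, as described above, that the entry is removed only when the last watch of a path is released, and it replays the overlapping temp/current pattern from the repro.

```dart
import 'dart:async';

/// Hypothetical model: one shared broadcast controller per path id,
/// reference-counted so the entry is dropped only when the last watch
/// of that path is released.
class SharedWatchRegistry {
  final Map<int, StreamController<String>> _idMap = {};
  final Map<int, int> _counts = {};

  /// Registers one more watch of [pathId] and returns the shared stream.
  Stream<String> acquire(int pathId) {
    final controller = _idMap.putIfAbsent(
        pathId, () => StreamController<String>.broadcast());
    _counts[pathId] = (_counts[pathId] ?? 0) + 1;
    return controller.stream;
  }

  /// Drops one watch of [pathId]; removes and closes the entry when none remain.
  void release(int pathId) {
    final remaining = (_counts[pathId] ?? 0) - 1;
    if (remaining > 0) {
      _counts[pathId] = remaining;
      return;
    }
    _counts.remove(pathId);
    _idMap.remove(pathId)?.close();
  }

  int get entries => _idMap.length;
}

/// Replays the repro's pattern: acquire a temporary watch, release the
/// previous one, acquire a new one, release the temporary one.
/// Returns the number of map entries after each round.
List<int> overlappingRounds(int rounds) {
  final registry = SharedWatchRegistry();
  const pathId = 1;
  var hasCurrent = false;
  final sizes = <int>[];
  for (var i = 0; i < rounds; i++) {
    registry.acquire(pathId); // temporary watch
    if (hasCurrent) registry.release(pathId); // previous "current" watch
    registry.acquire(pathId); // new "current" watch
    hasCurrent = true;
    registry.release(pathId); // temporary watch
    sizes.add(registry.entries);
  }
  return sizes;
}

void main() {
  print(overlappingRounds(3)); // [1, 1, 1]
}
```

Because each round acquires a new watch before releasing the old one, the count never reaches zero, so the shared entry (and the stream behind it) is never closed.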

Looking more closely at how the individual fse.watch() streams get their elements, we find

   _pathWatched().pipe(_broadcastController);

This will unconditionally pipe all elements from the shared watch stream into the per-fse.watch() controller. Since we always keep one fse.watch() open on the path, _pathWatched() will be an infinite stream.

Internally, _pathWatched().pipe(_broadcastController) of course makes a .listen() on the source and adds the events to the destination, but, more importantly, it never cancels that subscription.

When the fse.watch() subscription gets cancelled, we only decrement some counters; we don't actually stop this pipe() operation.
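The pipe() behavior can be reproduced without dart:io. In this sketch two broadcast controllers stand in for the shared per-path stream and the per-watch `_broadcastController`; the stand-ins and the function name are assumptions for illustration. It relies on `Stream.pipe` forwarding events via `StreamConsumer.addStream`, which listens to the source and keeps that subscription until the source is done.

```dart
import 'dart:async';

/// Models one fse.watch() stream fed by pipe() from a shared source that
/// never closes. Returns whether the source still has a listener after the
/// only downstream subscription has been cancelled.
Future<bool> sourceStillListenedAfterPipe() async {
  // Stand-in for the shared per-path stream (_pathWatched()).
  final source = StreamController<int>.broadcast();
  // Stand-in for the per-watch _broadcastController.
  final perWatch = StreamController<int>.broadcast();

  // pipe() completes only when `source` is done, which never happens here.
  unawaited(source.stream.pipe(perWatch));

  final subscription = perWatch.stream.listen((_) {});
  await subscription.cancel();

  // The subscription pipe() created on `source` (via addStream) is still live.
  return source.hasListener;
}

Future<void> main() async {
  print(await sourceStillListenedAfterPipe()); // true
}
```

Cancelling the downstream listener only detaches it from `perWatch`; nothing tells the addStream state to cancel its upstream subscription, which matches the `_addStreamState` / `addSubscription` links in the retaining paths above.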

A change like this seems to fix it:

diff --git a/sdk/lib/_internal/vm/bin/file_patch.dart b/sdk/lib/_internal/vm/bin/file_patch.dart
index d60b8f95da1..353e30430b1 100644
--- a/sdk/lib/_internal/vm/bin/file_patch.dart
+++ b/sdk/lib/_internal/vm/bin/file_patch.dart
@@ -136,6 +136,7 @@ abstract class _FileSystemWatcher {
 
   final StreamController<FileSystemEvent> _broadcastController =
       new StreamController<FileSystemEvent>.broadcast();
+  late StreamSubscription _sourceSubscription;
 
   @patch
   static Stream<FileSystemEvent> _watch(
@@ -193,7 +194,13 @@ abstract class _FileSystemWatcher {
     }
     _watcherPath = _idMap[pathId];
     _watcherPath!.count++;
-    _pathWatched().pipe(_broadcastController);
+    _sourceSubscription = _pathWatched().listen((event) {
+      _broadcastController.add(event);
+    }, onError: (error, stack) {
+      _broadcastController.addError(error, stack);
+    }, onDone: () {
+      _broadcastController.close();
+    });
   }
 
   void _cancel() {
@@ -222,6 +229,7 @@ abstract class _FileSystemWatcher {
       _doneWatcher();
       _id = null;
     }
+    _sourceSubscription.cancel();
   }
 
   // Called when (and after) a new watcher instance is created and available.
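For comparison, here is the same stand-in setup using the approach from the patch: forward events through an explicit subscription and cancel it when the watch is cancelled. This is a sketch mirroring the `_sourceSubscription` idea, not the patched SDK code itself; the function name is hypothetical.

```dart
import 'dart:async';

/// Same stand-ins as the pipe() case, but forwarding through an explicit
/// subscription that is cancelled together with the downstream watch.
Future<bool> sourceStillListenedAfterExplicitForward() async {
  final source = StreamController<int>.broadcast();
  final perWatch = StreamController<int>.broadcast();

  // Keep a handle on the upstream subscription instead of using pipe().
  final forward = source.stream.listen(
    perWatch.add,
    onError: perWatch.addError,
    onDone: perWatch.close,
  );

  final subscription = perWatch.stream.listen((_) {});
  await subscription.cancel();
  // Corresponds to `_sourceSubscription.cancel()` in _cancel().
  await forward.cancel();

  return source.hasListener;
}

Future<void> main() async {
  print(await sourceStillListenedAfterExplicitForward()); // false
}
```

Holding the upstream subscription in a field is what makes it cancellable; with pipe() there is no handle to cancel.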

@srawlins added the area-sdk and type-bug labels on Jun 14, 2023
@jensjoha
Contributor Author

Martin's proposed fix seems to work for me!

copybara-service bot pushed a commit that referenced this issue Jul 10, 2023
Closes #52703

TEST=runtime/tests/vm/dart/regress_52703_test.dart

Cherry-pick: https://dart-review.googlesource.com/c/sdk/+/309862
Bug: #52703
Change-Id: Idd4b153a9c7b19995f6c6c8bda99b878a21425fb
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/311521
Reviewed-by: Lasse Nielsen <lrn@google.com>
Reviewed-by: Jens Johansen <jensj@google.com>
Reviewed-by: Martin Kustermann <kustermann@google.com>
Commit-Queue: Samuel Rawlins <srawlins@google.com>