Stream stuck when used with mongodb and node10 #670

Closed
jeanbaptiste-brasselet opened this issue Feb 4, 2019 · 7 comments

jeanbaptiste-brasselet commented Feb 4, 2019

Hello, I really like Highland and I would like to keep using it, but it is becoming a problem with the newest versions of Node.

The following example works just fine with Node 8/9 but gets stuck on Node 10+:

const MongoClient = require('mongodb').MongoClient;
const hl = require('highland');

const MONGO_URL = 'mongodb://localhost:27017/';

const DB_NAME = 'someDb';
const COLLECTION = 'someCollection';

const run = async () => {
  const mongo = new MongoClient(MONGO_URL);

  await mongo.connect();
  const cursor = mongo.db(DB_NAME)
    .collection(COLLECTION)
    .find()
    .stream();

  return new Promise((resolve, reject) => {
    return hl(cursor)
    .batch(5)
    .flatten()
    .map(x => console.log(x && x._id))
    .stopOnError(reject)
    .done(resolve);
  });
};

run()
  .catch(console.log)
  .then(() => process.exit())

If you replace the MongoDB stream with a simple stream like this one, it works just fine:

const { Readable } = require('stream');

class Counter extends Readable {
  constructor(opt) {
    super({ ...opt, objectMode: true });
    this._max = 1000000;
    this._index = 1;
  }

  _read() {
    const i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      this.push({ count: i });
    }
  }
}

The thing is, I had a look at the stream implementation inside the MongoDB lib: it uses the Node stream implementation, and I saw nothing weird about it. Something else is a little strange, though: if you remove the flatten from the hl chain, the stream no longer gets stuck (even with Node 10).

Anyway, I have no clue what the problem is, so if anyone has an idea, please share it.

vqvu (Collaborator) commented Feb 5, 2019 via email

jeanbaptiste-brasselet (Author) commented Feb 5, 2019

  1. Indeed, it makes no sense here; I was doing some other stuff, but I have reduced the problem to this :)
  2. Already using the latest 3.x beta version.
  3. No, the number/size does not matter. I have reproduced the issue on a test database with 10 documents containing only the field _id.

About Mongo: I tried with the latest Mongo driver and several Mongo server versions (3.0, 3.2, 4.0). I will try with other Mongo driver versions.

vqvu (Collaborator) commented Feb 6, 2019

Ok, I know what's wrong.

There's an awaitDrain variable in the readable state that is incremented whenever the call to write from a pipe returns false. A subsequent drain decrements it, and only when it reaches 0 does the drain actually take effect. I think it's used to pause the source when at least one pipe destination is not accepting new data. So if awaitDrain ever gets incremented twice before a drain decrements it, we're in trouble.
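
Here's a rough sketch of that bookkeeping (a simplification for illustration only, not Node's actual pipe() code):

// Simplified model of pipe()'s awaitDrain counter (illustration only).
let awaitDrain = 0;

// Called after each write to the destination; a false return value means
// "destination is full, wait for a 'drain' before sending more".
function afterWrite(writeReturnedFalse) {
  if (writeReturnedFalse) awaitDrain += 1;
}

// Called when the destination emits 'drain'. The source only resumes once
// the counter is back at zero.
function onDrain() {
  if (awaitDrain > 0) awaitDrain -= 1;
  if (awaitDrain === 0) {
    console.log('source resumes');
  } else {
    console.log('source stays paused, awaitDrain =', awaitDrain);
  }
}

// Normal case: one false write, then one drain -> source resumes.
afterWrite(true);
onDrain();

// Problem case: the drain is emitted from within write(), so two writes
// return false before any drain is processed. One drain is no longer
// enough, and no second drain is coming -> deadlock.
afterWrite(true);
afterWrite(true);
onDrain();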

It turns out it's possible for Highland to emit drain from within a write. So a write causes a drain, which causes another write, all synchronously. If both writes return false, awaitDrain ends up getting incremented twice in a row, since the drain is emitted before either write returns. Hence the deadlock.

Here's a simplified test case without mongo.

const { Readable } = require('stream');
const _ = require('highland');

class Counter extends Readable {
  constructor(opt) {
    super({ ...opt, objectMode: true });
    this._max = 10;
    this._index = 1;
    this._firstTime = true;
  }

  _emitNext() {
    const i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      this.push({ count: i });
    }
  }

  _read() {
    if (this._firstTime) {
      // Delay the first item so that when it's emitted, Highland isn't in a
      // generator loop. This allows 'drain' to be emitted within write().
      this._firstTime = false;
      setTimeout(() => this._emitNext(), 0);
    } else {
      this._emitNext();
    }
  }
}

async function fn() {
  await _(new Counter())
    .batch(2)
    .consume((err, x, push, next) => {
      // Delay the batch so that the write() -> emit('drain') -> write() stack
      // can unwind, causing multiple increments of awaitDrain.
      setTimeout(() => {
        if (x !== null) {
          next();
        }
        push(err, x);
      }, 0);
    })
    .filter((x) => {
      console.log(x);
      return false;
    })
    .toPromise(Promise);

  console.log('done');
}

fn();

In Node 9, this wasn't a problem, since there was a guard that prevented this kind of double increment. Node 10+ contains nodejs/node#18516, which removed the guard in favor of some other logic.

It's not clear to me that the change in Node 10 is absolutely correct, since there can still be a deadlock if write happens to be called from within a write. However, it's also clear that Highland's usage of the drain event is definitely incorrect, since it emits drain when it hasn't even paused the source. Perhaps the change to awaitDrain is correct if the drain event is used properly.

The fix is simple enough: just don't drain unless we've previously paused the source. I'll submit it later this week.
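
Roughly, the idea is to remember when a write() has returned false while the stream was paused, and only then emit drain on the next resume. A bare-bones sketch of that guard (illustration only, with made-up names; the actual patch is the diff further down in this thread):

const { EventEmitter } = require('events');

// Illustration of the guard: 'drain' is emitted only if a previous write()
// actually returned false while the stream was paused.
class GuardedSink extends EventEmitter {
  constructor() {
    super();
    this.paused = true;
    this._canDrain = false;
  }

  write(x) {
    // ...queue x for downstream consumers...
    if (this.paused) {
      // The piping source sees `false` and starts waiting for 'drain',
      // so we now owe it exactly one 'drain'.
      this._canDrain = true;
    }
    return !this.paused;
  }

  resume() {
    this.paused = false;
    if (this._canDrain) {
      this._canDrain = false;
      this.emit('drain');
    }
  }
}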

jeanbaptiste-brasselet (Author) commented:

Wow, thanks for the detailed explanation and for the time spent on this.

vqvu added a commit to vqvu/highland that referenced this issue Feb 13, 2019

I am applying this fix to the 2.x branch even though caolan#670 only seems to
happen in 3.x, since the 2.x code isn't really doing the right thing.
It's likely that the issue doesn't manifest due to some other
coincidence.

As a bonus, correctly using the pipe/drain protocol allows us to remove
the target.pause() call from the implementation of through().
vqvu (Collaborator) commented Feb 13, 2019

I think I have a fix.

As a sanity check, can you apply this diff to your node_modules/highland/lib/index.js and see if it works?

diff --git a/lib/index.js b/lib/index.js
index 927ae19..3faf0de 100755
--- a/lib/index.js
+++ b/lib/index.js
@@ -834,6 +834,10 @@ function Stream(generator) {
     this._defer_run_generator = false;
     this._run_generator_deferred = false;
 
+    // Signals whether or not a call to write() returned false, and thus we can
+    // drain. This is only relevant for streams constructed with _().
+    this._can_drain = false;
+
     var self = this;
 
     // These are defined here instead of on the prototype
@@ -1258,8 +1262,9 @@ addMethod('resume', function () {
     if (this._generator) {
         this._runGenerator();
     }
-    else {
+    else if (this._can_drain) {
         // perhaps a node stream is being piped in
+        this._can_drain = false;
         this.emit('drain');
     }
 });
@@ -1641,6 +1646,11 @@ addMethod('pull', function (f) {
 addMethod('write', function (x) {
     // console.log(this.id, 'write', x, this.paused);
     this._writeOutgoing(x);
+
+    if (this.paused && !this._generator) {
+        this._can_drain = true;
+    }
+
     return !this.paused;
 });
 
@@ -3412,7 +3422,6 @@ addMethod('through', function (target) {
         return target(this);
     }
     else {
-        target.pause();
         output = this.createChild();
         this.on('error', writeErr);
         target.on('error', writeErr);

You should be able to run patch node_modules/highland/lib/index.js <diff-file>.

I want to verify that there's not a deeper issue that I've missed.

jeanbaptiste-brasselet (Author) commented:

Hello,

I can confirm the changes also fix my use case :)

vqvu (Collaborator) commented Feb 15, 2019

This fix has been released as 3.0.0-beta.8 and 2.13.1.
