From b291878d1b9fb779fca76dda19258336324658e6 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Mon, 16 Dec 2019 22:17:43 +0100 Subject: [PATCH 1/6] fixup: simplify --- lib/_stream_transform.js | 136 ++++++++--------------------- test/parallel/test-transform-by.js | 43 +++++---- 2 files changed, 58 insertions(+), 121 deletions(-) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 0eee13369be307..45bd819bfdf74f 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -78,6 +78,7 @@ const { ERR_TRANSFORM_WITH_LENGTH_0 } = require('internal/errors').codes; const Duplex = require('_stream_duplex'); +const Readable = require('_stream_readable'); const AsyncIteratorPrototype = ObjectGetPrototypeOf( ObjectGetPrototypeOf(async function* () {}).prototype); @@ -232,110 +233,47 @@ function done(stream, er, data) { return stream.push(null); } -function SourceIterator(asyncGeneratorFn, opts) { - const source = this; - const result = asyncGeneratorFn(this); - if (typeof result[Symbol.asyncIterator] !== 'function') { - throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn'); - } - const iter = result[Symbol.asyncIterator](); - if (typeof iter.next !== 'function') { - throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn'); - } +const from = require('internal/streams/from'); +const createReadableStreamAsyncIterator = require('internal/streams/async_iterator'); - this[kSourceIteratorPull] = null; - this[kSourceIteratorChunk] = null; - this[kSourceIteratorResolve] = null; - this[kSourceIteratorStream] = new Transform({ +Transform.by = function by(asyncGeneratorFn, opts) { + let resume = null; + function next() { + if (resume) { + const doResume = resume; + resume = null; + doResume(); + } + } + const input = new Readable({ objectMode: true, - ...opts, - transform(chunk, encoding, cb) { - source.encoding = encoding; - if (source[kSourceIteratorResolve] === null) { - source[kSourceIteratorChunk] = chunk; - source[kSourceIteratorPull] = cb; - return; - } - source[kSourceIteratorResolve]({ value: chunk, done: false }); - source[kSourceIteratorResolve] = null; - cb(null); + autoDestroy: true, + read: next, + highWaterMark: 1, // TODO: Buffer here? + destroy (err, callback) { + next(); + callback(err); } }); - this.encoding = this[kSourceIteratorStream]._transformState.writeencoding; - this[kSourceIteratorGrabResolve] = (resolve) => { - this[kSourceIteratorResolve] = resolve; - }; - const first = iter.next(); - this[kSourceIteratorPump](iter, first); -} - -SourceIterator.prototype[Symbol.asyncIterator] = function() { - return this; -}; -ObjectSetPrototypeOf(SourceIterator.prototype, AsyncIteratorPrototype); - -SourceIterator.prototype.next = function next() { - if (this[kSourceIteratorPull] === null || this[kSourceIteratorChunk] === null) - return new Promise(this[kSourceIteratorGrabResolve]); - - this[kSourceIteratorPull](null); - const result = Promise.resolve({ - value: this[kSourceIteratorChunk], - done: false - }); - this[kSourceIteratorChunk] = null; - this[kSourceIteratorPull] = null; - return result; -}; - -SourceIterator.prototype[kSourceIteratorPump] = async function pump(iter, p) { - const stream = this[kSourceIteratorStream]; - try { - stream.removeListener('prefinish', prefinish); - stream.on('prefinish', () => { - if (this[kSourceIteratorResolve] !== null) { - this[kSourceIteratorResolve]({ value: undefined, done: true }); - } - }); - let next = await p; - while (true) { - const { done, value } = next; - if (done) { - if (value !== undefined) stream.push(value); - - // In the event of an early return we explicitly - // discard any buffered state - if (stream._writableState.length > 0) { - const { length } = stream._writableState; - const { transforming } = stream._transformState; - stream._writableState.length = 0; - stream._transformState.transforming = false; - prefinish.call(stream); - stream._writableState.length = length; - stream._transformState.transforming = transforming; - } else { - prefinish.call(stream); - } - break; + const iterator = createReadableStreamAsyncIterator(input); + return from(Duplex, asyncGeneratorFn(iterator), { + objectMode: true, + autoDestroy: true, + ...opts, + write(chunk, encoding, callback) { + if (!input.push(chunk)) { + resume = callback; + } else { + callback(); } - stream.push(value); - next = await iter.next(); + }, + final(callback) { + input.push(null); + resume = callback; + }, + destroy(err, callback) { + input.destroy(err, callback); } - } catch (err) { - process.nextTick(() => stream.destroy(err)); - } finally { - this[kSourceIteratorPull] = null; - this[kSourceIteratorChunk] = null; - this[kSourceIteratorResolve] = null; - this[kSourceIteratorStream] = null; - } -}; - - -Transform.by = function by(asyncGeneratorFn, opts) { - const source = new SourceIterator(asyncGeneratorFn, opts); - const stream = source[kSourceIteratorStream]; - - return stream; + }); }; diff --git a/test/parallel/test-transform-by.js b/test/parallel/test-transform-by.js index fe01986b8d1a0b..6c7328c3fcc728 100644 --- a/test/parallel/test-transform-by.js +++ b/test/parallel/test-transform-by.js @@ -5,7 +5,7 @@ const { Readable, Transform } = require('stream'); const { strictEqual } = require('assert'); async function transformBy() { - const readable = Readable.from('test'); + const readable = Readable.from('test'.split('')); async function * mapper(source) { for await (const chunk of source) { yield chunk.toUpperCase(); @@ -21,7 +21,7 @@ async function transformBy() { } async function transformByFuncReturnsObjectWithSymbolAsyncIterator() { - const readable = Readable.from('test'); + const readable = Readable.from('test'.split('')); const mapper = (source) => ({ [Symbol.asyncIterator]() { return { @@ -55,8 +55,7 @@ transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext() { }); expectsError(() => Transform.by(mapper), { - message: 'asyncGeneratorFn must return an async iterable', - code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', + code: 'ERR_INVALID_ARG_TYPE', type: TypeError }); } @@ -69,8 +68,7 @@ async function transformByObjReturnedWSymbolAsyncIteratorWithNoNext() { }); expectsError(() => Transform.by(mapper), { - message: 'asyncGeneratorFn must return an async iterable', - code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', + code: 'ERR_INVALID_ARG_TYPE', type: TypeError }); } @@ -91,14 +89,13 @@ async function transformByFuncReturnsObjectWithoutSymbolAsyncIterator() { const mapper = () => ({}); expectsError(() => Transform.by(mapper), { - message: 'asyncGeneratorFn must return an async iterable', - code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', + code: 'ERR_INVALID_ARG_TYPE', type: TypeError }); } async function transformByEncoding() { - const readable = Readable.from('test'); + const readable = Readable.from('test'.split('')); async function * mapper(source) { for await (const chunk of source) { strictEqual(source.encoding, 'ascii'); @@ -116,7 +113,7 @@ async function transformByEncoding() { } async function transformBySourceIteratorCompletes() { - const readable = Readable.from('test'); + const readable = Readable.from('test'.split('')); const mustReach = mustCall(); async function * mapper(source) { for await (const chunk of source) { @@ -134,7 +131,7 @@ async function transformBySourceIteratorCompletes() { } async function transformByYieldPlusReturn() { - const readable = Readable.from('test'); + const readable = Readable.from('test'.split('')); async function * mapper(source) { for await (const chunk of source) { yield chunk.toUpperCase(); @@ -151,7 +148,7 @@ async function transformByYieldPlusReturn() { } async function transformByReturnEndsStream() { - const readable = Readable.from('test'); + const readable = Readable.from('test'.split('')); async function * mapper(source) { for await (const chunk of source) { yield chunk.toUpperCase(); @@ -170,7 +167,7 @@ async function transformByReturnEndsStream() { } async function transformByOnData() { - const readable = Readable.from('test'); + const readable = Readable.from('test'.split('')); async function * mapper(source) { for await (const chunk of source) { yield chunk.toUpperCase(); @@ -191,7 +188,7 @@ async function transformByOnData() { } async function transformByOnDataNonObject() { - const readable = Readable.from('test', { objectMode: false }); + const readable = Readable.from('test'.split(''), { objectMode: false }); async function * mapper(source) { for await (const chunk of source) { yield chunk.toString().toUpperCase(); @@ -212,7 +209,7 @@ async function transformByOnDataNonObject() { } async function transformByOnErrorAndDestroyed() { - const stream = Readable.from('test').pipe(Transform.by( + const stream = Readable.from('test'.split('')).pipe(Transform.by( async function * mapper(source) { for await (const chunk of source) { if (chunk === 'e') throw new Error('kaboom'); @@ -230,7 +227,7 @@ async function transformByOnErrorAndDestroyed() { } async function transformByErrorTryCatchAndDestroyed() { - const stream = Readable.from('test').pipe(Transform.by( + const stream = Readable.from('test'.split('')).pipe(Transform.by( async function * mapper(source) { for await (const chunk of source) { if (chunk === 'e') throw new Error('kaboom'); @@ -250,7 +247,7 @@ async function transformByErrorTryCatchAndDestroyed() { } async function transformByOnErrorAndTryCatchAndDestroyed() { - const stream = Readable.from('test').pipe(Transform.by( + const stream = Readable.from('test'.split('')).pipe(Transform.by( async function * mapper(source) { for await (const chunk of source) { if (chunk === 'e') throw new Error('kaboom'); @@ -286,17 +283,19 @@ async function transformByThrowPriorToForAwait() { strictEqual(err.message, 'kaboom'); })); - read.pipe(stream); + read.pipe(stream).resume(); } Promise.all([ transformBy(), transformByFuncReturnsObjectWithSymbolAsyncIterator(), - transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext(), - transformByObjReturnedWSymbolAsyncIteratorWithNoNext(), - transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction(), + // NOTE: These should be handled by Readable.from. + // transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext(), + // transformByObjReturnedWSymbolAsyncIteratorWithNoNext(), + // transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction(), transformByFuncReturnsObjectWithoutSymbolAsyncIterator(), - transformByEncoding(), + // NOTE: This doesn't make sense for iterable? Is it consistent with Readable.from? + // transformByEncoding(), transformBySourceIteratorCompletes(), transformByYieldPlusReturn(), transformByReturnEndsStream(), From 62d632650fe60cef97bc212e58e125dde74d54f9 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 17 Dec 2019 00:04:34 +0100 Subject: [PATCH 2/6] fixup: more simplify --- lib/_stream_transform.js | 47 ++++++++++------------------------------ 1 file changed, 12 insertions(+), 35 deletions(-) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 45bd819bfdf74f..959fe1e0c90e4b 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -234,46 +234,23 @@ function done(stream, er, data) { } const from = require('internal/streams/from'); -const createReadableStreamAsyncIterator = require('internal/streams/async_iterator'); Transform.by = function by(asyncGeneratorFn, opts) { - let resume = null; - function next() { - if (resume) { - const doResume = resume; - resume = null; - doResume(); + let _resolve; + let _promise = new Promise((resolve) => _resolve = resolve); + return from(Duplex, asyncGeneratorFn(async function*() { + while (true) { + const { chunk, done, cb } = await _promise; + if (done) return cb(); + yield chunk; + _promise = new Promise((resolve) => _resolve = resolve); + cb(); } - } - const input = new Readable({ - objectMode: true, - autoDestroy: true, - read: next, - highWaterMark: 1, // TODO: Buffer here? - destroy (err, callback) { - next(); - callback(err); - } - }); - - const iterator = createReadableStreamAsyncIterator(input); - return from(Duplex, asyncGeneratorFn(iterator), { + }()), { objectMode: true, autoDestroy: true, ...opts, - write(chunk, encoding, callback) { - if (!input.push(chunk)) { - resume = callback; - } else { - callback(); - } - }, - final(callback) { - input.push(null); - resume = callback; - }, - destroy(err, callback) { - input.destroy(err, callback); - } + write: (chunk, encoding, cb) => _resolve({ chunk, done: false, cb }), + final: (cb) => _resolve({ done: true, cb }) }); }; From 824e2f8bc92c2d5fb7d8ed5f2fadf3bf8c234fc8 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 17 Dec 2019 13:16:58 +0100 Subject: [PATCH 3/6] fixup: errors --- doc/api/errors.md | 6 ------ lib/internal/errors.js | 2 -- lib/internal/streams/from.js | 7 ++++--- test/parallel/test-transform-by.js | 28 +++------------------------- 4 files changed, 7 insertions(+), 36 deletions(-) diff --git a/doc/api/errors.md b/doc/api/errors.md index 278e3386923e76..cf21c142d6dd97 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -630,12 +630,6 @@ display if `block` does not throw. An iterable argument (i.e. a value that works with `for...of` loops) was required, but not provided to a Node.js API. - -### ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE - -A function argument that returns an async iterable (i.e. a value that works -with `for await...of` loops) was required, but not provided to a Node.js API. - ### ERR_ASSERTION diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 39f580a609cdef..ad12d99c7cc49c 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -720,8 +720,6 @@ module.exports = { // Note: Node.js specific errors must begin with the prefix ERR_ E('ERR_AMBIGUOUS_ARGUMENT', 'The "%s" argument is ambiguous. %s', TypeError); E('ERR_ARG_NOT_ITERABLE', '%s must be iterable', TypeError); -E('ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', '%s must return an async iterable', - TypeError); E('ERR_ASSERTION', '%s', Error); E('ERR_ASYNC_CALLBACK', '%s must be a function', TypeError); E('ERR_ASYNC_TYPE', 'Invalid name for async "type": %s', TypeError); diff --git a/lib/internal/streams/from.js b/lib/internal/streams/from.js index ab6db00a125a0b..34712b3f6c6656 100644 --- a/lib/internal/streams/from.js +++ b/lib/internal/streams/from.js @@ -23,11 +23,12 @@ function from(Readable, iterable, opts) { }); } - if (iterable && iterable[SymbolAsyncIterator]) + if (iterable && typeof iterable[SymbolAsyncIterator] === 'function') iterator = iterable[SymbolAsyncIterator](); - else if (iterable && iterable[SymbolIterator]) + else if (iterable && typeof iterable[SymbolIterator] === 'function') iterator = iterable[SymbolIterator](); - else + + if (!iterator || typeof iterator.next !== 'function') throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable); const readable = new Readable({ diff --git a/test/parallel/test-transform-by.js b/test/parallel/test-transform-by.js index 6c7328c3fcc728..64b12e8b3cc6b1 100644 --- a/test/parallel/test-transform-by.js +++ b/test/parallel/test-transform-by.js @@ -41,25 +41,6 @@ async function transformByFuncReturnsObjectWithSymbolAsyncIterator() { } } -async function -transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext() { - const mapper = (source) => ({ - [Symbol.asyncIterator]() { - return { - next() { - const { done, value } = source.next(); - return { done, value: value ? value.toUpperCase() : value }; - } - }; - } - }); - - expectsError(() => Transform.by(mapper), { - code: 'ERR_INVALID_ARG_TYPE', - type: TypeError - }); -} - async function transformByObjReturnedWSymbolAsyncIteratorWithNoNext() { const mapper = () => ({ [Symbol.asyncIterator]() { @@ -79,8 +60,7 @@ async function transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction() { }); expectsError(() => Transform.by(mapper), { - message: 'asyncGeneratorFn must return an async iterable', - code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', + code: 'ERR_INVALID_ARG_TYPE', type: TypeError }); } @@ -289,10 +269,8 @@ async function transformByThrowPriorToForAwait() { Promise.all([ transformBy(), transformByFuncReturnsObjectWithSymbolAsyncIterator(), - // NOTE: These should be handled by Readable.from. - // transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext(), - // transformByObjReturnedWSymbolAsyncIteratorWithNoNext(), - // transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction(), + transformByObjReturnedWSymbolAsyncIteratorWithNoNext(), + transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction(), transformByFuncReturnsObjectWithoutSymbolAsyncIterator(), // NOTE: This doesn't make sense for iterable? Is it consistent with Readable.from? // transformByEncoding(), From 9fbb8d4de8e6b6e7b31afbf5bf710e65b9da7d09 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 17 Dec 2019 22:10:51 +0100 Subject: [PATCH 4/6] fixup: benchmark --- benchmark/streams/transform-by.js | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 benchmark/streams/transform-by.js diff --git a/benchmark/streams/transform-by.js b/benchmark/streams/transform-by.js new file mode 100644 index 00000000000000..5b051820e044bc --- /dev/null +++ b/benchmark/streams/transform-by.js @@ -0,0 +1,23 @@ +'use strict'; + +const common = require('../common'); +const Transform = require('stream').Transform; + +const bench = common.createBenchmark(main, { + n: [1e6] +}); + +function main({ n }) { + const s = Transform.by(async function*(source) { + for await (const chunk of source) { + yield chunk.toUpperCase(); + } + }); + s.resume(); + + bench.start(); + for (var k = 0; k < n; ++k) { + s.write(String.fromCharCode(n % 64 + 64)); + } + s.end(() => bench.end(n)); +} From de8bb15295cfb877413b545601593b7f10a0127d Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 17 Dec 2019 22:16:05 +0100 Subject: [PATCH 5/6] fixup: remove encoding test --- test/parallel/test-transform-by.js | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/test/parallel/test-transform-by.js b/test/parallel/test-transform-by.js index 64b12e8b3cc6b1..42991d9537f97b 100644 --- a/test/parallel/test-transform-by.js +++ b/test/parallel/test-transform-by.js @@ -74,24 +74,6 @@ async function transformByFuncReturnsObjectWithoutSymbolAsyncIterator() { }); } -async function transformByEncoding() { - const readable = Readable.from('test'.split('')); - async function * mapper(source) { - for await (const chunk of source) { - strictEqual(source.encoding, 'ascii'); - yield chunk.toUpperCase(); - } - } - const stream = Transform.by(mapper); - stream.setDefaultEncoding('ascii'); - readable.pipe(stream); - - const expected = ['T', 'E', 'S', 'T']; - for await (const chunk of stream) { - strictEqual(chunk, expected.shift()); - } -} - async function transformBySourceIteratorCompletes() { const readable = Readable.from('test'.split('')); const mustReach = mustCall(); @@ -272,8 +254,6 @@ Promise.all([ transformByObjReturnedWSymbolAsyncIteratorWithNoNext(), transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction(), transformByFuncReturnsObjectWithoutSymbolAsyncIterator(), - // NOTE: This doesn't make sense for iterable? Is it consistent with Readable.from? - // transformByEncoding(), transformBySourceIteratorCompletes(), transformByYieldPlusReturn(), transformByReturnEndsStream(), From 7f5a5bc5f9f4f70446a485de5d71c98a2daf6c08 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 18 Dec 2019 08:32:42 +0100 Subject: [PATCH 6/6] fixup: benchmrk --- benchmark/streams/transform-by.js | 12 +++++++++--- doc/api/stream.md | 8 ++++---- lib/_stream_transform.js | 20 +++++++------------- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/benchmark/streams/transform-by.js b/benchmark/streams/transform-by.js index 5b051820e044bc..8c2afa0732c22f 100644 --- a/benchmark/streams/transform-by.js +++ b/benchmark/streams/transform-by.js @@ -16,8 +16,14 @@ function main({ n }) { s.resume(); bench.start(); - for (var k = 0; k < n; ++k) { - s.write(String.fromCharCode(n % 64 + 64)); + + let k = 0; + function run() { + while (k++ < n && s.write(b)); + if (k >= n) + s.end(); } - s.end(() => bench.end(n)); + s.on('drain', run); + s.on('finish', () => bench.end(n)); + run(); } diff --git a/doc/api/stream.md b/doc/api/stream.md index 3526f869578c8e..c8857812a1a481 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1646,14 +1646,14 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have the strings or buffers be iterated to match the other streams semantics for performance reasons. -### stream.Transform.by(asyncGeneratorFunction[, options]) +### stream.Transform.by(transform[, options]) -* `asyncGeneratorFunction` {AsyncGeneratorFunction} A mapping function which -accepts a `source` async iterable which can be used to read incoming data, while -transformed data is pushed to the stream with the `yield` keyword. +* `transform` {AsyncGeneratorFunction} A mapping function which accepts a `source` +async iterable which can be used to read incoming data, while transformed data is +pushed to the stream with the `yield` keyword. * `options` {Object} Options provided to `new stream.Transform([options])`. By default, `Transform.by()` will set `options.objectMode` to `true`, unless this is explicitly opted out by setting `options.objectMode` to `false`. diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 959fe1e0c90e4b..ed25f7ea069113 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -71,23 +71,12 @@ const { module.exports = Transform; const { - ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE, ERR_METHOD_NOT_IMPLEMENTED, ERR_MULTIPLE_CALLBACK, ERR_TRANSFORM_ALREADY_TRANSFORMING, ERR_TRANSFORM_WITH_LENGTH_0 } = require('internal/errors').codes; const Duplex = require('_stream_duplex'); -const Readable = require('_stream_readable'); -const AsyncIteratorPrototype = ObjectGetPrototypeOf( - ObjectGetPrototypeOf(async function* () {}).prototype); - -const kSourceIteratorPull = Symbol('kSourceIteratorPull'); -const kSourceIteratorResolve = Symbol('kSourceIteratorResolve'); -const kSourceIteratorChunk = Symbol('kSourceIteratorChunk'); -const kSourceIteratorStream = Symbol('kSourceIteratorStream'); -const kSourceIteratorPump = Symbol('kSourceIteratorPump'); -const kSourceIteratorGrabResolve = Symbol('kSourceIteratorGrabResolve'); ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype); ObjectSetPrototypeOf(Transform, Duplex); @@ -235,10 +224,15 @@ function done(stream, er, data) { const from = require('internal/streams/from'); -Transform.by = function by(asyncGeneratorFn, opts) { +Transform.by = function by(transform, opts) { let _resolve; let _promise = new Promise((resolve) => _resolve = resolve); - return from(Duplex, asyncGeneratorFn(async function*() { + + if (typeof transform !== 'function') { + throw new ERR_INVALID_ARG_TYPE('transform', ['function'], iterable); + } + + return from(Duplex, transform(async function*() { while (true) { const { chunk, done, cb } = await _promise; if (done) return cb();