Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixup: simplify #12

Open
wants to merge 6 commits into
base: transform-by
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -630,12 +630,6 @@ display if `block` does not throw.
An iterable argument (i.e. a value that works with `for...of` loops) was
required, but not provided to a Node.js API.

<a id="ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE"></a>
### ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE

A function argument that returns an async iterable (i.e. a value that works
with `for await...of` loops) was required, but not provided to a Node.js API.

<a id="ERR_ASSERTION"></a>
### ERR_ASSERTION

Expand Down
119 changes: 17 additions & 102 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ const {
ERR_TRANSFORM_WITH_LENGTH_0
} = require('internal/errors').codes;
const Duplex = require('_stream_duplex');
const Readable = require('_stream_readable');
const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype);

Expand Down Expand Up @@ -232,110 +233,24 @@ function done(stream, er, data) {
return stream.push(null);
}

function SourceIterator(asyncGeneratorFn, opts) {
const source = this;
const result = asyncGeneratorFn(this);
if (typeof result[Symbol.asyncIterator] !== 'function') {
throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn');
}
const iter = result[Symbol.asyncIterator]();
if (typeof iter.next !== 'function') {
throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn');
}
const from = require('internal/streams/from');

this[kSourceIteratorPull] = null;
this[kSourceIteratorChunk] = null;
this[kSourceIteratorResolve] = null;
this[kSourceIteratorStream] = new Transform({
Transform.by = function by(asyncGeneratorFn, opts) {
let _resolve;
let _promise = new Promise((resolve) => _resolve = resolve);
return from(Duplex, asyncGeneratorFn(async function*() {
while (true) {
const { chunk, done, cb } = await _promise;
if (done) return cb();
yield chunk;
_promise = new Promise((resolve) => _resolve = resolve);
cb();
}
}()), {
objectMode: true,
autoDestroy: true,
...opts,
transform(chunk, encoding, cb) {
source.encoding = encoding;
if (source[kSourceIteratorResolve] === null) {
source[kSourceIteratorChunk] = chunk;
source[kSourceIteratorPull] = cb;
return;
}
source[kSourceIteratorResolve]({ value: chunk, done: false });
source[kSourceIteratorResolve] = null;
cb(null);
}
write: (chunk, encoding, cb) => _resolve({ chunk, done: false, cb }),
final: (cb) => _resolve({ done: true, cb })
});
this.encoding = this[kSourceIteratorStream]._transformState.writeencoding;
this[kSourceIteratorGrabResolve] = (resolve) => {
this[kSourceIteratorResolve] = resolve;
};
const first = iter.next();
this[kSourceIteratorPump](iter, first);
}

SourceIterator.prototype[Symbol.asyncIterator] = function() {
return this;
};

ObjectSetPrototypeOf(SourceIterator.prototype, AsyncIteratorPrototype);

SourceIterator.prototype.next = function next() {
if (this[kSourceIteratorPull] === null || this[kSourceIteratorChunk] === null)
return new Promise(this[kSourceIteratorGrabResolve]);

this[kSourceIteratorPull](null);
const result = Promise.resolve({
value: this[kSourceIteratorChunk],
done: false
});
this[kSourceIteratorChunk] = null;
this[kSourceIteratorPull] = null;
return result;
};

SourceIterator.prototype[kSourceIteratorPump] = async function pump(iter, p) {
const stream = this[kSourceIteratorStream];
try {
stream.removeListener('prefinish', prefinish);
stream.on('prefinish', () => {
if (this[kSourceIteratorResolve] !== null) {
this[kSourceIteratorResolve]({ value: undefined, done: true });
}
});
let next = await p;
while (true) {
const { done, value } = next;
if (done) {
if (value !== undefined) stream.push(value);

// In the event of an early return we explicitly
// discard any buffered state
if (stream._writableState.length > 0) {
const { length } = stream._writableState;
const { transforming } = stream._transformState;
stream._writableState.length = 0;
stream._transformState.transforming = false;
prefinish.call(stream);
stream._writableState.length = length;
stream._transformState.transforming = transforming;
} else {
prefinish.call(stream);
}
break;
}
stream.push(value);
next = await iter.next();
}
} catch (err) {
process.nextTick(() => stream.destroy(err));
} finally {
this[kSourceIteratorPull] = null;
this[kSourceIteratorChunk] = null;
this[kSourceIteratorResolve] = null;
this[kSourceIteratorStream] = null;
}
};


Transform.by = function by(asyncGeneratorFn, opts) {
const source = new SourceIterator(asyncGeneratorFn, opts);
const stream = source[kSourceIteratorStream];

return stream;
};
2 changes: 0 additions & 2 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -720,8 +720,6 @@ module.exports = {
// Note: Node.js specific errors must begin with the prefix ERR_
E('ERR_AMBIGUOUS_ARGUMENT', 'The "%s" argument is ambiguous. %s', TypeError);
E('ERR_ARG_NOT_ITERABLE', '%s must be iterable', TypeError);
E('ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', '%s must return an async iterable',
TypeError);
E('ERR_ASSERTION', '%s', Error);
E('ERR_ASYNC_CALLBACK', '%s must be a function', TypeError);
E('ERR_ASYNC_TYPE', 'Invalid name for async "type": %s', TypeError);
Expand Down
7 changes: 4 additions & 3 deletions lib/internal/streams/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ function from(Readable, iterable, opts) {
});
}

if (iterable && iterable[SymbolAsyncIterator])
if (iterable && typeof iterable[SymbolAsyncIterator] === 'function')
iterator = iterable[SymbolAsyncIterator]();
else if (iterable && iterable[SymbolIterator])
else if (iterable && typeof iterable[SymbolIterator] === 'function')
iterator = iterable[SymbolIterator]();
else

if (!iterator || typeof iterator.next !== 'function')
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidmarkclements Moved invalid arg check to Readable.from.


const readable = new Readable({
Expand Down
57 changes: 17 additions & 40 deletions test/parallel/test-transform-by.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const { Readable, Transform } = require('stream');
const { strictEqual } = require('assert');

async function transformBy() {
const readable = Readable.from('test');
const readable = Readable.from('test'.split(''));
async function * mapper(source) {
for await (const chunk of source) {
yield chunk.toUpperCase();
Expand All @@ -21,7 +21,7 @@ async function transformBy() {
}

async function transformByFuncReturnsObjectWithSymbolAsyncIterator() {
const readable = Readable.from('test');
const readable = Readable.from('test'.split(''));
const mapper = (source) => ({
[Symbol.asyncIterator]() {
return {
Expand All @@ -41,26 +41,6 @@ async function transformByFuncReturnsObjectWithSymbolAsyncIterator() {
}
}

async function
transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext() {
const mapper = (source) => ({
[Symbol.asyncIterator]() {
return {
next() {
const { done, value } = source.next();
return { done, value: value ? value.toUpperCase() : value };
}
};
}
});

expectsError(() => Transform.by(mapper), {
message: 'asyncGeneratorFn must return an async iterable',
code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE',
type: TypeError
});
}

async function transformByObjReturnedWSymbolAsyncIteratorWithNoNext() {
const mapper = () => ({
[Symbol.asyncIterator]() {
Expand All @@ -69,8 +49,7 @@ async function transformByObjReturnedWSymbolAsyncIteratorWithNoNext() {
});

expectsError(() => Transform.by(mapper), {
message: 'asyncGeneratorFn must return an async iterable',
code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE',
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
});
}
Expand All @@ -81,8 +60,7 @@ async function transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction() {
});

expectsError(() => Transform.by(mapper), {
message: 'asyncGeneratorFn must return an async iterable',
code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE',
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
});
}
Expand All @@ -91,14 +69,13 @@ async function transformByFuncReturnsObjectWithoutSymbolAsyncIterator() {
const mapper = () => ({});

expectsError(() => Transform.by(mapper), {
message: 'asyncGeneratorFn must return an async iterable',
code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE',
code: 'ERR_INVALID_ARG_TYPE',
type: TypeError
});
}

async function transformByEncoding() {
const readable = Readable.from('test');
const readable = Readable.from('test'.split(''));
async function * mapper(source) {
for await (const chunk of source) {
strictEqual(source.encoding, 'ascii');
Expand All @@ -116,7 +93,7 @@ async function transformByEncoding() {
}

async function transformBySourceIteratorCompletes() {
const readable = Readable.from('test');
const readable = Readable.from('test'.split(''));
const mustReach = mustCall();
async function * mapper(source) {
for await (const chunk of source) {
Expand All @@ -134,7 +111,7 @@ async function transformBySourceIteratorCompletes() {
}

async function transformByYieldPlusReturn() {
const readable = Readable.from('test');
const readable = Readable.from('test'.split(''));
async function * mapper(source) {
for await (const chunk of source) {
yield chunk.toUpperCase();
Expand All @@ -151,7 +128,7 @@ async function transformByYieldPlusReturn() {
}

async function transformByReturnEndsStream() {
const readable = Readable.from('test');
const readable = Readable.from('test'.split(''));
async function * mapper(source) {
for await (const chunk of source) {
yield chunk.toUpperCase();
Expand All @@ -170,7 +147,7 @@ async function transformByReturnEndsStream() {
}

async function transformByOnData() {
const readable = Readable.from('test');
const readable = Readable.from('test'.split(''));
async function * mapper(source) {
for await (const chunk of source) {
yield chunk.toUpperCase();
Expand All @@ -191,7 +168,7 @@ async function transformByOnData() {
}

async function transformByOnDataNonObject() {
const readable = Readable.from('test', { objectMode: false });
const readable = Readable.from('test'.split(''), { objectMode: false });
async function * mapper(source) {
for await (const chunk of source) {
yield chunk.toString().toUpperCase();
Expand All @@ -212,7 +189,7 @@ async function transformByOnDataNonObject() {
}

async function transformByOnErrorAndDestroyed() {
const stream = Readable.from('test').pipe(Transform.by(
const stream = Readable.from('test'.split('')).pipe(Transform.by(
async function * mapper(source) {
for await (const chunk of source) {
if (chunk === 'e') throw new Error('kaboom');
Expand All @@ -230,7 +207,7 @@ async function transformByOnErrorAndDestroyed() {
}

async function transformByErrorTryCatchAndDestroyed() {
const stream = Readable.from('test').pipe(Transform.by(
const stream = Readable.from('test'.split('')).pipe(Transform.by(
async function * mapper(source) {
for await (const chunk of source) {
if (chunk === 'e') throw new Error('kaboom');
Expand All @@ -250,7 +227,7 @@ async function transformByErrorTryCatchAndDestroyed() {
}

async function transformByOnErrorAndTryCatchAndDestroyed() {
const stream = Readable.from('test').pipe(Transform.by(
const stream = Readable.from('test'.split('')).pipe(Transform.by(
async function * mapper(source) {
for await (const chunk of source) {
if (chunk === 'e') throw new Error('kaboom');
Expand Down Expand Up @@ -286,17 +263,17 @@ async function transformByThrowPriorToForAwait() {
strictEqual(err.message, 'kaboom');
}));

read.pipe(stream);
read.pipe(stream).resume();
}

Promise.all([
transformBy(),
transformByFuncReturnsObjectWithSymbolAsyncIterator(),
transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidmarkclements: I don't think this test is needed. Returning a non promise is fine?

transformByObjReturnedWSymbolAsyncIteratorWithNoNext(),
transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction(),
transformByFuncReturnsObjectWithoutSymbolAsyncIterator(),
transformByEncoding(),
// NOTE: This doesn't make sense for iterable? Is it consistent with Readable.from?
// transformByEncoding(),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what this test, tests for?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in order to have an analogue to (chunk, enc, cb) => { } we add an encoding property for cases where transform streams behave differently depending on the encoding.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is taken into account in Readable.from and since we are doing objectMode I'm still unsure what the semantics here should be. I need to think about this a bit more unless someone can clarify?

transformBySourceIteratorCompletes(),
transformByYieldPlusReturn(),
transformByReturnEndsStream(),
Expand Down