Skip to content

Commit

Permalink
lib: add readAsSSE() method
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonTian committed Sep 20, 2023
1 parent f7cbc35 commit f315ebc
Show file tree
Hide file tree
Showing 5 changed files with 398 additions and 41 deletions.
9 changes: 9 additions & 0 deletions lib/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,12 @@ export interface Options {
export function request(url: string, options: Options): Promise<IncomingMessage>;

export function read(response: IncomingMessage, encoding: string): Promise<string|Buffer>;

export interface Event {
'data'?: string;
'id'?: string;
'event'?: string;
'retry'?: string;
}

export function readAsSSE(response: IncomingMessage): AsyncGenerator<Event, void, unknown>;
141 changes: 128 additions & 13 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,23 @@ const READ_TIMER = Symbol('TIMER::READ_TIMER');
const READ_TIME_OUT = Symbol('TIMER::READ_TIME_OUT');
const READ_TIMER_START_AT = Symbol('TIMER::READ_TIMER_START_AT');

/**
* Check the content-encoding header, and auto decompress it.
* @param {Readable} response http response
* @returns Readable
*/
function decompress(response) {
switch (response.headers['content-encoding']) {
// or, just use zlib.createUnzip() to handle both cases
case 'gzip':
return response.pipe(zlib.createGunzip());
case 'deflate':
return response.pipe(zlib.createInflate());
default:
return response;
}
}

var append = function (err, name, message) {
err.name = name + err.name;
err.message = `${message}. ${err.message}`;
Expand All @@ -29,7 +46,6 @@ const isNumber = function (num) {
};

exports.request = function (url, opts) {
// request(url)
opts || (opts = {});

const parsed = typeof url === 'string' ? parse(url) : url;
Expand Down Expand Up @@ -165,18 +181,7 @@ exports.request = function (url, opts) {
};

exports.read = function (response, encoding) {
var readable = response;
switch (response.headers['content-encoding']) {
// or, just use zlib.createUnzip() to handle both cases
case 'gzip':
readable = response.pipe(zlib.createGunzip());
break;
case 'deflate':
readable = response.pipe(zlib.createInflate());
break;
default:
break;
}
const readable = decompress(response);

return new Promise((resolve, reject) => {
// node.js 14 use response.client
Expand Down Expand Up @@ -254,3 +259,113 @@ exports.read = function (response, encoding) {
readable.on('end', onEnd);
});
};

function readyToRead(readable) {
return new Promise((resolve, reject) => {
var onReadable, onEnd, onError;
var cleanup = function () {
// cleanup
readable.removeListener('error', onError);
readable.removeListener('end', onEnd);
readable.removeListener('readable', onReadable);
}

onReadable = function () {
cleanup();
resolve(false);
};

onEnd = function () {
cleanup();
resolve(true);
}

onError = function (err) {
cleanup();
reject(err);
}

readable.once('readable', onReadable);
readable.once('end', onEnd);
readable.once('error', onError);
});
}

class Event {
constructor(id, event, data, retry) {
this.id = id;
this.event = event;
this.data = data;
this.retry = retry;
}
}

exports.Event = Event;

const DATA_PREFIX = 'data: ';
const EVENT_PREFIX = 'event: ';
const ID_PREFIX = 'id: ';
const RETRY_PREFIX = 'retry: ';

function tryGetEvents(head, chunk) {
const all = head + chunk;
let start = 0;
const events = [];
for (let i = 0; i < all.length - 1; i++) {
const c = all[i];
const c2 = all[i + 1];
if (c === '\n' && c2 === '\n') {
const part = all.substring(start, i);
const lines = part.split('\n');
const event = new Event();
lines.forEach((line) => {
if (line.startsWith(DATA_PREFIX)) {
event.data = line.substring(DATA_PREFIX.length);
} else if (line.startsWith(EVENT_PREFIX)) {
event.event = line.substring(EVENT_PREFIX.length);
} else if (line.startsWith(ID_PREFIX)) {
event.id = line.substring(ID_PREFIX.length);
} else if (line.startsWith(RETRY_PREFIX)) {
event.retry = line.substring(RETRY_PREFIX.length);
}
});
events.push(event);
start = i + 2;
}
}

const rest = all.substring(start);
return [events, rest];
}

/**
* consume response and parse to event stream
* @param {ReadableStream} response
* @returns AsyncGenerator<Event, void, unknown>
*/
exports.readAsSSE = async function* (response) {
const readable = decompress(response);

const socket = response.socket || response.client;
clearTimeout(socket[READ_TIMER]);

let rest = '';

while (true) {
const ended = await readyToRead(readable);
if (ended) {
return;
}

let chunk;
while (null !== (chunk = readable.read())) {
const [ events, remain ] = tryGetEvents(rest, chunk.toString());
rest = remain;
if (events && events.length > 0) {
for (const event of events) {
yield event;
}
}
}
}
};
Loading

0 comments on commit f315ebc

Please sign in to comment.