@@ -284,77 +284,164 @@ Readable.prototype[SymbolAsyncDispose] = function() {
284284// similar to how Writable.write() returns true if you should
285285// write() some more.
286286Readable.prototype.push = function(chunk, encoding) {
287- return readableAddChunk ( this , chunk , encoding , false ) ;
287+ debug('push', chunk);
288+
289+ const state = this._readableState;
290+ return (state[kState] & kObjectMode) === 0 ?
291+ readableAddChunkPushByteMode(this, state, chunk, encoding) :
292+ readableAddChunkPushObjectMode(this, state, chunk, encoding);
288293};
289294
290295// Unshift should *always* be something directly out of read().
291296Readable.prototype.unshift = function(chunk, encoding) {
292- return readableAddChunk ( this , chunk , encoding , true ) ;
297+ debug('unshift', chunk);
298+ const state = this._readableState;
299+ return (state[kState] & kObjectMode) === 0 ?
300+ readableAddChunkUnshiftByteMode(this, state, chunk, encoding) :
301+ readableAddChunkUnshiftObjectMode(this, state, chunk);
293302};
294303
295- function readableAddChunk ( stream , chunk , encoding , addToFront ) {
296- debug ( 'readableAddChunk' , chunk ) ;
297- const state = stream . _readableState ;
298304
299- let err ;
300- if ( ( state [ kState ] & kObjectMode ) === 0 ) {
301- if ( typeof chunk === 'string' ) {
302- encoding = encoding || state . defaultEncoding ;
303- if ( state . encoding !== encoding ) {
304- if ( addToFront && state . encoding ) {
305- // When unshifting, if state.encoding is set, we have to save
306- // the string in the BufferList with the state encoding.
307- chunk = Buffer . from ( chunk , encoding ) . toString ( state . encoding ) ;
308- } else {
309- chunk = Buffer . from ( chunk , encoding ) ;
310- encoding = '' ;
311- }
305+ function readableAddChunkUnshiftByteMode(stream, state, chunk, encoding) {
306+ if (chunk === null) {
307+ state[kState] &= ~kReading;
308+ onEofChunk(stream, state);
309+
310+ return false;
311+ }
312+
313+ if (typeof chunk === 'string') {
314+ encoding = encoding || state.defaultEncoding;
315+ if (state.encoding !== encoding) {
316+ if (state.encoding) {
317+ // When unshifting, if state.encoding is set, we have to save
318+ // the string in the BufferList with the state encoding.
319+ chunk = Buffer.from(chunk, encoding).toString(state.encoding);
320+ } else {
321+ chunk = Buffer.from(chunk, encoding);
312322 }
313- } else if ( chunk instanceof Buffer ) {
314- encoding = '' ;
315- } else if ( Stream . _isUint8Array ( chunk ) ) {
316- chunk = Stream . _uint8ArrayToBuffer ( chunk ) ;
317- encoding = '' ;
318- } else if ( chunk != null ) {
319- err = new ERR_INVALID_ARG_TYPE (
320- 'chunk' , [ 'string' , 'Buffer' , 'Uint8Array' ] , chunk ) ;
321323 }
324+ } else if (Stream._isUint8Array(chunk)) {
325+ chunk = Stream._uint8ArrayToBuffer(chunk);
326+ } else if (chunk !== undefined && !(chunk instanceof Buffer)) {
327+ errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE(
328+ 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk));
329+ return false;
322330 }
323331
324- if ( err ) {
325- errorOrDestroy ( stream , err ) ;
326- } else if ( chunk === null ) {
332+
333+ if (!(chunk && chunk.length > 0)) {
334+ return canPushMore(state);
335+ }
336+
337+ return readableAddChunkUnshiftValue(stream, state, chunk);
338+ }
339+
340+ function readableAddChunkUnshiftObjectMode(stream, state, chunk) {
341+ if (chunk === null) {
327342 state[kState] &= ~kReading;
328343 onEofChunk(stream, state);
329- } else if ( ( ( state [ kState ] & kObjectMode ) !== 0 ) || ( chunk && chunk . length > 0 ) ) {
330- if ( addToFront ) {
331- if ( ( state [ kState ] & kEndEmitted ) !== 0 )
332- errorOrDestroy ( stream , new ERR_STREAM_UNSHIFT_AFTER_END_EVENT ( ) ) ;
333- else if ( state . destroyed || state . errored )
334- return false ;
335- else
336- addChunk ( stream , state , chunk , true ) ;
337- } else if ( state . ended ) {
338- errorOrDestroy ( stream , new ERR_STREAM_PUSH_AFTER_EOF ( ) ) ;
339- } else if ( state . destroyed || state . errored ) {
340- return false ;
341- } else {
342- state [ kState ] &= ~ kReading ;
343- if ( state . decoder && ! encoding ) {
344- chunk = state . decoder . write ( chunk ) ;
345- if ( state . objectMode || chunk . length !== 0 )
346- addChunk ( stream , state , chunk , false ) ;
347- else
348- maybeReadMore ( stream , state ) ;
349- } else {
350- addChunk ( stream , state , chunk , false ) ;
351- }
344+
345+ return false;
346+ }
347+
348+ return readableAddChunkUnshiftValue(stream, state, chunk);
349+ }
350+
351+ function readableAddChunkUnshiftValue(stream, state, chunk) {
352+ if ((state[kState] & kEndEmitted) !== 0)
353+ errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
354+ else if (state.destroyed || state.errored)
355+ return false;
356+ else
357+ addChunk(stream, state, chunk, true);
358+
359+ return canPushMore(state);
360+ }
361+
362+ function readableAddChunkPushByteMode(stream, state, chunk, encoding) {
363+ if (chunk === null) {
364+ state[kState] &= ~kReading;
365+ onEofChunk(stream, state);
366+
367+ return false;
368+ }
369+
370+ if (typeof chunk === 'string') {
371+ encoding = encoding || state.defaultEncoding;
372+ if (state.encoding !== encoding) {
373+ chunk = Buffer.from(chunk, encoding);
374+ encoding = '';
352375 }
353- } else if ( ! addToFront ) {
376+ } else if (chunk instanceof Buffer) {
377+ encoding = '';
378+ } else if (Stream._isUint8Array(chunk)) {
379+ chunk = Stream._uint8ArrayToBuffer(chunk);
380+ encoding = '';
381+ } else if (chunk !== undefined) {
382+ errorOrDestroy(stream, new ERR_INVALID_ARG_TYPE(
383+ 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk));
384+ return false;
385+ }
386+
387+ if (!chunk || chunk.length <= 0) {
354388 state[kState] &= ~kReading;
355389 maybeReadMore(stream, state);
390+
391+ return canPushMore(state);
392+ }
393+
394+ if (state.ended) {
395+ errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
396+
397+ return false;
398+ }
399+
400+ if (state.destroyed || state.errored) {
401+ return false;
402+ }
403+
404+ state[kState] &= ~kReading;
405+ if (state.decoder && !encoding) {
406+ chunk = state.decoder.write(chunk);
407+ if (chunk.length === 0) {
408+ maybeReadMore(stream, state);
409+
410+ return canPushMore(state);
411+ }
356412 }
357413
414+ addChunk(stream, state, chunk, false);
415+ return canPushMore(state);
416+ }
417+
418+ function readableAddChunkPushObjectMode(stream, state, chunk, encoding) {
419+ if (chunk === null) {
420+ state[kState] &= ~kReading;
421+ onEofChunk(stream, state);
422+
423+ return false;
424+ }
425+
426+ if (state.ended) {
427+ errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
428+ return false;
429+ }
430+
431+ if (state.destroyed || state.errored) {
432+ return false;
433+ }
434+
435+ state[kState] &= ~kReading;
436+ if (state.decoder && !encoding) {
437+ chunk = state.decoder.write(chunk);
438+ }
439+
440+ addChunk(stream, state, chunk, false);
441+ return canPushMore(state);
442+ }
443+
444+ function canPushMore(state) {
358445 // We can push more data if we are below the highWaterMark.
359446 // Also, if we have no data yet, we can stand some more bytes.
360447 // This is to work around cases where hwm=0, such as the repl.
0 commit comments