Skip to content

Commit

Permalink
stream: Expose DuplexPair API
Browse files Browse the repository at this point in the history
  • Loading branch information
awwright committed Jul 1, 2020
1 parent 6213fce commit 0c116a9
Show file tree
Hide file tree
Showing 27 changed files with 180 additions and 122 deletions.
24 changes: 23 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ There are four fundamental stream types within Node.js:
is written and read (for example, [`zlib.createDeflate()`][]).

Additionally, this module includes the utility functions
[`stream.pipeline()`][], [`stream.finished()`][] and
[`DuplexPair`][],
[`stream.pipeline()`][],
[`stream.finished()`][], and
[`stream.Readable.from()`][].

### Object mode
Expand Down Expand Up @@ -1508,6 +1510,25 @@ unless `emitClose` is set in false.
Once `destroy()` has been called any further calls will be a noop and no
further errors except from `_destroy` may be emitted as `'error'`.

#### Class: `stream.DuplexPair`
<!-- YAML
added: REPLACEME
-->

The utility class `DuplexPair` returns an iterable object with two `Duplex`
streams on properties `0` and `1`, each connected to the other side:

```js
const [ clientSide, serverSide ] = new DuplexPair();
```

Whatever is written to one stream is made readable on the other. It provides
behavior analagous to a network connection, where the data which a client writes
to its socket becomes readable on the server's socket.

The Duplex streams are symmetrical; one or the other may be used without any
difference in behavior.

### `stream.finished(stream[, options], callback)`
<!-- YAML
added: v10.0.0
Expand Down Expand Up @@ -3031,6 +3052,7 @@ contain multi-byte characters.
[`'finish'`]: #stream_event_finish
[`'readable'`]: #stream_event_readable
[`Duplex`]: #stream_class_stream_duplex
[`DuplexPair`]: #stream_class_stream_duplexpair
[`EventEmitter`]: events.html#events_class_eventemitter
[`Readable`]: #stream_class_stream_readable
[`Symbol.hasInstance`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/hasInstance
Expand Down
23 changes: 11 additions & 12 deletions lib/internal/streams/duplexpair.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
'use strict';

const {
Symbol,
} = primordials;

const { Duplex } = require('stream');
const assert = require('internal/assert');

const { Symbol, Array } = primordials;
const kCallback = Symbol('Callback');
const kOtherSide = Symbol('Other');

class DuplexSocket extends Duplex {
class DuplexSide extends Duplex {
constructor() {
super();
this[kCallback] = null;
Expand All @@ -25,6 +22,8 @@ class DuplexSocket extends Duplex {
}

_write(chunk, encoding, callback) {
assert(this[kOtherSide] !== null);
assert(this[kOtherSide][kCallback] === null);
if (chunk.length === 0) {
process.nextTick(callback);
} else {
Expand All @@ -39,13 +38,13 @@ class DuplexSocket extends Duplex {
}
}

class DuplexPair {
class DuplexPair extends Array {
constructor() {
this.socket1 = new DuplexSocket();
this.socket2 = new DuplexSocket();
this.socket1[kOtherSide] = this.socket2;
this.socket2[kOtherSide] = this.socket1;
super();
const side0 = this[0] = new DuplexSide();
const side1 = this[1] = new DuplexSide();
side0[kOtherSide] = side1;
side1[kOtherSide] = side0;
}
}

module.exports = DuplexPair;
1 change: 1 addition & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Stream.Writable = require('_stream_writable');
Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');
Stream.DuplexPair = require('internal/streams/duplexpair');

Stream.pipeline = pipeline;
Stream.finished = eos;
Expand Down
4 changes: 2 additions & 2 deletions lib/tls.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const { getRootCertificates, getSSLCiphers } = internalBinding('crypto');
const { Buffer } = require('buffer');
const EventEmitter = require('events');
const { URL } = require('internal/url');
const DuplexPair = require('internal/streams/duplexpair');
const { DuplexPair } = require('stream');
const { canonicalizeIP } = internalBinding('cares_wrap');
const _tls_common = require('_tls_common');
const _tls_wrap = require('_tls_wrap');
Expand Down Expand Up @@ -296,7 +296,7 @@ class SecurePair extends EventEmitter {
rejectUnauthorized = false,
options = {}) {
super();
const { socket1, socket2 } = new DuplexPair();
const [ socket1, socket2 ] = new DuplexPair();

this.server = options.server;
this.credentials = secureContext;
Expand Down
2 changes: 1 addition & 1 deletion node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
'lib/_stream_duplex.js',
'lib/_stream_transform.js',
'lib/_stream_passthrough.js',
'lib/_stream_duplexpair.js',
'lib/_stream_wrap.js',
'lib/string_decoder.js',
'lib/sys.js',
Expand Down Expand Up @@ -231,7 +232,6 @@
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/async_iterator.js',
'lib/internal/streams/buffer_list.js',
'lib/internal/streams/duplexpair.js',
'lib/internal/streams/from.js',
'lib/internal/streams/legacy.js',
'lib/internal/streams/destroy.js',
Expand Down
9 changes: 0 additions & 9 deletions test/common/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ This directory contains modules used to test the Node.js implementation.
* [Countdown module](#countdown-module)
* [CPU Profiler module](#cpu-profiler-module)
* [DNS module](#dns-module)
* [Duplex pair helper](#duplex-pair-helper)
* [Environment variables](#environment-variables)
* [Fixtures module](#fixtures-module)
* [Heap dump checker module](#heap-dump-checker-module)
Expand Down Expand Up @@ -564,14 +563,6 @@ Reads a Domain String and returns a Buffer containing the domain.
Takes in a parsed Object and writes its fields to a DNS packet as a Buffer
object.

## Duplex pair helper

The `common/duplexpair` module exports a single function `makeDuplexPair`,
which returns an object `{ clientSide, serverSide }` where each side is a
`Duplex` stream connected to the other side.

There is no difference between client or server side beyond their names.

## Environment variables

The behavior of the Node.js test suite can be altered using the following
Expand Down
49 changes: 0 additions & 49 deletions test/common/duplexpair.js

This file was deleted.

4 changes: 2 additions & 2 deletions test/parallel/test-gc-tls-external-memory.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');

const makeDuplexPair = require('../common/duplexpair');
const { DuplexPair } = require('stream');
const onGC = require('../common/ongc');
const assert = require('assert');
const tls = require('tls');
Expand Down Expand Up @@ -37,7 +37,7 @@ function connect() {
return;
}

const { clientSide, serverSide } = makeDuplexPair();
const [ clientSide, serverSide ] = new DuplexPair();

const tlsSocket = tls.connect({ socket: clientSide });
tlsSocket.on('error', common.mustCall(connect));
Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-http-agent-domain-reused-gc.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const common = require('../common');
const http = require('http');
const async_hooks = require('async_hooks');
const makeDuplexPair = require('../common/duplexpair');
const { DuplexPair } = require('stream');

// Regression test for https://github.com/nodejs/node/issues/30122
// When a domain is attached to an http Agent’s ReusedHandle object, that
Expand Down Expand Up @@ -36,7 +36,7 @@ async_hooks.createHook({
// attached to too many objects that use strong references (timers, the network
// socket handle, etc.) and wrap the client side in a JSStreamSocket so we don’t
// have to implement the whole _handle API ourselves.
const { serverSide, clientSide } = makeDuplexPair();
const [ serverSide, clientSide ] = new DuplexPair();
const JSStreamSocket = require('internal/js_stream_socket');
const wrappedClientSide = new JSStreamSocket(clientSide);

Expand Down
12 changes: 6 additions & 6 deletions test/parallel/test-http-generic-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
const common = require('../common');
const assert = require('assert');
const http = require('http');
const MakeDuplexPair = require('../common/duplexpair');
const { DuplexPair } = require('stream');

// Test 1: Simple HTTP test, no keep-alive.
{
Expand All @@ -13,7 +13,7 @@ const MakeDuplexPair = require('../common/duplexpair');
res.end(testData);
}));

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = new DuplexPair();
server.emit('connection', serverSide);

const req = http.request({
Expand All @@ -37,7 +37,7 @@ const MakeDuplexPair = require('../common/duplexpair');
res.end(testData);
}, 2));

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = new DuplexPair();
server.emit('connection', serverSide);

function doRequest(cb) {
Expand Down Expand Up @@ -77,7 +77,7 @@ const MakeDuplexPair = require('../common/duplexpair');
});
}));

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = new DuplexPair();
server.emit('connection', serverSide);
clientSide.on('end', common.mustCall());
serverSide.on('end', common.mustCall());
Expand Down Expand Up @@ -117,7 +117,7 @@ const MakeDuplexPair = require('../common/duplexpair');

}));

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = new DuplexPair();
server.emit('connection', serverSide);
clientSide.on('end', common.mustCall());
serverSide.on('end', common.mustCall());
Expand All @@ -143,7 +143,7 @@ const MakeDuplexPair = require('../common/duplexpair');
{
const server = http.createServer(common.mustNotCall());

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = new DuplexPair();
server.emit('connection', serverSide);

server.on('clientError', common.mustCall());
Expand Down
10 changes: 5 additions & 5 deletions test/parallel/test-http-insecure-parser-per-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
const common = require('../common');
const assert = require('assert');
const http = require('http');
const MakeDuplexPair = require('../common/duplexpair');
const { DuplexPair } = require('stream');

// Test that setting the `maxHeaderSize` option works on a per-stream-basis.

// Test 1: The server sends an invalid header.
{
const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = new DuplexPair();

const req = http.request({
createConnection: common.mustCall(() => clientSide),
Expand All @@ -29,7 +29,7 @@ const MakeDuplexPair = require('../common/duplexpair');

// Test 2: The same as Test 1 except without the option, to make sure it fails.
{
const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = new DuplexPair();

const req = http.request({
createConnection: common.mustCall(() => clientSide)
Expand Down Expand Up @@ -57,7 +57,7 @@ const MakeDuplexPair = require('../common/duplexpair');

server.on('clientError', common.mustNotCall());

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = new DuplexPair();
serverSide.server = server;
server.emit('connection', serverSide);

Expand All @@ -72,7 +72,7 @@ const MakeDuplexPair = require('../common/duplexpair');

server.on('clientError', common.mustCall());

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = new DuplexPair();
serverSide.server = server;
server.emit('connection', serverSide);

Expand Down
10 changes: 5 additions & 5 deletions test/parallel/test-http-max-header-size-per-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
const common = require('../common');
const assert = require('assert');
const http = require('http');
const MakeDuplexPair = require('../common/duplexpair');
const { DuplexPair } = require('stream');

// Test that setting the `maxHeaderSize` option works on a per-stream-basis.

// Test 1: The server sends larger headers than what would otherwise be allowed.
{
const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = new DuplexPair();

const req = http.request({
createConnection: common.mustCall(() => clientSide),
Expand All @@ -29,7 +29,7 @@ const MakeDuplexPair = require('../common/duplexpair');

// Test 2: The same as Test 1 except without the option, to make sure it fails.
{
const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = new DuplexPair();

const req = http.request({
createConnection: common.mustCall(() => clientSide)
Expand Down Expand Up @@ -57,7 +57,7 @@ const MakeDuplexPair = require('../common/duplexpair');

server.on('clientError', common.mustNotCall());

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = new DuplexPair();
serverSide.server = server;
server.emit('connection', serverSide);

Expand All @@ -72,7 +72,7 @@ const MakeDuplexPair = require('../common/duplexpair');

server.on('clientError', common.mustCall());

const { clientSide, serverSide } = MakeDuplexPair();
const [ clientSide, serverSide ] = new DuplexPair();
serverSide.server = server;
server.emit('connection', serverSide);

Expand Down
Loading

0 comments on commit 0c116a9

Please sign in to comment.