i understand how use writable streams in node's new streams2
library, don't understand how use readable streams.
take, example, stream wrapper around dgram
module:
var dgram = require('dgram'); var thumbs = { twiddle: function() {} }; var defaults = { address: '0.0.0.0', type: 'udp4', port: 12345, broadcast: null, multicast: null, multicastttl: 1 }; var udpstream = function(options) { if (!(this instanceof udpstream)) return new udpstream(options); duplex.call(this); options = options || {}; this.address = options.address || defaults.address; this.type = options.type || defaults.type; this.port = options.port || defaults.port; this.broadcast = options.broadcast || defaults.broadcast; this.multicast = options.multicast || defaults.multicast; this.multicastttl = options.multicastttl || defaults.multicastttl; this._socket = dgram.createsocket(this.type, setup.bind(this)); this._socket.on('message', this.push.bind(this)); }; util.inherits(udpstream, duplex); var setup = function() { if (this.multicast) { this._socket.addmembership(this.multicast); this._socket.setmulticastttl(this.multicastttl); this.destination = this.multicast; } else { // default using broadcast if multicast address not specified. this._socket.setbroadcast(true); // todo: default broadcast address os.networkinterfaces() (not returned) this.destination = this.broadcast || '255.255.255.255'; } }; udpstream.prototype._read = function(size) { thumbs.twiddle(); }; udpstream.prototype._write = function(chunk, encoding, callback) { this._socket.send(chunk, 0, chunk.length, this.port, this.destination); callback(); }; module.exports = udpstream;
everything makes sense except _read
implementation. it's literally twiddling thumbs because don't understand i'm supposed there. data pushed when udp socket emits new message, have no way of pausing or resuming underlying resource. should like?
_read part of pause resume mechanism. nodejs api docs
when data available, put read queue calling readable.push(chunk). if push returns false, should stop reading. when _read called again, should start pushing more data.
so in _write function, if socket.send
call fails either returning false or calling callback error should pause stream. _read
can simple this._paused = false
might this.
udpstream.prototype._read = function() { this._paused = false; } udpstream.prototype._write = function(chunk, encoding, callback) { if(!this._paused) this._socket.send(chunk, 0, chunk.length, this.port, this.destination); };
Comments
Post a Comment