| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- /*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
- var grpc = require('bindings')('grpc.node');
- var common = require('./common');
- var Duplex = require('stream').Duplex;
- var util = require('util');
- util.inherits(GrpcClientStream, Duplex);
- /**
- * Class for representing a gRPC client side stream as a Node stream. Extends
- * from stream.Duplex.
- * @constructor
- * @param {grpc.Call} call Call object to proxy
- * @param {object} options Stream options
- */
- function GrpcClientStream(call, options) {
- Duplex.call(this, options);
- var self = this;
- var finished = false;
- // Indicates that a read is currently pending
- var reading = false;
- // Indicates that a write is currently pending
- var writing = false;
- this._call = call;
- /**
- * Callback to be called when a READ event is received. Pushes the data onto
- * the read queue and starts reading again if applicable
- * @param {grpc.Event} event READ event object
- */
- function readCallback(event) {
- if (finished) {
- self.push(null);
- return;
- }
- var data = event.data;
- if (self.push(data) && data != null) {
- self._call.startRead(readCallback);
- } else {
- reading = false;
- }
- }
- call.invoke(function(event) {
- self.emit('metadata', event.data);
- }, function(event) {
- finished = true;
- self.emit('status', event.data);
- }, 0);
- this.on('finish', function() {
- call.writesDone(function() {});
- });
- /**
- * Start reading if there is not already a pending read. Reading will
- * continue until self.push returns false (indicating reads should slow
- * down) or the read data is null (indicating that there is no more data).
- */
- this.startReading = function() {
- if (finished) {
- self.push(null);
- } else {
- if (!reading) {
- reading = true;
- self._call.startRead(readCallback);
- }
- }
- };
- }
- /**
- * Start reading. This is an implementation of a method needed for implementing
- * stream.Readable.
- * @param {number} size Ignored
- */
- GrpcClientStream.prototype._read = function(size) {
- this.startReading();
- };
- /**
- * Attempt to write the given chunk. Calls the callback when done. This is an
- * implementation of a method needed for implementing stream.Writable.
- * @param {Buffer} chunk The chunk to write
- * @param {string} encoding Ignored
- * @param {function(Error=)} callback Ignored
- */
- GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
- var self = this;
- self._call.startWrite(chunk, function(event) {
- callback();
- }, 0);
- };
- /**
- * Make a request on the channel to the given method with the given arguments
- * @param {grpc.Channel} channel The channel on which to make the request
- * @param {string} method The method to request
- * @param {array=} metadata Array of metadata key/value pairs to add to the call
- * @param {(number|Date)=} deadline The deadline for processing this request.
- * Defaults to infinite future.
- * @return {stream=} The stream of responses
- */
- function makeRequest(channel,
- method,
- metadata,
- deadline) {
- if (deadline === undefined) {
- deadline = Infinity;
- }
- var call = new grpc.Call(channel, method, deadline);
- if (metadata) {
- call.addMetadata(metadata);
- }
- return new GrpcClientStream(call);
- }
- /**
- * See documentation for makeRequest above
- */
- exports.makeRequest = makeRequest;
- /**
- * Represents a client side gRPC channel associated with a single host.
- */
- exports.Channel = grpc.Channel;
- /**
- * Status name to code number mapping
- */
- exports.status = grpc.status;
- /**
- * Call error name to code number mapping
- */
- exports.callError = grpc.callError;
|