|  | @@ -47,10 +47,21 @@ util.inherits(GrpcServerStream, Duplex);
 | 
	
		
			
				|  |  |   * from stream.Duplex.
 | 
	
		
			
				|  |  |   * @constructor
 | 
	
		
			
				|  |  |   * @param {grpc.Call} call Call object to proxy
 | 
	
		
			
				|  |  | - * @param {object} options Stream options
 | 
	
		
			
				|  |  | + * @param {function(*):Buffer} serialize Serialization function for responses
 | 
	
		
			
				|  |  | + * @param {function(Buffer):*} deserialize Deserialization function for requests
 | 
	
		
			
				|  |  |   */
 | 
	
		
			
				|  |  | -function GrpcServerStream(call, options) {
 | 
	
		
			
				|  |  | -  Duplex.call(this, options);
 | 
	
		
			
				|  |  | +function GrpcServerStream(call, serialize, deserialize) {
 | 
	
		
			
				|  |  | +  Duplex.call(this, {objectMode: true});
 | 
	
		
			
				|  |  | +  if (!serialize) {
 | 
	
		
			
				|  |  | +    serialize = function(value) {
 | 
	
		
			
				|  |  | +      return value;
 | 
	
		
			
				|  |  | +    };
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (!deserialize) {
 | 
	
		
			
				|  |  | +    deserialize = function(value) {
 | 
	
		
			
				|  |  | +      return value;
 | 
	
		
			
				|  |  | +    };
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |    this._call = call;
 | 
	
		
			
				|  |  |    // Indicate that a status has been sent
 | 
	
		
			
				|  |  |    var finished = false;
 | 
	
	
		
			
				|  | @@ -59,6 +70,33 @@ function GrpcServerStream(call, options) {
 | 
	
		
			
				|  |  |      'code' : grpc.status.OK,
 | 
	
		
			
				|  |  |      'details' : 'OK'
 | 
	
		
			
				|  |  |    };
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /**
 | 
	
		
			
				|  |  | +   * Serialize a response value to a buffer. Always maps null to null. Otherwise
 | 
	
		
			
				|  |  | +   * uses the provided serialize function
 | 
	
		
			
				|  |  | +   * @param {*} value The value to serialize
 | 
	
		
			
				|  |  | +   * @return {Buffer} The serialized value
 | 
	
		
			
				|  |  | +   */
 | 
	
		
			
				|  |  | +  this.serialize = function(value) {
 | 
	
		
			
				|  |  | +    if (value === null || value === undefined) {
 | 
	
		
			
				|  |  | +      return null;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    return serialize(value);
 | 
	
		
			
				|  |  | +  };
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  /**
 | 
	
		
			
				|  |  | +   * Deserialize a request buffer to a value. Always maps null to null.
 | 
	
		
			
				|  |  | +   * Otherwise uses the provided deserialize function.
 | 
	
		
			
				|  |  | +   * @param {Buffer} buffer The buffer to deserialize
 | 
	
		
			
				|  |  | +   * @return {*} The deserialized value
 | 
	
		
			
				|  |  | +   */
 | 
	
		
			
				|  |  | +  this.deserialize = function(buffer) {
 | 
	
		
			
				|  |  | +    if (buffer === null) {
 | 
	
		
			
				|  |  | +      return null;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    return deserialize(buffer);
 | 
	
		
			
				|  |  | +  };
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    /**
 | 
	
		
			
				|  |  |     * Send the pending status
 | 
	
		
			
				|  |  |     */
 | 
	
	
		
			
				|  | @@ -75,7 +113,6 @@ function GrpcServerStream(call, options) {
 | 
	
		
			
				|  |  |     * @param {Error} err The error object
 | 
	
		
			
				|  |  |     */
 | 
	
		
			
				|  |  |    function setStatus(err) {
 | 
	
		
			
				|  |  | -    console.log('Server setting status to', err);
 | 
	
		
			
				|  |  |      var code = grpc.status.INTERNAL;
 | 
	
		
			
				|  |  |      var details = 'Unknown Error';
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -113,7 +150,7 @@ function GrpcServerStream(call, options) {
 | 
	
		
			
				|  |  |        return;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |      var data = event.data;
 | 
	
		
			
				|  |  | -    if (self.push(data) && data != null) {
 | 
	
		
			
				|  |  | +    if (self.push(deserialize(data)) && data != null) {
 | 
	
		
			
				|  |  |        self._call.startRead(readCallback);
 | 
	
		
			
				|  |  |      } else {
 | 
	
		
			
				|  |  |        reading = false;
 | 
	
	
		
			
				|  | @@ -155,7 +192,7 @@ GrpcServerStream.prototype._read = function(size) {
 | 
	
		
			
				|  |  |   */
 | 
	
		
			
				|  |  |  GrpcServerStream.prototype._write = function(chunk, encoding, callback) {
 | 
	
		
			
				|  |  |    var self = this;
 | 
	
		
			
				|  |  | -  self._call.startWrite(chunk, function(event) {
 | 
	
		
			
				|  |  | +  self._call.startWrite(self.serialize(chunk), function(event) {
 | 
	
		
			
				|  |  |      callback();
 | 
	
		
			
				|  |  |    }, 0);
 | 
	
		
			
				|  |  |  };
 | 
	
	
		
			
				|  | @@ -211,12 +248,13 @@ function Server(options) {
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |        }, 0);
 | 
	
		
			
				|  |  |        call.serverEndInitialMetadata(0);
 | 
	
		
			
				|  |  | -      var stream = new GrpcServerStream(call);
 | 
	
		
			
				|  |  | +      var stream = new GrpcServerStream(call, handler.serialize,
 | 
	
		
			
				|  |  | +                                        handler.deserialize);
 | 
	
		
			
				|  |  |        Object.defineProperty(stream, 'cancelled', {
 | 
	
		
			
				|  |  |          get: function() { return cancelled;}
 | 
	
		
			
				|  |  |        });
 | 
	
		
			
				|  |  |        try {
 | 
	
		
			
				|  |  | -        handler(stream, data.metadata);
 | 
	
		
			
				|  |  | +        handler.func(stream, data.metadata);
 | 
	
		
			
				|  |  |        } catch (e) {
 | 
	
		
			
				|  |  |          stream.emit('error', e);
 | 
	
		
			
				|  |  |        }
 | 
	
	
		
			
				|  | @@ -237,14 +275,20 @@ function Server(options) {
 | 
	
		
			
				|  |  |   *     handle/respond to.
 | 
	
		
			
				|  |  |   * @param {function} handler Function that takes a stream of request values and
 | 
	
		
			
				|  |  |   *     returns a stream of response values
 | 
	
		
			
				|  |  | + * @param {function(*):Buffer} serialize Serialization function for responses
 | 
	
		
			
				|  |  | + * @param {function(Buffer):*} deserialize Deserialization function for requests
 | 
	
		
			
				|  |  |   * @return {boolean} True if the handler was set. False if a handler was already
 | 
	
		
			
				|  |  |   *     set for that name.
 | 
	
		
			
				|  |  |   */
 | 
	
		
			
				|  |  | -Server.prototype.register = function(name, handler) {
 | 
	
		
			
				|  |  | +Server.prototype.register = function(name, handler, serialize, deserialize) {
 | 
	
		
			
				|  |  |    if (this.handlers.hasOwnProperty(name)) {
 | 
	
		
			
				|  |  |      return false;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  this.handlers[name] = handler;
 | 
	
		
			
				|  |  | +  this.handlers[name] = {
 | 
	
		
			
				|  |  | +    func: handler,
 | 
	
		
			
				|  |  | +    serialize: serialize,
 | 
	
		
			
				|  |  | +    deserialize: deserialize
 | 
	
		
			
				|  |  | +  };
 | 
	
		
			
				|  |  |    return true;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 |