|  | @@ -35,6 +35,7 @@
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  var fs = require('fs');
 | 
	
		
			
				|  |  |  var path = require('path');
 | 
	
		
			
				|  |  | +var async = require('async');
 | 
	
		
			
				|  |  |  var _ = require('lodash');
 | 
	
		
			
				|  |  |  var grpc = require('..');
 | 
	
		
			
				|  |  |  var testProto = grpc.load({
 | 
	
	
		
			
				|  | @@ -86,6 +87,22 @@ function getEchoTrailer(call) {
 | 
	
		
			
				|  |  |    return response_trailer;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +/**
 | 
	
		
			
				|  |  | + * @typedef Payload
 | 
	
		
			
				|  |  | + * @type {object}
 | 
	
		
			
				|  |  | + * @property {string} payload_type The payload type
 | 
	
		
			
				|  |  | + * @property {Buffer} body The payload body
 | 
	
		
			
				|  |  | + */
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +/**
 | 
	
		
			
				|  |  | + * Get a payload of the specified type and size. If the requested payload is
 | 
	
		
			
				|  |  | + * COMPRESSABLE, it returns a zero buffer. If the type is UNCOMRESSABLE, it
 | 
	
		
			
				|  |  | + * returns a slice of pre-loaded uncompressable data. If the type is RANDOM,
 | 
	
		
			
				|  |  | + * it returns one of the other choices, chosen at random.
 | 
	
		
			
				|  |  | + * @param {string} payload_type The type of payload to return
 | 
	
		
			
				|  |  | + * @param {Number} size The size of the payload body
 | 
	
		
			
				|  |  | + * @return {Payload} The requested payload
 | 
	
		
			
				|  |  | + */
 | 
	
		
			
				|  |  |  function getPayload(payload_type, size) {
 | 
	
		
			
				|  |  |    if (payload_type === 'RANDOM') {
 | 
	
		
			
				|  |  |      payload_type = ['COMPRESSABLE',
 | 
	
	
		
			
				|  | @@ -99,6 +116,15 @@ function getPayload(payload_type, size) {
 | 
	
		
			
				|  |  |    return {type: payload_type, body: body};
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +function respondWithStream(call, request, callback) {
 | 
	
		
			
				|  |  | +  async.eachSeries(request.response_parameters, function(resp_param, callback) {
 | 
	
		
			
				|  |  | +    setTimeout(function() {
 | 
	
		
			
				|  |  | +      call.write({payload: getPayload(request.response_type, resp_param.size)});
 | 
	
		
			
				|  |  | +      callback();
 | 
	
		
			
				|  |  | +    }, resp_param.interval_us/1000);
 | 
	
		
			
				|  |  | +  }, callback);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  /**
 | 
	
		
			
				|  |  |   * Respond to an empty parameter with an empty response.
 | 
	
		
			
				|  |  |   * NOTE: this currently does not work due to issue #137
 | 
	
	
		
			
				|  | @@ -162,10 +188,13 @@ function handleStreamingOutput(call) {
 | 
	
		
			
				|  |  |      call.emit('error', status);
 | 
	
		
			
				|  |  |      return;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  _.each(req.response_parameters, function(resp_param) {
 | 
	
		
			
				|  |  | -    call.write({payload: getPayload(req.response_type, resp_param.size)});
 | 
	
		
			
				|  |  | +  respondWithStream(call, req, function(err) {
 | 
	
		
			
				|  |  | +    if (err) {
 | 
	
		
			
				|  |  | +      call.emit(err);
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      call.end(getEchoTrailer(call));
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    });
 | 
	
		
			
				|  |  | -  call.end(getEchoTrailer(call));
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /**
 | 
	
	
		
			
				|  | @@ -175,6 +204,7 @@ function handleStreamingOutput(call) {
 | 
	
		
			
				|  |  |   */
 | 
	
		
			
				|  |  |  function handleFullDuplex(call) {
 | 
	
		
			
				|  |  |    echoHeader(call);
 | 
	
		
			
				|  |  | +  var call_ended;
 | 
	
		
			
				|  |  |    call.on('data', function(value) {
 | 
	
		
			
				|  |  |      if (value.response_status) {
 | 
	
		
			
				|  |  |        var status = value.response_status;
 | 
	
	
		
			
				|  | @@ -182,12 +212,17 @@ function handleFullDuplex(call) {
 | 
	
		
			
				|  |  |        call.emit('error', status);
 | 
	
		
			
				|  |  |        return;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    _.each(value.response_parameters, function(resp_param) {
 | 
	
		
			
				|  |  | -      call.write({payload: getPayload(value.response_type, resp_param.size)});
 | 
	
		
			
				|  |  | +    call.pause();
 | 
	
		
			
				|  |  | +    respondWithStream(call, value, function(err) {
 | 
	
		
			
				|  |  | +      call.resume();
 | 
	
		
			
				|  |  | +      if (call_ended) {
 | 
	
		
			
				|  |  | +        call.end(getEchoTrailer(call));
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  |      });
 | 
	
		
			
				|  |  |    });
 | 
	
		
			
				|  |  |    call.on('end', function() {
 | 
	
		
			
				|  |  | -    call.end(getEchoTrailer(call));
 | 
	
		
			
				|  |  | +    call_ended = true;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    });
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 |