|
@@ -28,7 +28,7 @@
|
|
|
<article>
|
|
|
<pre class="prettyprint source linenums"><code>/*
|
|
|
*
|
|
|
- * Copyright 2015-2016, Google Inc.
|
|
|
+ * Copyright 2015, Google Inc.
|
|
|
* All rights reserved.
|
|
|
*
|
|
|
* Redistribution and use in source and binary forms, with or without
|
|
@@ -78,6 +78,7 @@
|
|
|
'use strict';
|
|
|
|
|
|
var _ = require('lodash');
|
|
|
+var arguejs = require('arguejs');
|
|
|
|
|
|
var grpc = require('./grpc_extension');
|
|
|
|
|
@@ -159,8 +160,71 @@ function ClientReadableStream(call, deserialize) {
|
|
|
this.finished = false;
|
|
|
this.reading = false;
|
|
|
this.deserialize = common.wrapIgnoreNull(deserialize);
|
|
|
+ /* Status generated from reading messages from the server. Overrides the
|
|
|
+ * status from the server if not OK */
|
|
|
+ this.read_status = null;
|
|
|
+ /* Status received from the server. */
|
|
|
+ this.received_status = null;
|
|
|
}
|
|
|
|
|
|
+/**
|
|
|
+ * Called when all messages from the server have been processed. The status
|
|
|
+ * parameter indicates that the call should end with that status. status
|
|
|
+ * defaults to OK if not provided.
|
|
|
+ * @param {Object!} status The status that the call should end with
|
|
|
+ */
|
|
|
+function _readsDone(status) {
|
|
|
+ /* jshint validthis: true */
|
|
|
+ if (!status) {
|
|
|
+ status = {code: grpc.status.OK, details: 'OK'};
|
|
|
+ }
|
|
|
+ if (status.code !== grpc.status.OK) {
|
|
|
+ this.call.cancelWithStatus(status.code, status.details);
|
|
|
+ }
|
|
|
+ this.finished = true;
|
|
|
+ this.read_status = status;
|
|
|
+ this._emitStatusIfDone();
|
|
|
+}
|
|
|
+
|
|
|
+ClientReadableStream.prototype._readsDone = _readsDone;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Called to indicate that we have received a status from the server.
|
|
|
+ */
|
|
|
+function _receiveStatus(status) {
|
|
|
+ /* jshint validthis: true */
|
|
|
+ this.received_status = status;
|
|
|
+ this._emitStatusIfDone();
|
|
|
+}
|
|
|
+
|
|
|
+ClientReadableStream.prototype._receiveStatus = _receiveStatus;
|
|
|
+
|
|
|
+/**
|
|
|
+ * If we have both processed all incoming messages and received the status from
|
|
|
+ * the server, emit the status. Otherwise, do nothing.
|
|
|
+ */
|
|
|
+function _emitStatusIfDone() {
|
|
|
+ /* jshint validthis: true */
|
|
|
+ var status;
|
|
|
+ if (this.read_status && this.received_status) {
|
|
|
+ if (this.read_status.code !== grpc.status.OK) {
|
|
|
+ status = this.read_status;
|
|
|
+ } else {
|
|
|
+ status = this.received_status;
|
|
|
+ }
|
|
|
+ this.emit('status', status);
|
|
|
+ if (status.code !== grpc.status.OK) {
|
|
|
+ var error = new Error(status.details);
|
|
|
+ error.code = status.code;
|
|
|
+ error.metadata = status.metadata;
|
|
|
+ this.emit('error', error);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+ClientReadableStream.prototype._emitStatusIfDone = _emitStatusIfDone;
|
|
|
+
|
|
|
/**
|
|
|
* Read the next object from the stream.
|
|
|
* @access private
|
|
@@ -178,6 +242,7 @@ function _read(size) {
|
|
|
if (err) {
|
|
|
// Something has gone wrong. Stop reading and wait for status
|
|
|
self.finished = true;
|
|
|
+ self._readsDone();
|
|
|
return;
|
|
|
}
|
|
|
var data = event.read;
|
|
@@ -185,8 +250,11 @@ function _read(size) {
|
|
|
try {
|
|
|
deserialized = self.deserialize(data);
|
|
|
} catch (e) {
|
|
|
- self.call.cancelWithStatus(grpc.status.INTERNAL,
|
|
|
- 'Failed to parse server response');
|
|
|
+ self._readsDone({code: grpc.status.INTERNAL,
|
|
|
+ details: 'Failed to parse server response'});
|
|
|
+ }
|
|
|
+ if (data === null) {
|
|
|
+ self._readsDone();
|
|
|
}
|
|
|
if (self.push(deserialized) && data !== null) {
|
|
|
var read_batch = {};
|
|
@@ -226,6 +294,11 @@ function ClientDuplexStream(call, serialize, deserialize) {
|
|
|
this.serialize = common.wrapIgnoreNull(serialize);
|
|
|
this.deserialize = common.wrapIgnoreNull(deserialize);
|
|
|
this.call = call;
|
|
|
+ /* Status generated from reading messages from the server. Overrides the
|
|
|
+ * status from the server if not OK */
|
|
|
+ this.read_status = null;
|
|
|
+ /* Status received from the server. */
|
|
|
+ this.received_status = null;
|
|
|
this.on('finish', function() {
|
|
|
var batch = {};
|
|
|
batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
|
|
@@ -233,6 +306,9 @@ function ClientDuplexStream(call, serialize, deserialize) {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ClientDuplexStream.prototype._readsDone = _readsDone;
|
|
|
+ClientDuplexStream.prototype._receiveStatus = _receiveStatus;
|
|
|
+ClientDuplexStream.prototype._emitStatusIfDone = _emitStatusIfDone;
|
|
|
ClientDuplexStream.prototype._read = _read;
|
|
|
ClientDuplexStream.prototype._write = _write;
|
|
|
|
|
@@ -306,21 +382,23 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
|
|
|
* @this {Client} Client object. Must have a channel member.
|
|
|
* @param {*} argument The argument to the call. Should be serializable with
|
|
|
* serialize
|
|
|
- * @param {function(?Error, value=)} callback The callback to for when the
|
|
|
- * response is received
|
|
|
* @param {Metadata=} metadata Metadata to add to the call
|
|
|
* @param {Object=} options Options map
|
|
|
+ * @param {function(?Error, value=)} callback The callback to for when the
|
|
|
+ * response is received
|
|
|
* @return {EventEmitter} An event emitter for stream related events
|
|
|
*/
|
|
|
- function makeUnaryRequest(argument, callback, metadata, options) {
|
|
|
+ function makeUnaryRequest(argument, metadata, options, callback) {
|
|
|
/* jshint validthis: true */
|
|
|
+ /* While the arguments are listed in the function signature, those variables
|
|
|
+ * are not used directly. Instead, ArgueJS processes the arguments
|
|
|
+ * object. This allows for simple handling of optional arguments in the
|
|
|
+ * middle of the argument list, and also provides type checking. */
|
|
|
+ var args = arguejs({argument: null, metadata: [Metadata, new Metadata()],
|
|
|
+ options: [Object], callback: Function}, arguments);
|
|
|
var emitter = new EventEmitter();
|
|
|
- var call = getCall(this.$channel, method, options);
|
|
|
- if (metadata === null || metadata === undefined) {
|
|
|
- metadata = new Metadata();
|
|
|
- } else {
|
|
|
- metadata = metadata.clone();
|
|
|
- }
|
|
|
+ var call = getCall(this.$channel, method, args.options);
|
|
|
+ metadata = args.metadata.clone();
|
|
|
emitter.cancel = function cancel() {
|
|
|
call.cancel();
|
|
|
};
|
|
@@ -328,9 +406,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
|
|
|
return call.getPeer();
|
|
|
};
|
|
|
var client_batch = {};
|
|
|
- var message = serialize(argument);
|
|
|
- if (options) {
|
|
|
- message.grpcWriteFlags = options.flags;
|
|
|
+ var message = serialize(args.argument);
|
|
|
+ if (args.options) {
|
|
|
+ message.grpcWriteFlags = args.options.flags;
|
|
|
}
|
|
|
client_batch[grpc.opType.SEND_INITIAL_METADATA] =
|
|
|
metadata._getCoreRepresentation();
|
|
@@ -348,7 +426,7 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
|
|
|
if (status.code === grpc.status.OK) {
|
|
|
if (err) {
|
|
|
// Got a batch error, but OK status. Something went wrong
|
|
|
- callback(err);
|
|
|
+ args.callback(err);
|
|
|
return;
|
|
|
} else {
|
|
|
try {
|
|
@@ -367,9 +445,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
|
|
|
error = new Error(status.details);
|
|
|
error.code = status.code;
|
|
|
error.metadata = status.metadata;
|
|
|
- callback(error);
|
|
|
+ args.callback(error);
|
|
|
} else {
|
|
|
- callback(null, deserialized);
|
|
|
+ args.callback(null, deserialized);
|
|
|
}
|
|
|
emitter.emit('status', status);
|
|
|
emitter.emit('metadata', Metadata._fromCoreRepresentation(
|
|
@@ -393,21 +471,23 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
|
|
|
* Make a client stream request with this method on the given channel with the
|
|
|
* given callback, etc.
|
|
|
* @this {Client} Client object. Must have a channel member.
|
|
|
- * @param {function(?Error, value=)} callback The callback to for when the
|
|
|
- * response is received
|
|
|
* @param {Metadata=} metadata Array of metadata key/value pairs to add to the
|
|
|
* call
|
|
|
* @param {Object=} options Options map
|
|
|
+ * @param {function(?Error, value=)} callback The callback to for when the
|
|
|
+ * response is received
|
|
|
* @return {EventEmitter} An event emitter for stream related events
|
|
|
*/
|
|
|
- function makeClientStreamRequest(callback, metadata, options) {
|
|
|
+ function makeClientStreamRequest(metadata, options, callback) {
|
|
|
/* jshint validthis: true */
|
|
|
- var call = getCall(this.$channel, method, options);
|
|
|
- if (metadata === null || metadata === undefined) {
|
|
|
- metadata = new Metadata();
|
|
|
- } else {
|
|
|
- metadata = metadata.clone();
|
|
|
- }
|
|
|
+ /* While the arguments are listed in the function signature, those variables
|
|
|
+ * are not used directly. Instead, ArgueJS processes the arguments
|
|
|
+ * object. This allows for simple handling of optional arguments in the
|
|
|
+ * middle of the argument list, and also provides type checking. */
|
|
|
+ var args = arguejs({metadata: [Metadata, new Metadata()],
|
|
|
+ options: [Object], callback: Function}, arguments);
|
|
|
+ var call = getCall(this.$channel, method, args.options);
|
|
|
+ metadata = args.metadata.clone();
|
|
|
var stream = new ClientWritableStream(call, serialize);
|
|
|
var metadata_batch = {};
|
|
|
metadata_batch[grpc.opType.SEND_INITIAL_METADATA] =
|
|
@@ -434,7 +514,7 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
|
|
|
if (status.code === grpc.status.OK) {
|
|
|
if (err) {
|
|
|
// Got a batch error, but OK status. Something went wrong
|
|
|
- callback(err);
|
|
|
+ args.callback(err);
|
|
|
return;
|
|
|
} else {
|
|
|
try {
|
|
@@ -453,9 +533,9 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
|
|
|
error = new Error(response.status.details);
|
|
|
error.code = status.code;
|
|
|
error.metadata = status.metadata;
|
|
|
- callback(error);
|
|
|
+ args.callback(error);
|
|
|
} else {
|
|
|
- callback(null, deserialized);
|
|
|
+ args.callback(null, deserialized);
|
|
|
}
|
|
|
stream.emit('status', status);
|
|
|
});
|
|
@@ -486,17 +566,18 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
|
|
|
*/
|
|
|
function makeServerStreamRequest(argument, metadata, options) {
|
|
|
/* jshint validthis: true */
|
|
|
- var call = getCall(this.$channel, method, options);
|
|
|
- if (metadata === null || metadata === undefined) {
|
|
|
- metadata = new Metadata();
|
|
|
- } else {
|
|
|
- metadata = metadata.clone();
|
|
|
- }
|
|
|
+ /* While the arguments are listed in the function signature, those variables
|
|
|
+ * are not used directly. Instead, ArgueJS processes the arguments
|
|
|
+ * object. */
|
|
|
+ var args = arguejs({argument: null, metadata: [Metadata, new Metadata()],
|
|
|
+ options: [Object]}, arguments);
|
|
|
+ var call = getCall(this.$channel, method, args.options);
|
|
|
+ metadata = args.metadata.clone();
|
|
|
var stream = new ClientReadableStream(call, deserialize);
|
|
|
var start_batch = {};
|
|
|
- var message = serialize(argument);
|
|
|
- if (options) {
|
|
|
- message.grpcWriteFlags = options.flags;
|
|
|
+ var message = serialize(args.argument);
|
|
|
+ if (args.options) {
|
|
|
+ message.grpcWriteFlags = args.options.flags;
|
|
|
}
|
|
|
start_batch[grpc.opType.SEND_INITIAL_METADATA] =
|
|
|
metadata._getCoreRepresentation();
|
|
@@ -515,22 +596,13 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
|
|
|
var status_batch = {};
|
|
|
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
|
|
|
call.startBatch(status_batch, function(err, response) {
|
|
|
- response.status.metadata = Metadata._fromCoreRepresentation(
|
|
|
- response.status.metadata);
|
|
|
- stream.emit('status', response.status);
|
|
|
- if (response.status.code !== grpc.status.OK) {
|
|
|
- var error = new Error(response.status.details);
|
|
|
- error.code = response.status.code;
|
|
|
- error.metadata = response.status.metadata;
|
|
|
- stream.emit('error', error);
|
|
|
+ if (err) {
|
|
|
+ stream.emit('error', err);
|
|
|
return;
|
|
|
- } else {
|
|
|
- if (err) {
|
|
|
- // Got a batch error, but OK status. Something went wrong
|
|
|
- stream.emit('error', err);
|
|
|
- return;
|
|
|
- }
|
|
|
}
|
|
|
+ response.status.metadata = Metadata._fromCoreRepresentation(
|
|
|
+ response.status.metadata);
|
|
|
+ stream._receiveStatus(response.status);
|
|
|
});
|
|
|
return stream;
|
|
|
}
|
|
@@ -557,12 +629,13 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
|
|
|
*/
|
|
|
function makeBidiStreamRequest(metadata, options) {
|
|
|
/* jshint validthis: true */
|
|
|
- var call = getCall(this.$channel, method, options);
|
|
|
- if (metadata === null || metadata === undefined) {
|
|
|
- metadata = new Metadata();
|
|
|
- } else {
|
|
|
- metadata = metadata.clone();
|
|
|
- }
|
|
|
+ /* While the arguments are listed in the function signature, those variables
|
|
|
+ * are not used directly. Instead, ArgueJS processes the arguments
|
|
|
+ * object. */
|
|
|
+ var args = arguejs({metadata: [Metadata, new Metadata()],
|
|
|
+ options: [Object]}, arguments);
|
|
|
+ var call = getCall(this.$channel, method, args.options);
|
|
|
+ metadata = args.metadata.clone();
|
|
|
var stream = new ClientDuplexStream(call, serialize, deserialize);
|
|
|
var start_batch = {};
|
|
|
start_batch[grpc.opType.SEND_INITIAL_METADATA] =
|
|
@@ -580,22 +653,13 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
|
|
|
var status_batch = {};
|
|
|
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
|
|
|
call.startBatch(status_batch, function(err, response) {
|
|
|
- response.status.metadata = Metadata._fromCoreRepresentation(
|
|
|
- response.status.metadata);
|
|
|
- stream.emit('status', response.status);
|
|
|
- if (response.status.code !== grpc.status.OK) {
|
|
|
- var error = new Error(response.status.details);
|
|
|
- error.code = response.status.code;
|
|
|
- error.metadata = response.status.metadata;
|
|
|
- stream.emit('error', error);
|
|
|
+ if (err) {
|
|
|
+ stream.emit('error', err);
|
|
|
return;
|
|
|
- } else {
|
|
|
- if (err) {
|
|
|
- // Got a batch error, but OK status. Something went wrong
|
|
|
- stream.emit('error', err);
|
|
|
- return;
|
|
|
- }
|
|
|
}
|
|
|
+ response.status.metadata = Metadata._fromCoreRepresentation(
|
|
|
+ response.status.metadata);
|
|
|
+ stream._receiveStatus(response.status);
|
|
|
});
|
|
|
return stream;
|
|
|
}
|
|
@@ -614,6 +678,40 @@ var requester_makers = {
|
|
|
bidi: makeBidiStreamRequestFunction
|
|
|
};
|
|
|
|
|
|
+function getDefaultValues(metadata, options) {
|
|
|
+ var res = {};
|
|
|
+ res.metadata = metadata || new Metadata();
|
|
|
+ res.options = options || {};
|
|
|
+ return res;
|
|
|
+}
|
|
|
+
|
|
|
+/**
|
|
|
+ * Map with wrappers for each type of requester function to make it use the old
|
|
|
+ * argument order with optional arguments after the callback.
|
|
|
+ */
|
|
|
+var deprecated_request_wrap = {
|
|
|
+ unary: function(makeUnaryRequest) {
|
|
|
+ return function makeWrappedUnaryRequest(argument, callback,
|
|
|
+ metadata, options) {
|
|
|
+ /* jshint validthis: true */
|
|
|
+ var opt_args = getDefaultValues(metadata, metadata);
|
|
|
+ return makeUnaryRequest.call(this, argument, opt_args.metadata,
|
|
|
+ opt_args.options, callback);
|
|
|
+ };
|
|
|
+ },
|
|
|
+ client_stream: function(makeServerStreamRequest) {
|
|
|
+ return function makeWrappedClientStreamRequest(callback, metadata,
|
|
|
+ options) {
|
|
|
+ /* jshint validthis: true */
|
|
|
+ var opt_args = getDefaultValues(metadata, options);
|
|
|
+ return makeServerStreamRequest.call(this, opt_args.metadata,
|
|
|
+ opt_args.options, callback);
|
|
|
+ };
|
|
|
+ },
|
|
|
+ server_stream: _.identity,
|
|
|
+ bidi: _.identity
|
|
|
+};
|
|
|
+
|
|
|
/**
|
|
|
* Creates a constructor for a client with the given methods. The methods object
|
|
|
* maps method name to an object with the following keys:
|
|
@@ -625,9 +723,19 @@ var requester_makers = {
|
|
|
* responseDeserialize: function to deserialize response objects
|
|
|
* @param {Object} methods An object mapping method names to method attributes
|
|
|
* @param {string} serviceName The fully qualified name of the service
|
|
|
+ * @param {Object} class_options An options object. Currently only uses the key
|
|
|
+ * deprecatedArgumentOrder, a boolean that Indicates that the old argument
|
|
|
+ * order should be used for methods, with optional arguments at the end
|
|
|
+ * instead of the callback at the end. Defaults to false. This option is
|
|
|
+ * only a temporary stopgap measure to smooth an API breakage.
|
|
|
+ * It is deprecated, and new code should not use it.
|
|
|
* @return {function(string, Object)} New client constructor
|
|
|
*/
|
|
|
-exports.makeClientConstructor = function(methods, serviceName) {
|
|
|
+exports.makeClientConstructor = function(methods, serviceName,
|
|
|
+ class_options) {
|
|
|
+ if (!class_options) {
|
|
|
+ class_options = {};
|
|
|
+ }
|
|
|
/**
|
|
|
* Create a client with the given methods
|
|
|
* @constructor
|
|
@@ -674,8 +782,13 @@ exports.makeClientConstructor = function(methods, serviceName) {
|
|
|
}
|
|
|
var serialize = attrs.requestSerialize;
|
|
|
var deserialize = attrs.responseDeserialize;
|
|
|
- Client.prototype[name] = requester_makers[method_type](
|
|
|
+ var method_func = requester_makers[method_type](
|
|
|
attrs.path, serialize, deserialize);
|
|
|
+ if (class_options.deprecatedArgumentOrder) {
|
|
|
+ Client.prototype[name] = deprecated_request_wrap(method_func);
|
|
|
+ } else {
|
|
|
+ Client.prototype[name] = method_func;
|
|
|
+ }
|
|
|
// Associate all provided attributes with the method
|
|
|
_.assign(Client.prototype[name], attrs);
|
|
|
});
|
|
@@ -730,10 +843,14 @@ exports.waitForClientReady = function(client, deadline, callback) {
|
|
|
* @return {function(string, Object)} New client constructor
|
|
|
*/
|
|
|
exports.makeProtobufClientConstructor = function(service, options) {
|
|
|
- var method_attrs = common.getProtobufServiceAttrs(service, service.name,
|
|
|
- options);
|
|
|
+ var method_attrs = common.getProtobufServiceAttrs(service, options);
|
|
|
+ var deprecatedArgumentOrder = false;
|
|
|
+ if (options) {
|
|
|
+ deprecatedArgumentOrder = options.deprecatedArgumentOrder;
|
|
|
+ }
|
|
|
var Client = exports.makeClientConstructor(
|
|
|
- method_attrs, common.fullyQualifiedName(service));
|
|
|
+ method_attrs, common.fullyQualifiedName(service),
|
|
|
+ deprecatedArgumentOrder);
|
|
|
Client.service = service;
|
|
|
Client.service.grpc_options = options;
|
|
|
return Client;
|
|
@@ -764,7 +881,7 @@ exports.callError = grpc.callError;
|
|
|
<br class="clear">
|
|
|
|
|
|
<footer>
|
|
|
- Documentation generated by <a href="https://github.com/jsdoc3/jsdoc">JSDoc 3.4.0</a> on Tue Mar 08 2016 10:29:45 GMT-0800 (PST)
|
|
|
+ Documentation generated by <a href="https://github.com/jsdoc3/jsdoc">JSDoc 3.4.0</a> on Wed May 11 2016 18:17:25 GMT-0700 (PDT)
|
|
|
</footer>
|
|
|
|
|
|
<script> prettyPrint(); </script>
|