client.js 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. /*
  2. *
  3. * Copyright 2015, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. 'use strict';
  34. var _ = require('underscore');
  35. var grpc = require('bindings')('grpc.node');
  36. var common = require('./common.js');
  37. var EventEmitter = require('events').EventEmitter;
  38. var stream = require('stream');
  39. var Readable = stream.Readable;
  40. var Writable = stream.Writable;
  41. var Duplex = stream.Duplex;
  42. var util = require('util');
  util.inherits(ClientWritableStream, Writable);

  /**
   * Object-mode writable stream for the request side of calls in which the
   * client streams messages. Once the stream emits 'finish', a batch containing
   * only SEND_CLOSE_FROM_CLIENT is started on the call.
   * @constructor
   * @param {grpc.Call} call The call object to send data with
   * @param {function(*):Buffer=} serialize Serialization function for writes.
   */
  function ClientWritableStream(call, serialize) {
    Writable.call(this, {objectMode: true});
    this.call = call;
    // The serializer is stored wrapped by common.wrapIgnoreNull.
    this.serialize = common.wrapIgnoreNull(serialize);
    this.on('finish', function() {
      var close_batch = {};
      close_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
      // The completion callback is a no-op: the outcome of this batch is not
      // inspected here.
      call.startBatch(close_batch, function() {});
    });
  }
  /**
   * Attempt to write the given chunk. Calls the callback when done. This is an
   * implementation of a method needed for implementing stream.Writable.
   *
   * The chunk is serialized with this.serialize and sent in a batch containing
   * only SEND_MESSAGE. A batch failure is passed to the callback rather than
   * thrown: the batch completes asynchronously, so a throw from the completion
   * handler could not be caught by the code that issued the write.
   * @param {*} chunk The chunk to write; passed to this.serialize
   * @param {string} encoding Ignored
   * @param {function(Error=)} callback Called when the write is complete, with
   *     the batch error if the batch failed
   */
  function _write(chunk, encoding, callback) {
    /* jshint validthis: true */
    var batch = {};
    batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
    this.call.startBatch(batch, function(err) {
      if (err) {
        callback(err);
        return;
      }
      callback();
    });
  }
  ClientWritableStream.prototype._write = _write;

  util.inherits(ClientReadableStream, Readable);

  /**
   * Object-mode readable stream for the response side of calls in which the
   * server streams messages.
   * @constructor
   * @param {grpc.Call} call The call object to read data with
   * @param {function(Buffer):*=} deserialize Deserialization function for reads
   */
  function ClientReadableStream(call, deserialize) {
    Readable.call(this, {objectMode: true});
    this.call = call;
    // Checked by _read: once true, _read pushes null instead of reading.
    this.finished = false;
    // Set by _read while a RECV_MESSAGE batch it started is outstanding.
    this.reading = false;
    // The deserializer is stored wrapped by common.wrapIgnoreNull.
    this.deserialize = common.wrapIgnoreNull(deserialize);
  }
  /**
   * Read the next object from the stream.
   *
   * Starts a RECV_MESSAGE batch unless one is already outstanding
   * (this.reading) or the stream is marked finished. Each received message is
   * deserialized and pushed; reading continues while push() returns true and
   * the received message is not null.
   *
   * A batch failure is emitted as an 'error' event on the stream rather than
   * thrown: the batch completes asynchronously, so a throw from the completion
   * handler could not be caught by the consumer of the stream.
   * @param {*} size Ignored because we use objectMode=true
   */
  function _read(size) {
    /* jshint validthis: true */
    var self = this;

    /**
     * Start a batch that receives a single message and hands the result to
     * readCallback.
     */
    function requestNextMessage() {
      var read_batch = {};
      read_batch[grpc.opType.RECV_MESSAGE] = true;
      self.call.startBatch(read_batch, readCallback);
    }

    /**
     * Callback to be called when a READ event is received. Pushes the data
     * onto the read queue and starts reading again if applicable
     * @param {?Error} err Batch error, if any
     * @param {grpc.Event} event READ event object
     */
    function readCallback(err, event) {
      if (err) {
        // No read is outstanding any more; report the failure to listeners.
        self.reading = false;
        self.emit('error', err);
        return;
      }
      if (self.finished) {
        self.push(null);
        return;
      }
      var data = event.read;
      if (self.push(self.deserialize(data)) && data !== null) {
        requestNextMessage();
      } else {
        // Either the consumer applied backpressure or a null message arrived.
        self.reading = false;
      }
    }

    if (self.finished) {
      self.push(null);
      return;
    }
    if (!self.reading) {
      self.reading = true;
      requestNextMessage();
    }
  }
  ClientReadableStream.prototype._read = _read;

  util.inherits(ClientDuplexStream, Duplex);

  /**
   * Object-mode duplex stream for calls that stream in both directions: writes
   * are requests, reads are responses. Once the writable side emits 'finish',
   * a batch containing only SEND_CLOSE_FROM_CLIENT is started on the call.
   * @constructor
   * @param {grpc.Call} call Call object to proxy
   * @param {function(*):Buffer=} serialize Serialization function for requests
   * @param {function(Buffer):*=} deserialize Deserialization function for
   *     responses
   */
  function ClientDuplexStream(call, serialize, deserialize) {
    Duplex.call(this, {objectMode: true});
    // Both codecs are stored wrapped by common.wrapIgnoreNull.
    this.serialize = common.wrapIgnoreNull(serialize);
    this.deserialize = common.wrapIgnoreNull(deserialize);
    this.call = call;
    this.on('finish', function() {
      var close_batch = {};
      close_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
      // The completion callback is a no-op: the outcome of this batch is not
      // inspected here.
      call.startBatch(close_batch, function() {});
    });
  }

  // The duplex stream uses the same read and write implementations as the
  // one-directional streams.
  ClientDuplexStream.prototype._read = _read;
  ClientDuplexStream.prototype._write = _write;
  /**
   * Cancel the ongoing call by invoking cancel() on this.call.
   */
  function cancel() {
    /* jshint validthis: true */
    var active_call = this.call;
    active_call.cancel();
  }

  // All three client stream types share the same cancel method.
  [ClientReadableStream, ClientWritableStream, ClientDuplexStream].forEach(
      function(StreamType) {
        StreamType.prototype.cancel = cancel;
      });
  /**
   * Get a function that can make unary requests to the specified method.
   * @param {string} method The name of the method to request
   * @param {function(*):Buffer} serialize The serialization function for inputs
   * @param {function(Buffer)} deserialize The deserialization function for
   *     outputs
   * @return {Function} makeUnaryRequest
   */
  function makeUnaryRequestFunction(method, serialize, deserialize) {
    /**
     * Make a unary request with this method on the given channel with the given
     * argument, callback, etc.
     *
     * The returned emitter has a cancel() method that cancels the call. It
     * emits 'status' for every status received, OK or not, so that listeners
     * also observe failures, and 'metadata' when the status is OK.
     * @this {Client} Client object. Must have channel and updateMetadata
     *     members.
     * @param {*} argument The argument to the call. Should be serializable with
     *     serialize
     * @param {function(?Error, value=)} callback The callback for when the
     *     response is received. Receives an Error carrying the status code in
     *     its code property when the status is not OK
     * @param {Object=} metadata Metadata to add to the call. Defaults to an
     *     empty object when null or undefined
     * @param {(number|Date)=} deadline The deadline for processing this
     *     request. Defaults to infinite future
     * @return {EventEmitter} An event emitter for stream related events
     */
    function makeUnaryRequest(argument, callback, metadata, deadline) {
      /* jshint validthis: true */
      if (deadline === undefined) {
        deadline = Infinity;
      }
      var emitter = new EventEmitter();
      var call = new grpc.Call(this.channel, method, deadline);
      if (metadata === null || metadata === undefined) {
        metadata = {};
      }
      emitter.cancel = function cancel() {
        call.cancel();
      };
      this.updateMetadata(metadata, function(error, metadata) {
        if (error) {
          call.cancel();
          callback(error);
          return;
        }
        // A unary call is carried out as one batch: send everything, then
        // receive metadata, the single message and the final status.
        var client_batch = {};
        client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
        client_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
        client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
        client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
        client_batch[grpc.opType.RECV_MESSAGE] = true;
        client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
        call.startBatch(client_batch, function(err, response) {
          if (err) {
            callback(err);
            return;
          }
          // Emit the status before inspecting it so that 'status' listeners
          // are notified of failed calls as well as successful ones.
          emitter.emit('status', response.status);
          if (response.status.code !== grpc.status.OK) {
            var status_error = new Error(response.status.details);
            status_error.code = response.status.code;
            callback(status_error);
            return;
          }
          emitter.emit('metadata', response.metadata);
          callback(null, deserialize(response.read));
        });
      });
      return emitter;
    }
    return makeUnaryRequest;
  }
  /**
   * Get a function that can make client stream requests to the specified
   * method.
   * @param {string} method The name of the method to request
   * @param {function(*):Buffer} serialize The serialization function for inputs
   * @param {function(Buffer)} deserialize The deserialization function for
   *     outputs
   * @return {Function} makeClientStreamRequest
   */
  function makeClientStreamRequestFunction(method, serialize, deserialize) {
    /**
     * Make a client stream request with this method on the given channel with
     * the given callback, etc.
     *
     * The returned stream emits 'metadata' when the initial metadata batch
     * completes and 'status' for every status received, OK or not. The
     * callback is invoked at most once, even if both batches fail.
     * @this {Client} Client object. Must have channel and updateMetadata
     *     members.
     * @param {function(?Error, value=)} callback The callback for when the
     *     response is received. Receives an Error carrying the status code in
     *     its code property when the status is not OK
     * @param {Object=} metadata Metadata to add to the call. Defaults to an
     *     empty object when null or undefined
     * @param {(number|Date)=} deadline The deadline for processing this
     *     request. Defaults to infinite future
     * @return {ClientWritableStream} The stream to write requests to
     */
    function makeClientStreamRequest(callback, metadata, deadline) {
      /* jshint validthis: true */
      if (deadline === undefined) {
        deadline = Infinity;
      }
      var call = new grpc.Call(this.channel, method, deadline);
      if (metadata === null || metadata === undefined) {
        metadata = {};
      }
      var stream = new ClientWritableStream(call, serialize);
      var responded = false;
      /**
       * Forward the arguments to callback the first time only. Two batches
       * are outstanding at once and each can fail, so without this guard the
       * caller could be called back twice.
       */
      function respondOnce() {
        if (responded) {
          return;
        }
        responded = true;
        callback.apply(undefined, arguments);
      }
      this.updateMetadata(metadata, function(error, metadata) {
        if (error) {
          call.cancel();
          respondOnce(error);
          return;
        }
        var metadata_batch = {};
        metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
        metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
        call.startBatch(metadata_batch, function(err, response) {
          if (err) {
            respondOnce(err);
            return;
          }
          stream.emit('metadata', response.metadata);
        });
        var client_batch = {};
        client_batch[grpc.opType.RECV_MESSAGE] = true;
        client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
        call.startBatch(client_batch, function(err, response) {
          if (err) {
            respondOnce(err);
            return;
          }
          // Emit the status before inspecting it so that 'status' listeners
          // are notified of failed calls as well as successful ones.
          stream.emit('status', response.status);
          if (response.status.code !== grpc.status.OK) {
            var status_error = new Error(response.status.details);
            status_error.code = response.status.code;
            respondOnce(status_error);
            return;
          }
          respondOnce(null, deserialize(response.read));
        });
      });
      return stream;
    }
    return makeClientStreamRequest;
  }
  /**
   * Get a function that can make server stream requests to the specified
   * method.
   * @param {string} method The name of the method to request
   * @param {function(*):Buffer} serialize The serialization function for inputs
   * @param {function(Buffer)} deserialize The deserialization function for
   *     outputs
   * @return {Function} makeServerStreamRequest
   */
  function makeServerStreamRequestFunction(method, serialize, deserialize) {
    /**
     * Make a server stream request with this method on the given channel with
     * the given argument, etc.
     *
     * The returned stream emits 'metadata' when the start batch completes and
     * 'status' when the status batch completes. Every failure, whether from
     * updateMetadata or from either batch, is reported as an 'error' event on
     * the stream; batch failures are not thrown because the batches complete
     * asynchronously, where a throw could not be caught by the caller.
     * @this {SurfaceClient} Client object. Must have channel and
     *     updateMetadata members.
     * @param {*} argument The argument to the call. Should be serializable with
     *     serialize
     * @param {Object=} metadata Metadata to add to the call. Defaults to an
     *     empty object when null or undefined
     * @param {(number|Date)=} deadline The deadline for processing this
     *     request. Defaults to infinite future
     * @return {ClientReadableStream} The stream to read responses from
     */
    function makeServerStreamRequest(argument, metadata, deadline) {
      /* jshint validthis: true */
      if (deadline === undefined) {
        deadline = Infinity;
      }
      var call = new grpc.Call(this.channel, method, deadline);
      if (metadata === null || metadata === undefined) {
        metadata = {};
      }
      var stream = new ClientReadableStream(call, deserialize);
      this.updateMetadata(metadata, function(error, metadata) {
        if (error) {
          call.cancel();
          stream.emit('error', error);
          return;
        }
        // The whole request is sent up front; responses are then read by the
        // stream itself.
        var start_batch = {};
        start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
        start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
        start_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
        start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
        call.startBatch(start_batch, function(err, response) {
          if (err) {
            stream.emit('error', err);
            return;
          }
          stream.emit('metadata', response.metadata);
        });
        var status_batch = {};
        status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
        call.startBatch(status_batch, function(err, response) {
          if (err) {
            stream.emit('error', err);
            return;
          }
          stream.emit('status', response.status);
        });
      });
      return stream;
    }
    return makeServerStreamRequest;
  }
  /**
   * Get a function that can make bidirectional stream requests to the specified
   * method.
   * @param {string} method The name of the method to request
   * @param {function(*):Buffer} serialize The serialization function for inputs
   * @param {function(Buffer)} deserialize The deserialization function for
   *     outputs
   * @return {Function} makeBidiStreamRequest
   */
  function makeBidiStreamRequestFunction(method, serialize, deserialize) {
    /**
     * Make a bidirectional stream request with this method on the given
     * channel.
     *
     * The returned stream emits 'metadata' when the start batch completes and
     * 'status' when the status batch completes. Every failure, whether from
     * updateMetadata or from either batch, is reported as an 'error' event on
     * the stream; batch failures are not thrown because the batches complete
     * asynchronously, where a throw could not be caught by the caller.
     * @this {SurfaceClient} Client object. Must have channel and
     *     updateMetadata members.
     * @param {Object=} metadata Metadata to add to the call. Defaults to an
     *     empty object when null or undefined
     * @param {(number|Date)=} deadline The deadline for processing this
     *     request. Defaults to infinite future
     * @return {ClientDuplexStream} The stream to write requests to and read
     *     responses from
     */
    function makeBidiStreamRequest(metadata, deadline) {
      /* jshint validthis: true */
      if (deadline === undefined) {
        deadline = Infinity;
      }
      var call = new grpc.Call(this.channel, method, deadline);
      if (metadata === null || metadata === undefined) {
        metadata = {};
      }
      var stream = new ClientDuplexStream(call, serialize, deserialize);
      this.updateMetadata(metadata, function(error, metadata) {
        if (error) {
          call.cancel();
          stream.emit('error', error);
          return;
        }
        var start_batch = {};
        start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
        start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
        call.startBatch(start_batch, function(err, response) {
          if (err) {
            stream.emit('error', err);
            return;
          }
          stream.emit('metadata', response.metadata);
        });
        var status_batch = {};
        status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
        call.startBatch(status_batch, function(err, response) {
          if (err) {
            stream.emit('error', err);
            return;
          }
          stream.emit('status', response.status);
        });
      });
      return stream;
    }
    return makeBidiStreamRequest;
  }
  /**
   * Lookup table from a short method-type name to the function that builds
   * requesters of that type. Used in makeClientConstructor.
   */
  var requester_makers = {};
  requester_makers.unary = makeUnaryRequestFunction;
  requester_makers.server_stream = makeServerStreamRequestFunction;
  requester_makers.client_stream = makeClientStreamRequestFunction;
  requester_makers.bidi = makeBidiStreamRequestFunction;
  /**
   * Build a client constructor whose prototype carries one request function
   * per entry of methods. Each entry maps a method name to an object with the
   * following keys:
   *     path: The path on the server for accessing the method. For example,
   *         for protocol buffers, we use "/service_name/method_name"
   *     requestStream: bool indicating whether the client sends a stream
   *     responseStream: bool indicating whether the server sends a stream
   *     requestSerialize: function to serialize request objects
   *     responseDeserialize: function to deserialize response objects
   * @param {Object} methods An object mapping method names to method attributes
   * @return {function(string, Object)} New client constructor
   */
  function makeClientConstructor(methods) {
    /**
     * Create a client with the given methods
     * @constructor
     * @param {string} address The address of the server to connect to
     * @param {Object} options Options to pass to the underlying channel
     * @param {function(Object, function)=} updateMetadata function to update
     *     the metadata for each request. When omitted, the metadata is passed
     *     through unchanged
     */
    function Client(address, options, updateMetadata) {
      this.updateMetadata = updateMetadata || function(metadata, callback) {
        callback(null, metadata);
      };
      this.channel = new grpc.Channel(address, options);
    }

    _.each(methods, function(attrs, name) {
      // The two streaming flags together select the requester maker.
      var method_type;
      if (attrs.requestStream) {
        method_type = attrs.responseStream ? 'bidi' : 'client_stream';
      } else {
        method_type = attrs.responseStream ? 'server_stream' : 'unary';
      }
      var serialize = attrs.requestSerialize;
      var deserialize = attrs.responseDeserialize;
      var make_requester = requester_makers[method_type];
      Client.prototype[name] = make_requester(
          attrs.path, serialize, deserialize);
      // The codecs are also attached to the generated method itself.
      Client.prototype[name].serialize = serialize;
      Client.prototype[name].deserialize = deserialize;
    });

    return Client;
  }
  /**
   * Build a client constructor for the given service. The method attributes
   * come from common.getProtobufServiceAttrs, and the service is attached to
   * the returned constructor as its service property.
   * @param {ProtoBuf.Reflect.Service} service The service to generate a client
   *     for
   * @return {function(string, Object)} New client constructor
   */
  function makeProtobufClientConstructor(service) {
    var ServiceClient = makeClientConstructor(
        common.getProtobufServiceAttrs(service));
    ServiceClient.service = service;
    return ServiceClient;
  }
  502. exports.makeClientConstructor = makeClientConstructor;
  503. exports.makeProtobufClientConstructor = makeProtobufClientConstructor;
  504. /**
  505. * The native binding's status object (grpc.status), re-exported unchanged.
  506. */
  507. exports.status = grpc.status;
  508. /**
  509. * The native binding's callError object (grpc.callError), re-exported unchanged.
  510. */
  511. exports.callError = grpc.callError;