src_server.js.html 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036
  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="utf-8">
  5. <title>JSDoc: Source: src/server.js</title>
  6. <script src="scripts/prettify/prettify.js"> </script>
  7. <script src="scripts/prettify/lang-css.js"> </script>
  8. <!--[if lt IE 9]>
  9. <script src="//html5shiv.googlecode.com/svn/trunk/html5.js"></script>
  10. <![endif]-->
  11. <link type="text/css" rel="stylesheet" href="styles/prettify-tomorrow.css">
  12. <link type="text/css" rel="stylesheet" href="styles/jsdoc-default.css">
  13. </head>
  14. <body>
  15. <div id="main">
  16. <h1 class="page-title">Source: src/server.js</h1>
  17. <section>
  18. <article>
  19. <pre class="prettyprint source linenums"><code>/**
  20. * @license
  21. * Copyright 2015, Google Inc.
  22. * All rights reserved.
  23. *
  24. * Redistribution and use in source and binary forms, with or without
  25. * modification, are permitted provided that the following conditions are
  26. * met:
  27. *
  28. * * Redistributions of source code must retain the above copyright
  29. * notice, this list of conditions and the following disclaimer.
  30. * * Redistributions in binary form must reproduce the above
  31. * copyright notice, this list of conditions and the following disclaimer
  32. * in the documentation and/or other materials provided with the
  33. * distribution.
  34. * * Neither the name of Google Inc. nor the names of its
  35. * contributors may be used to endorse or promote products derived from
  36. * this software without specific prior written permission.
  37. *
  38. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  39. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  40. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  41. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  42. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  43. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  44. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  45. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  46. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  47. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  48. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  49. *
  50. */
  51. 'use strict';
  52. var _ = require('lodash');
  53. var grpc = require('./grpc_extension');
  54. var common = require('./common');
  55. var Metadata = require('./metadata');
  56. var constants = require('./constants');
  57. var stream = require('stream');
  58. var Readable = stream.Readable;
  59. var Writable = stream.Writable;
  60. var Duplex = stream.Duplex;
  61. var util = require('util');
  62. var EventEmitter = require('events').EventEmitter;
  /**
   * Handle an error on a call by sending it as a status.
   *
   * The status defaults to UNKNOWN / 'Unknown Error'. Own properties of the
   * error override those defaults: `message` supplies the details, an integer
   * `code` supplies the status code (in which case `details`, if present,
   * replaces the details), and `metadata` supplies the trailing metadata.
   * @private
   * @param {grpc.internal~Call} call The call to send the error on
   * @param {(Object|Error)} error The error object
   */
  function handleError(call, error) {
    var hasOwn = Object.prototype.hasOwnProperty;
    // Treat a missing error like an empty one so a status is still sent.
    var source = (error === null || error === undefined) ? {} : error;
    var statusMetadata = new Metadata();
    var status = {
      code: constants.status.UNKNOWN,
      details: 'Unknown Error'
    };
    if (hasOwn.call(source, 'message')) {
      status.details = source.message;
    }
    // Errors that did not originate from gRPC can carry non-numeric codes
    // (Node system errors use strings such as 'ENOENT'). Only an integer is
    // forwarded as a status code; anything else keeps UNKNOWN.
    var hasIntegerCode = hasOwn.call(source, 'code') &&
        typeof source.code === 'number' &&
        isFinite(source.code) &&
        Math.floor(source.code) === source.code;
    if (hasIntegerCode) {
      status.code = source.code;
      if (hasOwn.call(source, 'details')) {
        status.details = source.details;
      }
    }
    if (hasOwn.call(source, 'metadata')) {
      statusMetadata = source.metadata;
    }
    status.metadata = statusMetadata._getCoreRepresentation();
    var error_batch = {};
    if (!call.metadataSent) {
      error_batch[grpc.opType.SEND_INITIAL_METADATA] =
          (new Metadata())._getCoreRepresentation();
      // Record that initial metadata has been queued so that a later send on
      // the same call does not queue it a second time.
      call.metadataSent = true;
    }
    error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
    call.startBatch(error_batch, function(){});
  }
  /**
   * Respond to a unary or client streaming call with a single message followed
   * by an OK status. If serializing the value throws, the thrown error is
   * tagged with the INTERNAL status code and reported through handleError
   * instead, and nothing else is sent.
   * @private
   * @param {grpc.Call} call The call to respond on
   * @param {*} value The value to respond with
   * @param {grpc~serialize} serialize Serialization function for the
   *     response
   * @param {grpc.Metadata=} metadata Optional trailing metadata to send with
   *     status
   * @param {number=} [flags=0] Flags for modifying how the message is sent.
   */
  function sendUnaryResponse(call, value, serialize, metadata, flags) {
    var payload;
    try {
      payload = serialize(value);
    } catch (serializeError) {
      serializeError.code = constants.status.INTERNAL;
      handleError(call, serializeError);
      return;
    }
    // Fall back to empty trailing metadata when the caller supplied none.
    var trailers = metadata ? metadata : new Metadata();
    var okStatus = {
      code: constants.status.OK,
      details: 'OK',
      metadata: trailers._getCoreRepresentation()
    };
    var ops = {};
    if (!call.metadataSent) {
      // Initial metadata has to go out with (or before) the first message.
      ops[grpc.opType.SEND_INITIAL_METADATA] =
          (new Metadata())._getCoreRepresentation();
      call.metadataSent = true;
    }
    payload.grpcWriteFlags = flags;
    ops[grpc.opType.SEND_MESSAGE] = payload;
    ops[grpc.opType.SEND_STATUS_FROM_SERVER] = okStatus;
    call.startBatch(ops, function() {});
  }
  /**
   * Initialize a writable stream. This is used for both the writable and duplex
   * stream constructors.
   *
   * Installs the pending status (OK by default), the wrapped serializer, a
   * 'finish' listener that sends the pending status, an 'error' listener that
   * turns the error into the pending status and ends the stream, and an
   * override of `end` that accepts trailing metadata.
   * @private
   * @param {Writable} stream The stream to set up
   * @param {function(*):Buffer=} serialize Serialization function for responses
   */
  function setUpWritable(stream, serialize) {
    var hasOwn = Object.prototype.hasOwnProperty;
    stream.finished = false;
    stream.status = {
      code : constants.status.OK,
      details : 'OK',
      metadata : new Metadata()
    };
    stream.serialize = common.wrapIgnoreNull(serialize);
    /**
     * Send the pending status, together with initial metadata if none has been
     * sent on the call yet.
     */
    function sendStatus() {
      var batch = {};
      if (!stream.call.metadataSent) {
        stream.call.metadataSent = true;
        batch[grpc.opType.SEND_INITIAL_METADATA] =
            (new Metadata())._getCoreRepresentation();
      }
      if (stream.status.metadata) {
        // The status is sent with the core representation of its metadata.
        stream.status.metadata = stream.status.metadata._getCoreRepresentation();
      }
      batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status;
      stream.call.startBatch(batch, function(){});
    }
    stream.on('finish', sendStatus);
    /**
     * Set the pending status to a given error status. If the error does not
     * have an integer code, the code will be set to grpc.status.UNKNOWN; if it
     * has neither a message nor (with a code) details, the details will be set
     * to 'Unknown Error'.
     * @param {Error} err The error object
     */
    function setStatus(err) {
      var source = (err === null || err === undefined) ? {} : err;
      var code = constants.status.UNKNOWN;
      var details = 'Unknown Error';
      var metadata = new Metadata();
      if (hasOwn.call(source, 'message')) {
        details = source.message;
      }
      // Errors that did not originate from gRPC can carry non-numeric codes
      // (Node system errors use strings such as 'EIO'); only an integer is
      // usable as a status code.
      var hasIntegerCode = hasOwn.call(source, 'code') &&
          typeof source.code === 'number' &&
          isFinite(source.code) &&
          Math.floor(source.code) === source.code;
      if (hasIntegerCode) {
        code = source.code;
        if (hasOwn.call(source, 'details')) {
          details = source.details;
        }
      }
      if (hasOwn.call(source, 'metadata')) {
        metadata = source.metadata;
      }
      stream.status = {code: code, details: details, metadata: metadata};
    }
    /**
     * Terminate the call: record the given error as the pending status and end
     * the stream, which sends that status once the stream finishes.
     * @param {Error} err The error object
     */
    function terminateCall(err) {
      setStatus(err);
      stream.end();
    }
    stream.on('error', terminateCall);
    /**
     * Override of Writable#end method that allows for sending metadata with a
     * success status.
     * @param {Metadata=} metadata Metadata to send with the status
     */
    stream.end = function(metadata) {
      if (metadata) {
        stream.status.metadata = metadata;
      }
      Writable.prototype.end.call(this);
    };
  }
  /**
   * Prepare a readable stream for use by the server. Shared by the readable
   * and duplex stream constructors.
   *
   * Attaches the wrapped deserializer, the `finished` and `reading` flags, and
   * a `terminate` method; `terminate` is also invoked whenever the stream
   * emits 'cancelled'.
   * @private
   * @param {Readable} stream The stream to initialize
   * @param {grpc~deserialize} deserialize Deserialization function for
   *     incoming data.
   */
  function setUpReadable(stream, deserialize) {
    stream.deserialize = common.wrapIgnoreNull(deserialize);
    stream.finished = false;
    stream.reading = false;
    /**
     * Mark the stream as finished and attach a no-op 'data' listener so that
     * nothing is left waiting for a consumer.
     */
    function terminate() {
      stream.finished = true;
      stream.on('data', function discardChunk() {});
    }
    stream.terminate = terminate;
    function onCancelled() {
      // Look the method up at call time, as the stream's own property.
      stream.terminate();
    }
    stream.on('cancelled', onCancelled);
  }
  /**
   * Emitted when the call has been cancelled. After this has been emitted, the
   * call's `cancelled` property will be set to `true`.
   * @event grpc~ServerUnaryCall~cancelled
   */
  util.inherits(ServerUnaryCall, EventEmitter);
  /**
   * The object handed to handlers of unary calls. It is an EventEmitter that
   * carries the underlying call, the client's metadata and, once received,
   * the request message.
   * @constructor grpc~ServerUnaryCall
   * @extends external:EventEmitter
   * @param {grpc.internal~Call} call The call object associated with the request
   * @param {grpc.Metadata} metadata The request metadata from the client
   */
  function ServerUnaryCall(call, metadata) {
    var self = this;
    EventEmitter.call(self);
    self.call = call;
    /**
     * Whether the call has been cancelled
     * @member {boolean} grpc~ServerUnaryCall#cancelled
     */
    self.cancelled = false;
    /**
     * Metadata the client sent with the request
     * @member {grpc.Metadata} grpc~ServerUnaryCall#metadata
     */
    self.metadata = metadata;
    /**
     * The client's request message; undefined until it has been assigned
     * @member {*} grpc~ServerUnaryCall#request
     */
    self.request = undefined;
  }
  /**
   * Emitted when the call has been cancelled. After this has been emitted, the
   * call's `cancelled` property will be set to `true`.
   * @event grpc~ServerWritableStream~cancelled
   */
  util.inherits(ServerWritableStream, Writable);
  /**
   * An object-mode Writable handed to handlers of server streaming calls; the
   * handler writes response messages to it.
   * @constructor grpc~ServerWritableStream
   * @extends external:Writable
   * @borrows grpc~ServerUnaryCall#sendMetadata as
   *     grpc~ServerWritableStream#sendMetadata
   * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerWritableStream#getPeer
   * @param {grpc.internal~Call} call The call object to send data with
   * @param {grpc.Metadata} metadata The request metadata from the client
   * @param {grpc~serialize} serialize Serialization function for writes
   */
  function ServerWritableStream(call, metadata, serialize) {
    var self = this;
    Writable.call(self, {objectMode: true});
    self.call = call;
    self.finished = false;
    setUpWritable(self, serialize);
    /**
     * Whether the call has been cancelled
     * @member {boolean} grpc~ServerWritableStream#cancelled
     */
    self.cancelled = false;
    /**
     * Metadata the client sent with the request
     * @member {grpc.Metadata} grpc~ServerWritableStream#metadata
     */
    self.metadata = metadata;
    /**
     * The client's request message; undefined until it has been assigned
     * @member {*} grpc~ServerWritableStream#request
     */
    self.request = undefined;
  }
  /**
   * Implementation of the `_write` hook required by stream.Writable: serialize
   * one chunk and send it on the call, preceded by empty initial metadata if
   * none has been sent yet.
   *
   * A serialization failure is tagged with the INTERNAL status code and passed
   * to the callback. A failure reported by the batch is emitted as an 'error'
   * event and the callback is not invoked.
   * @private
   * @param {Buffer} chunk The chunk of data to write
   * @param {string} encoding Used to pass write flags
   * @param {function(Error=)} callback Callback to indicate that the write is
   *     complete
   */
  function _write(chunk, encoding, callback) {
    /* jshint validthis: true */
    var stream = this;
    var serialized;
    try {
      serialized = stream.serialize(chunk);
    } catch (serializeError) {
      serializeError.code = constants.status.INTERNAL;
      callback(serializeError);
      return;
    }
    var ops = {};
    if (!stream.call.metadataSent) {
      ops[grpc.opType.SEND_INITIAL_METADATA] =
          (new Metadata())._getCoreRepresentation();
      stream.call.metadataSent = true;
    }
    /* The encoding slot doubles as the write flags. A finite number is the
     * closest available check that the value is a valid set of flags. */
    if (_.isFinite(encoding)) {
      serialized.grpcWriteFlags = encoding;
    }
    ops[grpc.opType.SEND_MESSAGE] = serialized;
    stream.call.startBatch(ops, function(batchError, ignoredResult) {
      if (batchError) {
        stream.emit('error', batchError);
      } else {
        callback();
      }
    });
  }
  ServerWritableStream.prototype._write = _write;
  /**
   * Emitted when the call has been cancelled. After this has been emitted, the
   * call's `cancelled` property will be set to `true`.
   * @event grpc~ServerReadableStream~cancelled
   */
  util.inherits(ServerReadableStream, Readable);
  /**
   * An object-mode Readable handed to handlers of client streaming calls; the
   * handler reads request messages from it.
   * @constructor grpc~ServerReadableStream
   * @extends external:Readable
   * @borrows grpc~ServerUnaryCall#sendMetadata as
   *     grpc~ServerReadableStream#sendMetadata
   * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerReadableStream#getPeer
   * @param {grpc.internal~Call} call The call object to read data with
   * @param {grpc.Metadata} metadata The request metadata from the client
   * @param {grpc~deserialize} deserialize Deserialization function for reads
   */
  function ServerReadableStream(call, metadata, deserialize) {
    var self = this;
    Readable.call(self, {objectMode: true});
    self.call = call;
    setUpReadable(self, deserialize);
    /**
     * Whether the call has been cancelled
     * @member {boolean} grpc~ServerReadableStream#cancelled
     */
    self.cancelled = false;
    /**
     * Metadata the client sent with the request
     * @member {grpc.Metadata} grpc~ServerReadableStream#metadata
     */
    self.metadata = metadata;
  }
  /**
   * Implementation of the `_read` hook required by stream.Readable. Unless the
   * stream is finished or a read is already outstanding, requests the next
   * message from the call and keeps requesting for as long as `push` accepts
   * more data and the received message is not null.
   * @access private
   * @param {number} size Ignored
   */
  function _read(size) {
    /* jshint validthis: true */
    var stream = this;
    /**
     * Ask the call for the next incoming message.
     */
    function requestMessage() {
      var ops = {};
      ops[grpc.opType.RECV_MESSAGE] = true;
      stream.call.startBatch(ops, onMessage);
    }
    /**
     * Completion callback for a message request. Deserializes the message,
     * pushes it onto the read queue and requests another one if applicable.
     * @param {Error} err Error reported by the batch, if any
     * @param {grpc.Event} event READ event object
     */
    function onMessage(err, event) {
      if (err) {
        stream.terminate();
        return;
      }
      if (stream.finished) {
        stream.push(null);
        return;
      }
      var raw = event.read;
      var decoded;
      try {
        decoded = stream.deserialize(raw);
      } catch (deserializeError) {
        deserializeError.code = constants.status.INTERNAL;
        stream.emit('error', deserializeError);
        return;
      }
      var wantsMore = stream.push(decoded);
      if (wantsMore && raw !== null) {
        requestMessage();
      } else {
        stream.reading = false;
      }
    }
    if (stream.finished) {
      stream.push(null);
      return;
    }
    if (stream.reading) {
      // A request is already outstanding; its callback continues the loop.
      return;
    }
    stream.reading = true;
    requestMessage();
  }
  ServerReadableStream.prototype._read = _read;
  /**
   * Emitted when the call has been cancelled. After this has been emitted, the
   * call's `cancelled` property will be set to `true`.
   * @event grpc~ServerDuplexStream~cancelled
   */
  util.inherits(ServerDuplexStream, Duplex);
  /**
   * An object-mode Duplex handed to handlers of bidirectional streaming calls;
   * the handler reads request messages from it and writes response messages
   * to it.
   * @constructor grpc~ServerDuplexStream
   * @extends external:Duplex
   * @borrows grpc~ServerUnaryCall#sendMetadata as
   *     grpc~ServerDuplexStream#sendMetadata
   * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerDuplexStream#getPeer
   * @param {grpc.internal~Call} call Call object to proxy
   * @param {grpc.Metadata} metadata The request metadata from the client
   * @param {grpc~serialize} serialize Serialization function for the messages
   *     written to the stream
   * @param {grpc~deserialize} deserialize Deserialization function for the
   *     messages read from the stream
   */
  function ServerDuplexStream(call, metadata, serialize, deserialize) {
    var self = this;
    Duplex.call(self, {objectMode: true});
    self.call = call;
    setUpWritable(self, serialize);
    setUpReadable(self, deserialize);
    /**
     * Whether the call has been cancelled
     * @member {boolean} grpc~ServerDuplexStream#cancelled
     */
    self.cancelled = false;
    /**
     * Metadata the client sent with the request
     * @member {grpc.Metadata} grpc~ServerDuplexStream#metadata
     */
    self.metadata = metadata;
  }
  ServerDuplexStream.prototype._read = _read;
  ServerDuplexStream.prototype._write = _write;
  /**
   * Send the initial metadata for the call. Does nothing if initial metadata
   * has already been sent. A failure reported by the batch is emitted as an
   * 'error' event on the call/stream object.
   * @alias grpc~ServerUnaryCall#sendMetadata
   * @param {Metadata} responseMetadata Metadata to send
   */
  function sendMetadata(responseMetadata) {
    /* jshint validthis: true */
    var self = this;
    if (this.call.metadataSent) {
      return;
    }
    // Build the batch before flagging the call: if the argument is missing or
    // cannot be converted, the throw must not leave the call marked as having
    // sent metadata that never went out.
    var batch = {};
    batch[grpc.opType.SEND_INITIAL_METADATA] =
        responseMetadata._getCoreRepresentation();
    this.call.metadataSent = true;
    this.call.startBatch(batch, function(err) {
      if (err) {
        self.emit('error', err);
        return;
      }
    });
  }
  ServerUnaryCall.prototype.sendMetadata = sendMetadata;
  ServerWritableStream.prototype.sendMetadata = sendMetadata;
  ServerReadableStream.prototype.sendMetadata = sendMetadata;
  ServerDuplexStream.prototype.sendMetadata = sendMetadata;
  /**
   * Report the endpoint this call/stream is connected to, as given by the
   * underlying call object.
   * @alias grpc~ServerUnaryCall#getPeer
   * @return {string} The URI of the endpoint
   */
  function getPeer() {
    /* jshint validthis: true */
    var underlyingCall = this.call;
    return underlyingCall.getPeer();
  }
  ServerUnaryCall.prototype.getPeer = getPeer;
  ServerReadableStream.prototype.getPeer = getPeer;
  ServerWritableStream.prototype.getPeer = getPeer;
  ServerDuplexStream.prototype.getPeer = getPeer;
  /**
   * Wait for the client to close, then emit a cancelled event if the client
   * cancelled. A failure reported by the batch is emitted as an 'error' event.
   * @private
   */
  function waitForCancel() {
    /* jshint validthis: true */
    var self = this;
    var cancel_batch = {};
    cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
    self.call.startBatch(cancel_batch, function(err, result) {
      if (err) {
        self.emit('error', err);
      }
      // A failed batch may not deliver a result object, so only inspect it
      // when it is present.
      if (result && result.cancelled) {
        self.cancelled = true;
        self.emit('cancelled');
      }
    });
  }
  ServerUnaryCall.prototype.waitForCancel = waitForCancel;
  ServerReadableStream.prototype.waitForCancel = waitForCancel;
  ServerWritableStream.prototype.waitForCancel = waitForCancel;
  ServerDuplexStream.prototype.waitForCancel = waitForCancel;
  530. /**
  531. * Callback function passed to server handlers that handle methods with unary
  532. * responses.
  533. * @callback grpc.Server~sendUnaryData
  534. * @param {grpc~ServiceError} error An error, if the call failed
  535. * @param {*} value The response value. Must be a valid argument to the
  536. * `responseSerialize` method of the method that is being handled
  537. * @param {grpc.Metadata=} trailer Trailing metadata to send, if applicable
  538. * @param {grpc.writeFlags=} flags Flags to modify writing the response
  539. */
  540. /**
  541. * User-provided method to handle unary requests on a server
  542. * @callback grpc.Server~handleUnaryCall
  543. * @param {grpc~ServerUnaryCall} call The call object
  544. * @param {grpc.Server~sendUnaryData} callback The callback to call to respond
  545. * to the request
  546. */
  /**
   * Fully handle a unary call: receive the single request message,
   * deserialize it, invoke the user handler and send back either the response
   * or an error status.
   *
   * Nothing is sent from here when the call is found to be cancelled once the
   * request has been deserialized.
   * @private
   * @param {grpc.internal~Call} call The call to handle
   * @param {Object} handler Request handler object for the method that was called
   * @param {grpc.Server~handleUnaryCall} handler.func The handler function
   * @param {grpc~deserialize} handler.deserialize The deserialization function
   *     for request data
   * @param {grpc~serialize} handler.serialize The serialization function for
   *     response data
   * @param {grpc.Metadata} metadata Metadata from the client
   */
  function handleUnary(call, handler, metadata) {
    var unaryCall = new ServerUnaryCall(call, metadata);
    unaryCall.on('error', function(error) {
      handleError(call, error);
    });
    unaryCall.waitForCancel();
    /**
     * Callback given to the user handler for delivering its outcome.
     */
    function sendUnaryData(err, value, trailer, flags) {
      if (!err) {
        sendUnaryResponse(call, value, handler.serialize, trailer, flags);
        return;
      }
      if (trailer) {
        // Trailing metadata travels with the error status.
        err.metadata = trailer;
      }
      handleError(call, err);
    }
    /**
     * Completion callback for the request-message batch.
     */
    function onRequest(err, result) {
      if (err) {
        handleError(call, err);
        return;
      }
      try {
        unaryCall.request = handler.deserialize(result.read);
      } catch (deserializeError) {
        deserializeError.code = constants.status.INTERNAL;
        handleError(call, deserializeError);
        return;
      }
      if (unaryCall.cancelled) {
        return;
      }
      handler.func(unaryCall, sendUnaryData);
    }
    var recvOps = {};
    recvOps[grpc.opType.RECV_MESSAGE] = true;
    call.startBatch(recvOps, onRequest);
  }
  594. /**
  595. * User provided method to handle server streaming methods on the server.
  596. * @callback grpc.Server~handleServerStreamingCall
  597. * @param {grpc~ServerWritableStream} call The call object
  598. */
  /**
   * Fully handle a server streaming call: receive and deserialize the single
   * request message, attach it to a writable stream and pass that stream to
   * the user handler. Receive and deserialization failures are emitted as
   * 'error' events on the stream.
   * @private
   * @param {grpc.internal~Call} call The call to handle
   * @param {Object} handler Request handler object for the method that was called
   * @param {grpc.Server~handleServerStreamingCall} handler.func The handler
   *     function
   * @param {grpc~deserialize} handler.deserialize The deserialization function
   *     for request data
   * @param {grpc~serialize} handler.serialize The serialization function for
   *     response data
   * @param {grpc.Metadata} metadata Metadata from the client
   */
  function handleServerStreaming(call, handler, metadata) {
    var responseStream =
        new ServerWritableStream(call, metadata, handler.serialize);
    responseStream.waitForCancel();
    function fail(error) {
      responseStream.emit('error', error);
    }
    var recvOps = {};
    recvOps[grpc.opType.RECV_MESSAGE] = true;
    call.startBatch(recvOps, function(err, result) {
      if (err) {
        fail(err);
        return;
      }
      var request;
      try {
        request = handler.deserialize(result.read);
      } catch (deserializeError) {
        deserializeError.code = constants.status.INTERNAL;
        fail(deserializeError);
        return;
      }
      responseStream.request = request;
      handler.func(responseStream);
    });
  }
  632. /**
  633. * User provided method to handle client streaming methods on the server.
  634. * @callback grpc.Server~handleClientStreamingCall
  635. * @param {grpc~ServerReadableStream} call The call object
  636. * @param {grpc.Server~sendUnaryData} callback The callback to call to respond
  637. * to the request
  638. */
  /**
   * Fully handle a client streaming call: give the user handler a readable
   * stream of requests plus a callback, and when the callback is invoked
   * terminate the stream and send either the response or an error status.
   * @access private
   * @param {grpc.internal~Call} call The call to handle
   * @param {Object} handler Request handler object for the method that was called
   * @param {grpc.Server~handleClientStreamingCall} handler.func The handler
   *     function
   * @param {grpc~deserialize} handler.deserialize The deserialization function
   *     for request data
   * @param {grpc~serialize} handler.serialize The serialization function for
   *     response data
   * @param {grpc.Metadata} metadata Metadata from the client
   */
  function handleClientStreaming(call, handler, metadata) {
    var requestStream =
        new ServerReadableStream(call, metadata, handler.deserialize);
    requestStream.on('error', function(error) {
      handleError(call, error);
    });
    requestStream.waitForCancel();
    handler.func(requestStream, function(err, value, trailer, flags) {
      requestStream.terminate();
      if (!err) {
        sendUnaryResponse(call, value, handler.serialize, trailer, flags);
        return;
      }
      if (trailer) {
        // Trailing metadata travels with the error status.
        err.metadata = trailer;
      }
      handleError(call, err);
    });
  }
  670. /**
  671. * User provided method to handle bidirectional streaming calls on the server.
  672. * @callback grpc.Server~handleBidiStreamingCall
  673. * @param {grpc~ServerDuplexStream} call The call object
  674. */
  /**
   * Fully handle a bidirectional streaming call: wrap the call in a duplex
   * stream, start waiting for the client to close, and pass the stream to the
   * user handler.
   * @private
   * @param {grpc.internal~Call} call The call to handle
   * @param {Object} handler Request handler object for the method that was called
   * @param {grpc.Server~handleBidiStreamingCall} handler.func The handler
   *     function
   * @param {grpc~deserialize} handler.deserialize The deserialization function
   *     for request data
   * @param {grpc~serialize} handler.serialize The serialization function for
   *     response data
   * @param {Metadata} metadata Metadata from the client
   */
  function handleBidiStreaming(call, handler, metadata) {
    var serialize = handler.serialize;
    var deserialize = handler.deserialize;
    var duplex = new ServerDuplexStream(call, metadata, serialize, deserialize);
    duplex.waitForCancel();
    handler.func(duplex);
  }
  // Dispatch table from a registered handler's `type` to the function that
  // fully handles a call of that type.
  var streamHandlers = {};
  streamHandlers.unary = handleUnary;
  streamHandlers.server_stream = handleServerStreaming;
  streamHandlers.client_stream = handleClientStreaming;
  streamHandlers.bidi = handleBidiStreaming;
  /**
   * Constructs a server object that keeps a table of request handlers and
   * hands incoming requests to them. The server starts out with no handlers
   * and not started.
   * @memberof grpc
   * @constructor
   * @param {Object=} options Options that should be passed to the internal server
   *     implementation
   * @example
   * var server = new grpc.Server();
   * server.addProtoService(protobuf_service_descriptor, service_implementation);
   * server.bind('address:port', server_credential);
   * server.start();
   */
  function Server(options) {
    var self = this;
    self.handlers = {};
    self._server = new grpc.Server(options);
    self.started = false;
  }
  /**
   * Start the server and begin handling requests.
   * @throws {Error} If the server has already been started
   */
  Server.prototype.start = function() {
    if (this.started) {
      throw new Error('Server is already running');
    }
    var self = this;
    this.started = true;
    this._server.start();
    /**
     * Answer a call to a method without a registered handler with an
     * UNIMPLEMENTED status.
     * @param {grpc.internal~Call} call The call to reject
     */
    function rejectUnimplemented(call) {
      var ops = {};
      ops[grpc.opType.SEND_INITIAL_METADATA] =
          (new Metadata())._getCoreRepresentation();
      ops[grpc.opType.SEND_STATUS_FROM_SERVER] = {
        code: constants.status.UNIMPLEMENTED,
        details: '',
        metadata: {}
      };
      ops[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
      call.startBatch(ops, function() {});
    }
    /**
     * Handles the SERVER_RPC_NEW event. Requests the next call, then uses the
     * handler registered for the requested method, if there is one, to respond
     * to this one. An error or a null method ends the cycle without
     * requesting another call.
     * @param {Error} err Error delivered with the event, if any
     * @param {grpc.internal~Event} event The event to handle with tag
     *     SERVER_RPC_NEW
     */
    function handleNewCall(err, event) {
      if (err) {
        return;
      }
      var details = event.new_call;
      var call = details.call;
      var method = details.method;
      var metadata = Metadata._fromCoreRepresentation(details.metadata);
      if (method === null) {
        return;
      }
      self._server.requestCall(handleNewCall);
      if (!self.handlers.hasOwnProperty(method)) {
        rejectUnimplemented(call);
        return;
      }
      var handler = self.handlers[method];
      streamHandlers[handler.type](call, handler, metadata);
    }
    this._server.requestCall(handleNewCall);
  };
  768. /**
  769. * Unified type for application handlers for all types of calls
  770. * @typedef {(grpc.Server~handleUnaryCall
  771. * |grpc.Server~handleClientStreamingCall
  772. * |grpc.Server~handleServerStreamingCall
  773. * |grpc.Server~handleBidiStreamingCall)} grpc.Server~handleCall
  774. */
  /**
   * Registers a handler to handle the named method. Fails if there already is
   * a handler for the given method. Returns true on success
   * @param {string} name The name of the method that the provided function should
   *     handle/respond to.
   * @param {grpc.Server~handleCall} handler Function that takes a stream of
   *     request values and returns a stream of response values
   * @param {grpc~serialize} serialize Serialization function for responses
   * @param {grpc~deserialize} deserialize Deserialization function for requests
   * @param {string} type The streaming type of method that this handles
   * @return {boolean} True if the handler was set. False if a handler was already
   *     set for that name.
   */
  Server.prototype.register = function(name, handler, serialize, deserialize,
                                       type) {
    // Call hasOwnProperty through Object.prototype: the registry is a plain
    // object keyed by arbitrary names, so a handler registered as
    // 'hasOwnProperty' would otherwise replace the method used for this check.
    if (Object.prototype.hasOwnProperty.call(this.handlers, name)) {
      return false;
    }
    this.handlers[name] = {
      func: handler,
      serialize: serialize,
      deserialize: deserialize,
      type: type
    };
    return true;
  };
  /**
   * Shuts the server down gracefully. New calls are no longer accepted, while
   * calls already in progress are allowed to finish. Once every pending call
   * has completed and the server is fully shut down, the callback is invoked.
   * Calling this repeatedly, or together with forceShutdown, is safe.
   * @param {function()} callback The shutdown complete callback
   */
  Server.prototype.tryShutdown = function(callback) {
    // Pure delegation to the underlying core server.
    var coreServer = this._server;
    coreServer.tryShutdown(callback);
  };
  /**
   * Shuts the server down immediately. New calls are no longer accepted and
   * all pending calls are cancelled; the server has shut down by the time this
   * returns. Calling this repeatedly, or together with tryShutdown, is safe,
   * and any tryShutdown callbacks still outstanding will be triggered.
   */
  Server.prototype.forceShutdown = function() {
    // Pure delegation to the underlying core server.
    var coreServer = this._server;
    coreServer.forceShutdown();
  };
  // Status reported by every fallback handler below.
  var unimplementedStatusResponse = {
    code: constants.status.UNIMPLEMENTED,
    details: 'The server does not implement this method'
  };
  // Fallback handlers, keyed by method type, used by addService when the
  // implementation object does not provide a method.
  var defaultHandler = (function() {
    // unary and client_stream handlers report through a callback.
    function failThroughCallback(call, callback) {
      callback(unimplementedStatusResponse);
    }
    // server_stream and bidi handlers report by emitting 'error' on the call.
    function failThroughErrorEvent(call) {
      call.emit('error', unimplementedStatusResponse);
    }
    return {
      unary: failThroughCallback,
      client_stream: failThroughCallback,
      server_stream: failThroughErrorEvent,
      bidi: failThroughErrorEvent
    };
  })();
  /**
   * Add a service to the server, with a corresponding implementation.
   * Every method of the service is registered under its path; methods missing
   * from the implementation are logged and given a default handler that
   * reports UNIMPLEMENTED.
   * @param {grpc~ServiceDefinition} service The service descriptor
   * @param {Object&lt;String, grpc.Server~handleCall>} implementation Map of method
   *     names to method implementation for the provided service.
   * @throws {Error} If either argument is not an object, the service has no
   *     methods, the server is already started, or a method path already has
   *     a handler
   */
  Server.prototype.addService = function(service, implementation) {
    if (!_.isObject(service) || !_.isObject(implementation)) {
      throw new Error('addService requires two objects as arguments');
    }
    if (_.keys(service).length === 0) {
      throw new Error('Cannot add an empty service to a server');
    }
    if (this.started) {
      throw new Error('Can\'t add a service to a started server.');
    }
    var server = this;
    _.forOwn(service, function(attrs, name) {
      // Derive the handler kind from the two streaming flags.
      var methodType;
      if (attrs.requestStream) {
        methodType = attrs.responseStream ? 'bidi' : 'client_stream';
      } else {
        methodType = attrs.responseStream ? 'server_stream' : 'unary';
      }
      // Look the handler up by the JavaScript-style name first, then by the
      // name exactly as written in the proto file, then fall back to the
      // default handler for this method type.
      var handlerFn;
      if (implementation[name] !== undefined) {
        handlerFn = _.bind(implementation[name], implementation);
      } else if (implementation[attrs.originalName] !== undefined) {
        handlerFn = _.bind(implementation[attrs.originalName], implementation);
      } else {
        common.log(constants.logVerbosity.ERROR, 'Method handler ' + name +
            ' for ' + attrs.path + ' expected but not provided');
        handlerFn = defaultHandler[methodType];
      }
      var responseSerializer = attrs.responseSerialize;
      var requestDeserializer = attrs.requestDeserialize;
      var wasRegistered = server.register(attrs.path, handlerFn,
                                          responseSerializer,
                                          requestDeserializer, methodType);
      if (!wasRegistered) {
        throw new Error('Method handler for ' + attrs.path +
            ' already provided.');
      }
    });
  };
  // Deprecation notice for addProtoService, wrapped in _.once so that the
  // message is logged at most one time however often the method is called.
  var logAddProtoServiceDeprecationOnce = _.once(function logDeprecation() {
    var notice = 'Server#addProtoService is deprecated. Use addService instead';
    common.log(constants.logVerbosity.INFO, notice);
  });
  899. /**
  900. * Add a proto service to the server, with a corresponding implementation
  901. * @deprecated Use {@link grpc.Server#addService} instead
  902. * @param {Protobuf.Reflect.Service} service The proto service descriptor
  903. * @param {Object&lt;String, grpc.Server~handleCall>} implementation Map of method
  904. * names to method implementation for the provided service.
  905. */
  906. Server.prototype.addProtoService = function(service, implementation) {
  907. var options;
  908. var protobuf_js_5_common = require('./protobuf_js_5_common');
  909. var protobuf_js_6_common = require('./protobuf_js_6_common');
  910. logAddProtoServiceDeprecationOnce();
  911. if (protobuf_js_5_common.isProbablyProtobufJs5(service)) {
  912. options = _.defaults(service.grpc_options, common.defaultGrpcOptions);
  913. this.addService(
  914. protobuf_js_5_common.getProtobufServiceAttrs(service, options),
  915. implementation);
  916. } else if (protobuf_js_6_common.isProbablyProtobufJs6(service)) {
  917. options = _.defaults(service.grpc_options, common.defaultGrpcOptions);
  918. this.addService(
  919. protobuf_js_6_common.getProtobufServiceAttrs(service, options),
  920. implementation);
  921. } else {
  922. // We assume that this is a service attributes object
  923. this.addService(service, implementation);
  924. }
  925. };
  /**
   * Binds the server to the given port. SSL is disabled when creds is an
   * insecure credentials object. Binding is only allowed before the server
   * has been started.
   * @param {string} port The port that the server should bind on, in the format
   *     "address:port"
   * @param {grpc.ServerCredentials} creds Server credential object to be used for
   *     SSL. Pass an insecure credentials object for an insecure port.
   * @return {*} Whatever the underlying addHttp2Port call returns
   * @throws {Error} If the server has already been started
   */
  Server.prototype.bind = function(port, creds) {
    if (!this.started) {
      return this._server.addHttp2Port(port, creds);
    }
    throw new Error('Can\'t bind an already running server to an address');
  };
  940. exports.Server = Server;
  941. </code></pre>
  942. </article>
  943. </section>
  944. </div>
  945. <nav>
  946. <h2><a href="index.html">Home</a></h2><h3>Externals</h3><ul><li><a href="external-Duplex.html">Duplex</a></li><li><a href="external-EventEmitter.html">EventEmitter</a></li><li><a href="external-GoogleCredential.html">GoogleCredential</a></li><li><a href="external-Readable.html">Readable</a></li><li><a href="external-Writable.html">Writable</a></li></ul><h3>Classes</h3><ul><li><a href="grpc.Client.html">Client</a></li><li><a href="grpc.credentials-CallCredentials.html">CallCredentials</a></li><li><a href="grpc.credentials-ChannelCredentials.html">ChannelCredentials</a></li><li><a href="grpc.Metadata.html">Metadata</a></li><li><a href="grpc.Server.html">Server</a></li><li><a href="grpc.ServerCredentials.html">ServerCredentials</a></li><li><a href="grpc-ClientDuplexStream.html">ClientDuplexStream</a></li><li><a href="grpc-ClientReadableStream.html">ClientReadableStream</a></li><li><a href="grpc-ClientUnaryCall.html">ClientUnaryCall</a></li><li><a href="grpc-ClientWritableStream.html">ClientWritableStream</a></li><li><a href="grpc-ServerDuplexStream.html">ServerDuplexStream</a></li><li><a href="grpc-ServerReadableStream.html">ServerReadableStream</a></li><li><a href="grpc-ServerUnaryCall.html">ServerUnaryCall</a></li><li><a href="grpc-ServerWritableStream.html">ServerWritableStream</a></li></ul><h3>Events</h3><ul><li><a href="grpc-ClientDuplexStream.html#event:metadata">metadata</a></li><li><a href="grpc-ClientDuplexStream.html#event:status">status</a></li><li><a href="grpc-ClientReadableStream.html#event:metadata">metadata</a></li><li><a href="grpc-ClientReadableStream.html#event:status">status</a></li><li><a href="grpc-ClientUnaryCall.html#event:metadata">metadata</a></li><li><a href="grpc-ClientUnaryCall.html#event:status">status</a></li><li><a href="grpc-ClientWritableStream.html#event:metadata">metadata</a></li><li><a href="grpc-ClientWritableStream.html#event:status">status</a></li><li><a 
href="grpc-ServerDuplexStream.html#~event:cancelled">cancelled</a></li><li><a href="grpc-ServerReadableStream.html#~event:cancelled">cancelled</a></li><li><a href="grpc-ServerUnaryCall.html#~event:cancelled">cancelled</a></li><li><a href="grpc-ServerWritableStream.html#~event:cancelled">cancelled</a></li></ul><h3>Namespaces</h3><ul><li><a href="grpc.html">grpc</a></li><li><a href="grpc.credentials.html">credentials</a></li></ul>
  947. </nav>
  948. <br class="clear">
  949. <footer>
  950. Documentation generated by <a href="https://github.com/jsdoc3/jsdoc">JSDoc 3.4.3</a> on Wed Jun 28 2017 09:44:06 GMT-0700 (PDT)
  951. </footer>
  952. <script> prettyPrint(); </script>
  953. <script src="scripts/linenumber.js"> </script>
  954. </body>
  955. </html>