src_server.js.html 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865
  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="utf-8">
  5. <title>JSDoc: Source: src/server.js</title>
  6. <script src="scripts/prettify/prettify.js"> </script>
  7. <script src="scripts/prettify/lang-css.js"> </script>
  8. <!--[if lt IE 9]>
  9. <script src="//html5shiv.googlecode.com/svn/trunk/html5.js"></script>
  10. <![endif]-->
  11. <link type="text/css" rel="stylesheet" href="styles/prettify-tomorrow.css">
  12. <link type="text/css" rel="stylesheet" href="styles/jsdoc-default.css">
  13. </head>
  14. <body>
  15. <div id="main">
  16. <h1 class="page-title">Source: src/server.js</h1>
  17. <section>
  18. <article>
  19. <pre class="prettyprint source linenums"><code>/*
  20. *
  21. * Copyright 2015, Google Inc.
  22. * All rights reserved.
  23. *
  24. * Redistribution and use in source and binary forms, with or without
  25. * modification, are permitted provided that the following conditions are
  26. * met:
  27. *
  28. * * Redistributions of source code must retain the above copyright
  29. * notice, this list of conditions and the following disclaimer.
  30. * * Redistributions in binary form must reproduce the above
  31. * copyright notice, this list of conditions and the following disclaimer
  32. * in the documentation and/or other materials provided with the
  33. * distribution.
  34. * * Neither the name of Google Inc. nor the names of its
  35. * contributors may be used to endorse or promote products derived from
  36. * this software without specific prior written permission.
  37. *
  38. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  39. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  40. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  41. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  42. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  43. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  44. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  45. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  46. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  47. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  48. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  49. *
  50. */
  51. /**
  52. * Server module
  53. *
  54. * This module contains all the server code for Node gRPC: both the Server
  55. * class itself and the method handler code for all types of methods.
  56. *
  57. * For example, to create a Server, add a service, and start it:
  58. *
  59. * var server = new server_module.Server();
  60. * server.addProtoService(protobuf_service_descriptor, service_implementation);
  61. * server.bind('address:port', server_credential);
  62. * server.start();
  63. *
  64. * @module
  65. */
  66. 'use strict';
  67. var _ = require('lodash');
  68. var grpc = require('./grpc_extension');
  69. var common = require('./common');
  70. var Metadata = require('./metadata');
  71. var stream = require('stream');
  72. var Readable = stream.Readable;
  73. var Writable = stream.Writable;
  74. var Duplex = stream.Duplex;
  75. var util = require('util');
  76. var EventEmitter = require('events').EventEmitter;
  /**
   * Handle an error on a call by sending it as a status.
   *
   * The status starts as grpc.status.UNKNOWN with details 'Unknown Error' and is
   * refined from the error's own properties:
   *  - `message` replaces the details;
   *  - an integer `code` replaces the code and, only then, `details` replaces
   *    the details;
   *  - `metadata` (an object exposing _getCoreRepresentation) is sent as the
   *    trailing metadata.
   * If the initial metadata has not been sent yet, an empty set is sent in the
   * same batch and the call is marked accordingly.
   * @access private
   * @param {grpc.Call} call The call to send the error on
   * @param {Object} error The error object
   */
  function handleError(call, error) {
    var hasOwn = Object.prototype.hasOwnProperty;
    // Anything that cannot carry properties (null, undefined, primitives) is
    // reported with the default UNKNOWN status instead of throwing here.
    var isObjectLike = error !== null &&
        (typeof error === 'object' || typeof error === 'function');
    var source = isObjectLike ? error : {};
    var statusMetadata = new Metadata();
    var status = {
      code: grpc.status.UNKNOWN,
      details: 'Unknown Error'
    };
    if (hasOwn.call(source, 'message')) {
      status.details = source.message;
    }
    var code = source.code;
    // Only integer codes are forwarded: Node system errors carry string codes
    // such as 'ENOENT', which are not gRPC status codes.
    if (hasOwn.call(source, 'code') && typeof code === 'number' &&
        isFinite(code) && Math.floor(code) === code) {
      status.code = code;
      if (hasOwn.call(source, 'details')) {
        status.details = source.details;
      }
    }
    if (hasOwn.call(source, 'metadata') && source.metadata &&
        typeof source.metadata._getCoreRepresentation === 'function') {
      statusMetadata = source.metadata;
    }
    status.metadata = statusMetadata._getCoreRepresentation();
    var error_batch = {};
    if (!call.metadataSent) {
      error_batch[grpc.opType.SEND_INITIAL_METADATA] =
          (new Metadata())._getCoreRepresentation();
      // Keep the flag in step with the other senders in this file so that a
      // later sendMetadata() does not try to send initial metadata again.
      call.metadataSent = true;
    }
    error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
    call.startBatch(error_batch, function(){});
  }
  /**
   * Reply to a unary or client streaming call with a single message followed by
   * an OK status. If serialization throws, the exception is tagged with
   * grpc.status.INTERNAL and reported through handleError instead.
   * @access private
   * @param {grpc.Call} call The call to respond on
   * @param {*} value The value to respond with
   * @param {function(*):Buffer=} serialize Serialization function for the
   *     response
   * @param {Metadata=} metadata Optional trailing metadata to send with status
   * @param {number=} flags Flags for modifying how the message is sent; stored
   *     on the serialized message as grpcWriteFlags.
   */
  function sendUnaryResponse(call, value, serialize, metadata, flags) {
    var trailers = new Metadata();
    if (metadata) {
      trailers = metadata;
    }
    var payload;
    try {
      payload = serialize(value);
    } catch (serializeError) {
      serializeError.code = grpc.status.INTERNAL;
      handleError(call, serializeError);
      return;
    }
    var okStatus = {
      code: grpc.status.OK,
      details: 'OK',
      metadata: trailers._getCoreRepresentation()
    };
    var ops = {};
    if (!call.metadataSent) {
      // Nothing has been sent yet, so piggy-back empty initial metadata.
      ops[grpc.opType.SEND_INITIAL_METADATA] =
          (new Metadata())._getCoreRepresentation();
      call.metadataSent = true;
    }
    payload.grpcWriteFlags = flags;
    ops[grpc.opType.SEND_MESSAGE] = payload;
    ops[grpc.opType.SEND_STATUS_FROM_SERVER] = okStatus;
    call.startBatch(ops, function() {});
  }
  /**
   * Prepare the writable half of a server stream. Shared by the writable and
   * duplex stream constructors.
   *
   * Installs a pending OK status, the (null-tolerant) serializer, a 'finish'
   * listener that sends the pending status, an 'error' listener that turns the
   * error into the pending status and ends the stream, and an end() override
   * that accepts trailing metadata.
   * @access private
   * @param {Writable} stream The stream to set up
   * @param {function(*):Buffer=} serialize Serialization function for responses
   */
  function setUpWritable(stream, serialize) {
    stream.finished = false;
    stream.status = {
      code: grpc.status.OK,
      details: 'OK',
      metadata: new Metadata()
    };
    stream.serialize = common.wrapIgnoreNull(serialize);

    /* Once all writes have been flushed, send whatever status is pending,
     * together with empty initial metadata if none was sent before. */
    stream.on('finish', function sendStatus() {
      var ops = {};
      var call = stream.call;
      if (!call.metadataSent) {
        call.metadataSent = true;
        ops[grpc.opType.SEND_INITIAL_METADATA] =
            (new Metadata())._getCoreRepresentation();
      }
      var pending = stream.status;
      if (pending.metadata) {
        pending.metadata = pending.metadata._getCoreRepresentation();
      }
      ops[grpc.opType.SEND_STATUS_FROM_SERVER] = pending;
      call.startBatch(ops, function() {});
    });

    /**
     * Terminate the call: record the error as the pending status and end the
     * stream so that the 'finish' listener sends it. Without own code/details
     * properties the status is grpc.status.UNKNOWN / 'Unknown Error'.
     * @param {Error} err The error object
     */
    stream.on('error', function terminateCall(err) {
      var pending = {
        code: grpc.status.UNKNOWN,
        details: 'Unknown Error',
        metadata: new Metadata()
      };
      if (err.hasOwnProperty('message')) {
        pending.details = err.message;
      }
      if (err.hasOwnProperty('code')) {
        pending.code = err.code;
        if (err.hasOwnProperty('details')) {
          pending.details = err.details;
        }
      }
      if (err.hasOwnProperty('metadata')) {
        pending.metadata = err.metadata;
      }
      stream.status = pending;
      stream.end();
    });

    /**
     * Override of Writable#end that optionally replaces the trailing metadata
     * of the pending status before ending the stream.
     * @param {Metadata=} metadata Metadata to send with the status
     */
    stream.end = function(metadata) {
      if (metadata) {
        stream.status.metadata = metadata;
      }
      Writable.prototype.end.call(this);
    };
  }
  /**
   * Initialize a readable stream. This is used for both the readable and duplex
   * stream constructors.
   *
   * Installs the (null-tolerant) deserializer, the `finished` and `reading`
   * flags, a `terminate()` method, and a 'cancelled' listener that calls it.
   * @access private
   * @param {Readable} stream The stream to initialize
   * @param {function(Buffer):*=} deserialize Deserialization function for
   *     incoming data.
   */
  function setUpReadable(stream, deserialize) {
    stream.deserialize = common.wrapIgnoreNull(deserialize);
    stream.finished = false;
    stream.reading = false;
    /**
     * Mark the stream as finished and attach a no-op 'data' listener so that
     * any remaining data is consumed. Safe to call more than once: terminate()
     * is reached from several paths (cancellation, read errors, completion of
     * a client streaming handler), and only the first call adds the listener.
     */
    stream.terminate = function() {
      if (stream.finished) {
        return;
      }
      stream.finished = true;
      stream.on('data', function() {});
    };
    stream.on('cancelled', function() {
      stream.terminate();
    });
  }
  /**
   * Event emitter handed to unary method handlers; it carries the underlying
   * call object.
   * @constructor
   * @param {grpc.Call} call The call object this emitter wraps
   */
  function ServerUnaryCall(call) {
    EventEmitter.call(this);
    this.call = call;
  }
  util.inherits(ServerUnaryCall, EventEmitter);
  /**
   * Object-mode writable stream used for calls that stream from the server
   * side.
   * @constructor
   * @param {grpc.Call} call The call object to send data with
   * @param {function(*):Buffer=} serialize Serialization function for writes
   */
  function ServerWritableStream(call, serialize) {
    var streamOptions = {objectMode: true};
    Writable.call(this, streamOptions);
    this.call = call;
    this.finished = false;
    setUpWritable(this, serialize);
  }
  util.inherits(ServerWritableStream, Writable);
  /**
   * stream.Writable implementation hook: serialize one chunk and send it on the
   * call, together with empty initial metadata if none has been sent yet.
   *
   * A serialization failure is tagged with grpc.status.INTERNAL and handed to
   * the callback; a failed batch is emitted as an 'error' event instead.
   * @access private
   * @param {Buffer} chunk The chunk of data to write
   * @param {string} encoding Used to pass write flags
   * @param {function(Error=)} callback Callback to indicate that the write is
   *     complete
   */
  function _write(chunk, encoding, callback) {
    /* jshint validthis: true */
    var writer = this;
    var payload;
    try {
      payload = writer.serialize(chunk);
    } catch (serializeError) {
      serializeError.code = grpc.status.INTERNAL;
      callback(serializeError);
      return;
    }
    var ops = {};
    var call = writer.call;
    if (!call.metadataSent) {
      ops[grpc.opType.SEND_INITIAL_METADATA] =
          (new Metadata())._getCoreRepresentation();
      call.metadataSent = true;
    }
    /* A finite number in the encoding slot is treated as write flags; that is
     * the closest available check that the flags are valid. */
    if (_.isFinite(encoding)) {
      payload.grpcWriteFlags = encoding;
    }
    ops[grpc.opType.SEND_MESSAGE] = payload;
    call.startBatch(ops, function(err, value) {
      if (!err) {
        callback();
        return;
      }
      writer.emit('error', err);
    });
  }
  ServerWritableStream.prototype._write = _write;
  /**
   * Object-mode readable stream used for calls that stream from the client
   * side.
   * @constructor
   * @param {grpc.Call} call The call object to read data with
   * @param {function(Buffer):*=} deserialize Deserialization function for reads
   */
  function ServerReadableStream(call, deserialize) {
    var streamOptions = {objectMode: true};
    Readable.call(this, streamOptions);
    this.call = call;
    setUpReadable(this, deserialize);
  }
  util.inherits(ServerReadableStream, Readable);
  /**
   * stream.Readable implementation hook: start pulling messages from the call
   * unless a pull is already in progress. A finished stream is ended by
   * pushing null.
   * @access private
   * @param {number} size Ignored
   */
  function _read(size) {
    /* jshint validthis: true */
    var reader = this;

    /** Ask the call for the next message; onMessage receives the outcome. */
    function requestMessage() {
      var ops = {};
      ops[grpc.opType.RECV_MESSAGE] = true;
      reader.call.startBatch(ops, onMessage);
    }

    /**
     * Handle the outcome of one receive batch: push the deserialized message
     * and keep reading while push() accepts more and the message was not null.
     * @param {Error=} err Batch error, if any
     * @param {grpc.Event} event READ event object
     */
    function onMessage(err, event) {
      if (err) {
        reader.terminate();
        return;
      }
      if (reader.finished) {
        reader.push(null);
        return;
      }
      var raw = event.read;
      var decoded;
      try {
        decoded = reader.deserialize(raw);
      } catch (deserializeError) {
        deserializeError.code = grpc.status.INTERNAL;
        reader.emit('error', deserializeError);
        return;
      }
      if (reader.push(decoded) && raw !== null) {
        requestMessage();
      } else {
        reader.reading = false;
      }
    }

    if (reader.finished) {
      reader.push(null);
      return;
    }
    if (!reader.reading) {
      reader.reading = true;
      requestMessage();
    }
  }
  ServerReadableStream.prototype._read = _read;
  /**
   * Object-mode duplex stream used for bidirectional streaming calls. It
   * shares its _read and _write implementations with the readable and
   * writable server streams.
   * @constructor
   * @param {grpc.Call} call Call object to proxy
   * @param {function(*):Buffer=} serialize Serialization function for requests
   * @param {function(Buffer):*=} deserialize Deserialization function for
   *     responses
   */
  function ServerDuplexStream(call, serialize, deserialize) {
    var streamOptions = {objectMode: true};
    Duplex.call(this, streamOptions);
    this.call = call;
    setUpWritable(this, serialize);
    setUpReadable(this, deserialize);
  }
  util.inherits(ServerDuplexStream, Duplex);
  ServerDuplexStream.prototype._read = _read;
  ServerDuplexStream.prototype._write = _write;
  /**
   * Send the initial metadata for a call, unless it has already been sent. A
   * failed batch is reported as an 'error' event on the call/stream object.
   * @param {Metadata} responseMetadata Metadata to send
   */
  function sendMetadata(responseMetadata) {
    /* jshint validthis: true */
    var owner = this;
    var call = owner.call;
    if (call.metadataSent) {
      return;
    }
    call.metadataSent = true;
    var ops = {};
    ops[grpc.opType.SEND_INITIAL_METADATA] =
        responseMetadata._getCoreRepresentation();
    call.startBatch(ops, function(err) {
      if (err) {
        owner.emit('error', err);
      }
    });
  }
  [
    ServerUnaryCall,
    ServerWritableStream,
    ServerReadableStream,
    ServerDuplexStream
  ].forEach(function(callType) {
    callType.prototype.sendMetadata = sendMetadata;
  });
  /**
   * Report the peer of this call/stream, as given by the underlying call
   * object's getPeer().
   * @return {string} The URI of the endpoint
   */
  function getPeer() {
    /* jshint validthis: true */
    var call = this.call;
    return call.getPeer();
  }
  [
    ServerUnaryCall,
    ServerReadableStream,
    ServerWritableStream,
    ServerDuplexStream
  ].forEach(function(callType) {
    callType.prototype.getPeer = getPeer;
  });
  /**
   * Wait for the client to close, then emit a 'cancelled' event (and set the
   * `cancelled` flag) if the client cancelled. A failed batch is reported as
   * an 'error' event.
   */
  function waitForCancel() {
    /* jshint validthis: true */
    var self = this;
    var cancel_batch = {};
    cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
    self.call.startBatch(cancel_batch, function(err, result) {
      if (err) {
        self.emit('error', err);
      }
      // A failed batch may not deliver a result; only inspect it when present
      // so that reporting the error is not followed by a TypeError.
      if (result && result.cancelled) {
        self.cancelled = true;
        self.emit('cancelled');
      }
    });
  }
  ServerUnaryCall.prototype.waitForCancel = waitForCancel;
  ServerReadableStream.prototype.waitForCancel = waitForCancel;
  ServerWritableStream.prototype.waitForCancel = waitForCancel;
  ServerDuplexStream.prototype.waitForCancel = waitForCancel;
  /**
   * Fully handle a unary call: receive and deserialize the single request,
   * invoke the method handler, and send back either its response or its error.
   * The handler is not invoked when the call is already flagged as cancelled.
   * @access private
   * @param {grpc.Call} call The call to handle
   * @param {Object} handler Request handler object for the method that was
   *     called
   * @param {Metadata} metadata Metadata from the client
   */
  function handleUnary(call, handler, metadata) {
    var unaryCall = new ServerUnaryCall(call);
    unaryCall.on('error', function(error) {
      handleError(call, error);
    });
    unaryCall.metadata = metadata;
    unaryCall.waitForCancel();

    /* Completion callback handed to the method implementation. */
    function sendUnaryData(err, value, trailer, flags) {
      if (!err) {
        sendUnaryResponse(call, value, handler.serialize, trailer, flags);
        return;
      }
      if (trailer) {
        err.metadata = trailer;
      }
      handleError(call, err);
    }

    var recvOps = {};
    recvOps[grpc.opType.RECV_MESSAGE] = true;
    call.startBatch(recvOps, function(err, result) {
      if (err) {
        handleError(call, err);
        return;
      }
      try {
        unaryCall.request = handler.deserialize(result.read);
      } catch (deserializeError) {
        deserializeError.code = grpc.status.INTERNAL;
        handleError(call, deserializeError);
        return;
      }
      if (!unaryCall.cancelled) {
        handler.func(unaryCall, sendUnaryData);
      }
    });
  }
  /**
   * Fully handle a server streaming call: receive and deserialize the single
   * request, then pass a writable response stream to the method handler.
   * Receive and deserialization failures are emitted as 'error' on the stream.
   * @access private
   * @param {grpc.Call} call The call to handle
   * @param {Object} handler Request handler object for the method that was
   *     called
   * @param {Metadata} metadata Metadata from the client
   */
  function handleServerStreaming(call, handler, metadata) {
    var responseStream = new ServerWritableStream(call, handler.serialize);
    responseStream.waitForCancel();
    responseStream.metadata = metadata;
    var recvOps = {};
    recvOps[grpc.opType.RECV_MESSAGE] = true;
    call.startBatch(recvOps, function(err, result) {
      if (err) {
        responseStream.emit('error', err);
        return;
      }
      var request;
      try {
        request = handler.deserialize(result.read);
      } catch (deserializeError) {
        deserializeError.code = grpc.status.INTERNAL;
        responseStream.emit('error', deserializeError);
        return;
      }
      responseStream.request = request;
      handler.func(responseStream);
    });
  }
  /**
   * Fully handle a client streaming call: pass a readable request stream to the
   * method handler and, once the handler calls back, terminate the stream and
   * send either the response or the error.
   * @access private
   * @param {grpc.Call} call The call to handle
   * @param {Object} handler Request handler object for the method that was
   *     called
   * @param {Metadata} metadata Metadata from the client
   */
  function handleClientStreaming(call, handler, metadata) {
    var requestStream = new ServerReadableStream(call, handler.deserialize);
    requestStream.on('error', function(error) {
      handleError(call, error);
    });
    requestStream.waitForCancel();
    requestStream.metadata = metadata;
    handler.func(requestStream, function(err, value, trailer, flags) {
      requestStream.terminate();
      if (!err) {
        sendUnaryResponse(call, value, handler.serialize, trailer, flags);
        return;
      }
      if (trailer) {
        err.metadata = trailer;
      }
      handleError(call, err);
    });
  }
  /**
   * Fully handle a bidirectional streaming call by handing a duplex stream,
   * tagged with the client metadata, to the method handler.
   * @access private
   * @param {grpc.Call} call The call to handle
   * @param {Object} handler Request handler object for the method that was
   *     called
   * @param {Metadata} metadata Metadata from the client
   */
  function handleBidiStreaming(call, handler, metadata) {
    var serialize = handler.serialize;
    var deserialize = handler.deserialize;
    var duplex = new ServerDuplexStream(call, serialize, deserialize);
    duplex.waitForCancel();
    duplex.metadata = metadata;
    handler.func(duplex);
  }
  /* Dispatch table from a handler's streaming type to the function that drives
   * a call of that type. */
  var streamHandlers = {};
  streamHandlers.unary = handleUnary;
  streamHandlers.server_stream = handleServerStreaming;
  streamHandlers.client_stream = handleClientStreaming;
  streamHandlers.bidi = handleBidiStreaming;
  /**
   * Constructs a server object that stores request handlers and delegates
   * incoming requests to those handlers.
   * @constructor
   * @param {Object=} options Options that should be passed to the internal
   *     server implementation
   */
  function Server(options) {
    this.handlers = {};
    var handlers = this.handlers;
    var server = new grpc.Server(options);
    this._server = server;
    this.started = false;
    /**
     * Start the server and begin handling requests
     * @this Server
     * @throws {Error} If the server has already been started
     */
    this.start = function() {
      if (this.started) {
        throw new Error('Server is already running');
      }
      this.started = true;
      server.start();
      /**
       * Handles the SERVER_RPC_NEW event. If there is a handler associated with
       * the requested method, use that handler to respond to the request;
       * otherwise answer with an UNIMPLEMENTED status. In both cases the next
       * call is requested first. An error or a null method stops the loop.
       * @param {Error=} err Error from the request, if any
       * @param {grpc.Event} event The event to handle with tag SERVER_RPC_NEW
       */
      function handleNewCall(err, event) {
        if (err) {
          return;
        }
        var details = event.new_call;
        var method = details.method;
        // Check for the "no method" event before touching its other fields:
        // there is nothing to parse or dispatch for it.
        if (method === null) {
          return;
        }
        var call = details.call;
        var metadata = Metadata._fromCoreRepresentation(details.metadata);
        server.requestCall(handleNewCall);
        // The method name comes from the client, so look it up without relying
        // on a property of the handler table itself.
        if (!Object.prototype.hasOwnProperty.call(handlers, method)) {
          var batch = {};
          batch[grpc.opType.SEND_INITIAL_METADATA] =
              (new Metadata())._getCoreRepresentation();
          batch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
            code: grpc.status.UNIMPLEMENTED,
            details: 'RPC method not implemented ' + method,
            metadata: (new Metadata())._getCoreRepresentation()
          };
          batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
          call.startBatch(batch, function() {});
          return;
        }
        var handler = handlers[method];
        streamHandlers[handler.type](call, handler, metadata);
      }
      server.requestCall(handleNewCall);
    };
    /**
     * Gracefully shuts down the server. The server will stop receiving new
     * calls, and any pending calls will complete. The callback will be called
     * when all pending calls have completed and the server is fully shut down.
     * This method is idempotent with itself and forceShutdown.
     * @param {function()} callback The shutdown complete callback
     */
    this.tryShutdown = function(callback) {
      server.tryShutdown(callback);
    };
    /**
     * Forcibly shuts down the server. The server will stop receiving new calls
     * and cancel all pending calls. When it returns, the server has shut down.
     * This method is idempotent with itself and tryShutdown, and it will
     * trigger any outstanding tryShutdown callbacks.
     */
    this.forceShutdown = function() {
      server.forceShutdown();
    };
  }
  /**
   * Registers a handler to handle the named method. Fails if there already is
   * a handler for the given method. Returns true on success
   * @param {string} name The name of the method that the provided function
   *     should handle/respond to.
   * @param {function} handler Function that takes a stream of request values
   *     and returns a stream of response values
   * @param {function(*):Buffer} serialize Serialization function for responses
   * @param {function(Buffer):*} deserialize Deserialization function for
   *     requests
   * @param {string} type The streaming type of method that this handles
   * @return {boolean} True if the handler was set. False if a handler was
   *     already set for that name.
   */
  Server.prototype.register = function(name, handler, serialize, deserialize,
                                       type) {
    // Call hasOwnProperty from Object.prototype: the handler table is keyed by
    // arbitrary method names, so one of them may shadow the table's own
    // hasOwnProperty.
    if (Object.prototype.hasOwnProperty.call(this.handlers, name)) {
      return false;
    }
    this.handlers[name] = {
      func: handler,
      serialize: serialize,
      deserialize: deserialize,
      type: type
    };
    return true;
  };
  /* Status reported by the fallback handlers below, which stand in for service
   * methods that have no implementation. */
  var unimplementedStatusResponse = {
    code: grpc.status.UNIMPLEMENTED,
    details: 'The server does not implement this method'
  };
  /* Fallback for method types that answer through a callback. */
  function rejectThroughCallback(call, callback) {
    callback(unimplementedStatusResponse);
  }
  /* Fallback for method types that answer through a response stream. */
  function rejectThroughStream(call) {
    call.emit('error', unimplementedStatusResponse);
  }
  var defaultHandler = {
    unary: rejectThroughCallback,
    client_stream: rejectThroughCallback,
    server_stream: rejectThroughStream,
    bidi: rejectThroughStream
  };
  /**
   * Add a service to the server, with a corresponding implementation. If you
   * are generating this from a proto file, you should instead use
   * addProtoService.
   *
   * Each method is registered under its path with a streaming type derived
   * from its requestStream/responseStream flags. The implementation is looked
   * up by method name, then by the method's originalName; when neither exists
   * an error is logged and a default handler is registered.
   * @param {Object<String, *>} service The service descriptor, as
   *     {@link module:src/common.getProtobufServiceAttrs} returns
   * @param {Object<String, function>} implementation Map of method names to
   *     method implementation for the provided service.
   * @throws {Error} If an argument is not an object, the service is empty, the
   *     server has been started, or a method path is already registered
   */
  Server.prototype.addService = function(service, implementation) {
    var argumentsAreObjects = _.isObject(service) && _.isObject(implementation);
    if (!argumentsAreObjects) {
      throw new Error('addService requires two objects as arguments');
    }
    if (_.keys(service).length === 0) {
      throw new Error('Cannot add an empty service to a server');
    }
    if (this.started) {
      throw new Error('Can\'t add a service to a started server.');
    }
    var self = this;
    _.forOwn(service, function(attrs, name) {
      var method_type;
      if (attrs.requestStream) {
        method_type = attrs.responseStream ? 'bidi' : 'client_stream';
      } else {
        method_type = attrs.responseStream ? 'server_stream' : 'unary';
      }
      var provided = implementation[name];
      if (provided === undefined) {
        /* Accept an implementation keyed by the name exactly as written in
           the proto file instead of the JavaScript-style name. */
        provided = implementation[attrs.originalName];
      }
      var impl;
      if (provided === undefined) {
        common.log(grpc.logVerbosity.ERROR, 'Method handler ' + name + ' for ' +
            attrs.path + ' expected but not provided');
        impl = defaultHandler[method_type];
      } else {
        impl = _.bind(provided, implementation);
      }
      var registered = self.register(attrs.path, impl, attrs.responseSerialize,
                                     attrs.requestDeserialize, method_type);
      if (!registered) {
        throw new Error('Method handler for ' + attrs.path +
            ' already provided.');
      }
    });
  };
  /**
   * Add a proto service to the server, with a corresponding implementation.
   * The descriptor is converted with common.getProtobufServiceAttrs, passing
   * along the service's grpc_options when it has any, and then handed to
   * addService.
   * @param {Protobuf.Reflect.Service} service The proto service descriptor
   * @param {Object<String, function>} implementation Map of method names to
   *     method implementation for the provided service.
   */
  Server.prototype.addProtoService = function(service, implementation) {
    var options = service.grpc_options ? service.grpc_options : undefined;
    var attrs = common.getProtobufServiceAttrs(service, options);
    this.addService(attrs, implementation);
  };
  /**
   * Binds the server to the given port, with SSL enabled if creds is given.
   * Only allowed before the server has been started.
   * @param {string} port The port that the server should bind on, in the format
   *     "address:port"
   * @param {ServerCredentials=} creds Server credential object to be used for
   *     SSL. Pass an insecure credentials object for an insecure port.
   * @return {*} Whatever the internal server's addHttp2Port returns
   * @throws {Error} If the server is already running
   */
  Server.prototype.bind = function(port, creds) {
    if (!this.started) {
      return this._server.addHttp2Port(port, creds);
    }
    throw new Error('Can\'t bind an already running server to an address');
  };
  778. /**
  779. * @see module:src/server~Server
  780. */
  781. exports.Server = Server;
  782. </code></pre>
  783. </article>
  784. </section>
  785. </div>
  786. <nav>
  787. <h2><a href="index.html">Home</a></h2><h3>Modules</h3><ul><li><a href="module-src_client.html">src/client</a></li><li><a href="module-src_common.html">src/common</a></li><li><a href="module-src_credentials.html">src/credentials</a></li><li><a href="module-src_metadata.html">src/metadata</a></li><li><a href="module-src_server.html">src/server</a></li></ul><h3>Classes</h3><ul><li><a href="module-src_client.makeClientConstructor-Client.html">Client</a></li><li><a href="module-src_client-ClientDuplexStream.html">ClientDuplexStream</a></li><li><a href="module-src_client-ClientReadableStream.html">ClientReadableStream</a></li><li><a href="module-src_client-ClientWritableStream.html">ClientWritableStream</a></li><li><a href="module-src_metadata-Metadata.html">Metadata</a></li><li><a href="module-src_server-Server.html">Server</a></li><li><a href="module-src_server-ServerDuplexStream.html">ServerDuplexStream</a></li><li><a href="module-src_server-ServerReadableStream.html">ServerReadableStream</a></li><li><a href="module-src_server-ServerWritableStream.html">ServerWritableStream</a></li></ul><h3>Global</h3><ul><li><a href="global.html#callError">callError</a></li><li><a href="global.html#credentials">credentials</a></li><li><a href="global.html#getClientChannel">getClientChannel</a></li><li><a href="global.html#load">load</a></li><li><a href="global.html#loadObject">loadObject</a></li><li><a href="global.html#logVerbosity">logVerbosity</a></li><li><a href="global.html#makeGenericClientConstructor">makeGenericClientConstructor</a></li><li><a href="global.html#Metadata">Metadata</a></li><li><a href="global.html#propagate">propagate</a></li><li><a href="global.html#Server">Server</a></li><li><a href="global.html#ServerCredentials">ServerCredentials</a></li><li><a href="global.html#setLogger">setLogger</a></li><li><a href="global.html#setLogVerbosity">setLogVerbosity</a></li><li><a href="global.html#status">status</a></li><li><a 
href="global.html#waitForClientReady">waitForClientReady</a></li><li><a href="global.html#writeFlags">writeFlags</a></li></ul>
  788. </nav>
  789. <br class="clear">
  790. <footer>
  791. Documentation generated by <a href="https://github.com/jsdoc3/jsdoc">JSDoc 3.4.3</a> on Tue Mar 21 2017 11:29:55 GMT-0700 (PDT)
  792. </footer>
  793. <script> prettyPrint(); </script>
  794. <script src="scripts/linenumber.js"> </script>
  795. </body>
  796. </html>