src_server.js.html 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016
  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="utf-8">
  5. <title>JSDoc: Source: src/server.js</title>
  6. <script src="scripts/prettify/prettify.js"> </script>
  7. <script src="scripts/prettify/lang-css.js"> </script>
  8. <!--[if lt IE 9]>
  9. <script src="//html5shiv.googlecode.com/svn/trunk/html5.js"></script>
  10. <![endif]-->
  11. <link type="text/css" rel="stylesheet" href="styles/prettify-tomorrow.css">
  12. <link type="text/css" rel="stylesheet" href="styles/jsdoc-default.css">
  13. </head>
  14. <body>
  15. <div id="main">
  16. <h1 class="page-title">Source: src/server.js</h1>
  17. <section>
  18. <article>
  19. <pre class="prettyprint source linenums"><code>/**
  20. * @license
  21. * Copyright 2015 gRPC authors.
  22. *
  23. * Licensed under the Apache License, Version 2.0 (the "License");
  24. * you may not use this file except in compliance with the License.
  25. * You may obtain a copy of the License at
  26. *
  27. * http://www.apache.org/licenses/LICENSE-2.0
  28. *
  29. * Unless required by applicable law or agreed to in writing, software
  30. * distributed under the License is distributed on an "AS IS" BASIS,
  31. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  32. * See the License for the specific language governing permissions and
  33. * limitations under the License.
  34. *
  35. */
  36. 'use strict';
  37. var _ = require('lodash');
  38. var grpc = require('./grpc_extension');
  39. var common = require('./common');
  40. var Metadata = require('./metadata');
  41. var constants = require('./constants');
  42. var stream = require('stream');
  43. var Readable = stream.Readable;
  44. var Writable = stream.Writable;
  45. var Duplex = stream.Duplex;
  46. var util = require('util');
  47. var EventEmitter = require('events').EventEmitter;
  /**
   * Handle an error on a call by sending it to the client as a status.
   *
   * The status starts as code `constants.status.UNKNOWN` with details
   * 'Unknown Error'. An own `message` property replaces the details. An own
   * `code` property is honored only when it is an integer: errors that were
   * not created for gRPC (for example Node system errors, whose `code` is a
   * string such as 'ENOENT') must not leak a non-numeric value into the status
   * code. When the code is honored, an own `details` property takes precedence
   * over `message`. An own `metadata` property is sent as trailing metadata.
   * @private
   * @param {grpc.internal~Call} call The call to send the error on
   * @param {(Object|Error)} error The error object
   */
  function handleError(call, error) {
    var statusMetadata = new Metadata();
    var status = {
      code: constants.status.UNKNOWN,
      details: 'Unknown Error'
    };
    if (error.hasOwnProperty('message')) {
      status.details = error.message;
    }
    // Only integer codes are treated as status codes; anything else keeps the
    // UNKNOWN default (and therefore keeps `message` as the details).
    if (error.hasOwnProperty('code') && Number.isInteger(error.code)) {
      status.code = error.code;
      if (error.hasOwnProperty('details')) {
        status.details = error.details;
      }
    }
    if (error.hasOwnProperty('metadata')) {
      statusMetadata = error.metadata;
    }
    status.metadata = statusMetadata._getCoreRepresentation();
    var error_batch = {};
    // Initial metadata has to go out with the status if nothing sent it yet.
    if (!call.metadataSent) {
      error_batch[grpc.opType.SEND_INITIAL_METADATA] =
          (new Metadata())._getCoreRepresentation();
    }
    error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
    call.startBatch(error_batch, function(){});
  }
  /**
   * Respond to a unary or client streaming call with a single message followed
   * by an OK status. If serializing the value throws, the thrown error is
   * tagged with the INTERNAL status code and reported through handleError
   * instead, and nothing else is sent.
   * @private
   * @param {grpc.Call} call The call to respond on
   * @param {*} value The value to respond with
   * @param {grpc~serialize} serialize Serialization function for the
   *     response
   * @param {grpc.Metadata=} metadata Optional trailing metadata to send with
   *     status
   * @param {number=} flags Flags for modifying how the message is sent. The
   *     value is attached to the serialized message as `grpcWriteFlags` as-is.
   */
  function sendUnaryResponse(call, value, serialize, metadata, flags) {
    var trailers = metadata ? metadata : new Metadata();
    var serialized;
    try {
      serialized = serialize(value);
    } catch (serializeError) {
      serializeError.code = constants.status.INTERNAL;
      handleError(call, serializeError);
      return;
    }
    var okStatus = {
      code: constants.status.OK,
      details: 'OK',
      metadata: trailers._getCoreRepresentation()
    };
    var responseBatch = {};
    // Send empty initial metadata along with the response if none went out yet.
    if (!call.metadataSent) {
      var emptyMetadata = new Metadata();
      responseBatch[grpc.opType.SEND_INITIAL_METADATA] =
          emptyMetadata._getCoreRepresentation();
      call.metadataSent = true;
    }
    serialized.grpcWriteFlags = flags;
    responseBatch[grpc.opType.SEND_MESSAGE] = serialized;
    responseBatch[grpc.opType.SEND_STATUS_FROM_SERVER] = okStatus;
    call.startBatch(responseBatch, function() {});
  }
  /**
   * Initialize a writable stream. This is used for both the writable and duplex
   * stream constructors.
   *
   * Sets the `finished`, `status` and `serialize` properties on the stream,
   * sends the pending status when the stream emits 'finish', turns 'error'
   * events into a pending error status followed by `end()`, and replaces
   * `stream.end` with a version that accepts trailing metadata.
   * @private
   * @param {Writable} stream The stream to set up
   * @param {function(*):Buffer=} serialize Serialization function for responses
   */
  function setUpWritable(stream, serialize) {
    stream.finished = false;
    stream.status = {
      code : constants.status.OK,
      details : 'OK',
      metadata : new Metadata()
    };
    stream.serialize = common.wrapIgnoreNull(serialize);
    /**
     * Send the pending status (preceded by empty initial metadata if none has
     * been sent yet). Runs when the stream emits 'finish'.
     */
    function sendStatus() {
      var batch = {};
      if (!stream.call.metadataSent) {
        stream.call.metadataSent = true;
        batch[grpc.opType.SEND_INITIAL_METADATA] =
            (new Metadata())._getCoreRepresentation();
      }
      if (stream.status.metadata) {
        stream.status.metadata = stream.status.metadata._getCoreRepresentation();
      }
      batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status;
      stream.call.startBatch(batch, function(){});
    }
    stream.on('finish', sendStatus);
    /**
     * Set the pending status to a given error status. If the error does not
     * have an integer `code` property, the code will be set to
     * grpc.status.UNKNOWN and the details to the error's `message` (or
     * 'Unknown Error' when there is none). A non-integer `code`, such as the
     * string code of a Node system error, is ignored rather than forwarded.
     * @param {Error} err The error object
     */
    function setStatus(err) {
      var code = constants.status.UNKNOWN;
      var details = 'Unknown Error';
      var metadata = new Metadata();
      if (err.hasOwnProperty('message')) {
        details = err.message;
      }
      if (err.hasOwnProperty('code') && Number.isInteger(err.code)) {
        code = err.code;
        if (err.hasOwnProperty('details')) {
          details = err.details;
        }
      }
      if (err.hasOwnProperty('metadata')) {
        metadata = err.metadata;
      }
      stream.status = {code: code, details: details, metadata: metadata};
    }
    /**
     * Terminate the call: record the given error as the pending status, then
     * end the stream so that the 'finish' handler sends that status.
     * @param {Error} err The error object
     */
    function terminateCall(err) {
      setStatus(err);
      stream.end();
    }
    stream.on('error', terminateCall);
    /**
     * Override of Writable#end method that allows for sending metadata with a
     * success status. Note that the only argument is interpreted as metadata;
     * nothing is forwarded to Writable#end.
     * @param {Metadata=} metadata Metadata to send with the status
     */
    stream.end = function(metadata) {
      if (metadata) {
        stream.status.metadata = metadata;
      }
      Writable.prototype.end.call(this);
    };
  }
  /**
   * Initialize a readable stream. This is used for both the readable and duplex
   * stream constructors.
   *
   * Sets the `deserialize`, `finished` and `reading` properties, installs a
   * `terminate` method, and terminates the stream when it emits 'cancelled'.
   * @private
   * @param {Readable} stream The stream to initialize
   * @param {grpc~deserialize} deserialize Deserialization function for
   *     incoming data.
   */
  function setUpReadable(stream, deserialize) {
    stream.deserialize = common.wrapIgnoreNull(deserialize);
    stream.finished = false;
    stream.reading = false;
    // terminate() can be reached from several places (cancellation, a failed
    // read, the client-streaming response callback); remember whether the
    // discarding listener is already attached so repeated calls do not pile up
    // 'data' listeners on the stream.
    var discardListenerAttached = false;
    /**
     * Mark the stream as finished and attach a no-op 'data' listener so that
     * any remaining incoming data is consumed and discarded. Safe to call more
     * than once.
     */
    stream.terminate = function() {
      stream.finished = true;
      if (!discardListenerAttached) {
        discardListenerAttached = true;
        stream.on('data', function() {});
      }
    };
    stream.on('cancelled', function() {
      stream.terminate();
    });
  }
  /**
   * Emitted when the call has been cancelled. After this has been emitted, the
   * call's `cancelled` property will be set to `true`.
   * @event grpc~ServerUnaryCall~cancelled
   */

  /**
   * The call object handed to unary handlers. It is an EventEmitter that
   * carries the underlying call, the client's request metadata and, once it
   * has been received, the request message.
   * @constructor grpc~ServerUnaryCall
   * @extends external:EventEmitter
   * @param {grpc.internal~Call} call The call object associated with the request
   * @param {grpc.Metadata} metadata The request metadata from the client
   */
  function ServerUnaryCall(call, metadata) {
    EventEmitter.call(this);

    this.call = call;

    /**
     * Indicates if the call has been cancelled
     * @member {boolean} grpc~ServerUnaryCall#cancelled
     */
    this.cancelled = false;

    /**
     * The request metadata from the client
     * @member {grpc.Metadata} grpc~ServerUnaryCall#metadata
     */
    this.metadata = metadata;

    /**
     * The request message from the client. Stays `undefined` until the
     * request has been assigned by the code handling the call.
     * @member {*} grpc~ServerUnaryCall#request
     */
    this.request = undefined;
  }

  // Function declarations are hoisted, so wiring up inheritance after the
  // constructor is equivalent to doing it before.
  util.inherits(ServerUnaryCall, EventEmitter);
  /**
   * Emitted when the call has been cancelled. After this has been emitted, the
   * call's `cancelled` property will be set to `true`.
   * @event grpc~ServerWritableStream~cancelled
   */

  /**
   * An object-mode Writable that the server writes response messages to. Used
   * for calls that are streaming from the server side.
   * @constructor grpc~ServerWritableStream
   * @extends external:Writable
   * @borrows grpc~ServerUnaryCall#sendMetadata as
   *     grpc~ServerWritableStream#sendMetadata
   * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerWritableStream#getPeer
   * @param {grpc.internal~Call} call The call object to send data with
   * @param {grpc.Metadata} metadata The request metadata from the client
   * @param {grpc~serialize} serialize Serialization function for writes
   */
  function ServerWritableStream(call, metadata, serialize) {
    var writableOptions = {objectMode: true};
    Writable.call(this, writableOptions);

    this.call = call;
    this.finished = false;
    setUpWritable(this, serialize);

    /**
     * Indicates if the call has been cancelled
     * @member {boolean} grpc~ServerWritableStream#cancelled
     */
    this.cancelled = false;

    /**
     * The request metadata from the client
     * @member {grpc.Metadata} grpc~ServerWritableStream#metadata
     */
    this.metadata = metadata;

    /**
     * The request message from the client. Stays `undefined` until the
     * request has been assigned by the code handling the call.
     * @member {*} grpc~ServerWritableStream#request
     */
    this.request = undefined;
  }

  // Hoisting makes this equivalent to calling inherits before the constructor.
  util.inherits(ServerWritableStream, Writable);
  /**
   * Implementation of the `_write` hook required by stream.Writable: serialize
   * one chunk and send it on the call, together with empty initial metadata if
   * none has been sent yet.
   *
   * A serialization failure is tagged with the INTERNAL status code and passed
   * to the write callback. A failure reported by the batch is emitted as an
   * 'error' event on the stream; in that case the write callback is not
   * invoked.
   * @private
   * @param {Buffer} chunk The chunk of data to write
   * @param {string} encoding Used to pass write flags
   * @param {function(Error=)} callback Callback to indicate that the write is
   *     complete
   */
  function _write(chunk, encoding, callback) {
    /* jshint validthis: true */
    var writable = this;
    var serialized;
    try {
      serialized = writable.serialize(chunk);
    } catch (serializeError) {
      serializeError.code = constants.status.INTERNAL;
      callback(serializeError);
      return;
    }

    var call = writable.call;
    var writeBatch = {};
    if (!call.metadataSent) {
      var emptyMetadata = new Metadata();
      writeBatch[grpc.opType.SEND_INITIAL_METADATA] =
          emptyMetadata._getCoreRepresentation();
      call.metadataSent = true;
    }
    /* The encoding slot doubles as the write-flags argument. A finite number
     * is the closest available check that it holds valid flags. */
    if (_.isFinite(encoding)) {
      serialized.grpcWriteFlags = encoding;
    }
    writeBatch[grpc.opType.SEND_MESSAGE] = serialized;

    call.startBatch(writeBatch, function onWriteDone(err) {
      if (err) {
        writable.emit('error', err);
        return;
      }
      callback();
    });
  }

  ServerWritableStream.prototype._write = _write;
  /**
   * Emitted when the call has been cancelled. After this has been emitted, the
   * call's `cancelled` property will be set to `true`.
   * @event grpc~ServerReadableStream~cancelled
   */

  /**
   * An object-mode Readable that the server reads request messages from. Used
   * for calls that are streaming from the client side.
   * @constructor grpc~ServerReadableStream
   * @extends external:Readable
   * @borrows grpc~ServerUnaryCall#sendMetadata as
   *     grpc~ServerReadableStream#sendMetadata
   * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerReadableStream#getPeer
   * @param {grpc.internal~Call} call The call object to read data with
   * @param {grpc.Metadata} metadata The request metadata from the client
   * @param {grpc~deserialize} deserialize Deserialization function for reads
   */
  function ServerReadableStream(call, metadata, deserialize) {
    var readableOptions = {objectMode: true};
    Readable.call(this, readableOptions);

    this.call = call;
    setUpReadable(this, deserialize);

    /**
     * Indicates if the call has been cancelled
     * @member {boolean} grpc~ServerReadableStream#cancelled
     */
    this.cancelled = false;

    /**
     * The request metadata from the client
     * @member {grpc.Metadata} grpc~ServerReadableStream#metadata
     */
    this.metadata = metadata;
  }

  // Hoisting makes this equivalent to calling inherits before the constructor.
  util.inherits(ServerReadableStream, Readable);
  /**
   * Implementation of the `_read` hook required by stream.Readable: start
   * receiving messages from the call unless a receive is already in flight.
   * If the stream is already marked finished, end-of-stream is pushed instead.
   * @access private
   * @param {number} size Ignored
   */
  function _read(size) {
    /* jshint validthis: true */
    var readable = this;

    /**
     * Ask the call for the next incoming message.
     */
    function requestMessage() {
      var recvBatch = {};
      recvBatch[grpc.opType.RECV_MESSAGE] = true;
      readable.call.startBatch(recvBatch, onMessage);
    }

    /**
     * Callback for a completed receive. Pushes the deserialized message onto
     * the read queue and requests the next one while the consumer still wants
     * data and the message was not null.
     * @param {Error} err Error reported for the batch, if any
     * @param {grpc.Event} event READ event object; `event.read` is the raw
     *     message
     */
    function onMessage(err, event) {
      if (err) {
        readable.terminate();
        return;
      }
      if (readable.finished) {
        readable.push(null);
        return;
      }
      var raw = event.read;
      var message;
      try {
        message = readable.deserialize(raw);
      } catch (deserializeError) {
        deserializeError.code = constants.status.INTERNAL;
        readable.emit('error', deserializeError);
        return;
      }
      // push() must always run; its result says whether to keep reading.
      var consumerWantsMore = readable.push(message);
      if (consumerWantsMore && raw !== null) {
        requestMessage();
      } else {
        readable.reading = false;
      }
    }

    if (readable.finished) {
      readable.push(null);
      return;
    }
    if (readable.reading) {
      return;
    }
    readable.reading = true;
    requestMessage();
  }

  ServerReadableStream.prototype._read = _read;
  /**
   * Emitted when the call has been cancelled. After this has been emitted, the
   * call's `cancelled` property will be set to `true`.
   * @event grpc~ServerDuplexStream~cancelled
   */

  /**
   * An object-mode Duplex that the server can both read from and write to.
   * Used for calls with duplex streaming. Writes go through `serialize`, reads
   * go through `deserialize`.
   * @constructor grpc~ServerDuplexStream
   * @extends external:Duplex
   * @borrows grpc~ServerUnaryCall#sendMetadata as
   *     grpc~ServerDuplexStream#sendMetadata
   * @borrows grpc~ServerUnaryCall#getPeer as grpc~ServerDuplexStream#getPeer
   * @param {grpc.internal~Call} call Call object to proxy
   * @param {grpc.Metadata} metadata The request metadata from the client
   * @param {grpc~serialize} serialize Serialization function for written
   *     values
   * @param {grpc~deserialize} deserialize Deserialization function for
   *     incoming data
   */
  function ServerDuplexStream(call, metadata, serialize, deserialize) {
    var duplexOptions = {objectMode: true};
    Duplex.call(this, duplexOptions);

    this.call = call;
    setUpWritable(this, serialize);
    setUpReadable(this, deserialize);

    /**
     * Indicates if the call has been cancelled
     * @member {boolean} grpc~ServerDuplexStream#cancelled
     */
    this.cancelled = false;

    /**
     * The request metadata from the client
     * @member {grpc.Metadata} grpc~ServerDuplexStream#metadata
     */
    this.metadata = metadata;
  }

  // Hoisting makes this equivalent to calling inherits before the constructor.
  util.inherits(ServerDuplexStream, Duplex);

  // The duplex stream shares the stream hooks used by the one-directional
  // server streams.
  ServerDuplexStream.prototype._read = _read;
  ServerDuplexStream.prototype._write = _write;
  /**
   * Send the initial (response) metadata for a call. Does nothing if initial
   * metadata has already been sent on the call. An error reported by the batch
   * is emitted as an 'error' event on this object.
   * @alias grpc~ServerUnaryCall#sendMetadata
   * @param {Metadata} responseMetadata Metadata to send
   */
  function sendMetadata(responseMetadata) {
    /* jshint validthis: true */
    var self = this;
    if (!this.call.metadataSent) {
      // Convert first: if the argument is missing or invalid this throws, and
      // the call must not be left flagged as "metadata sent" when nothing was
      // actually sent.
      var coreMetadata = responseMetadata._getCoreRepresentation();
      this.call.metadataSent = true;
      var batch = {};
      batch[grpc.opType.SEND_INITIAL_METADATA] = coreMetadata;
      this.call.startBatch(batch, function(err) {
        if (err) {
          self.emit('error', err);
          return;
        }
      });
    }
  }
  ServerUnaryCall.prototype.sendMetadata = sendMetadata;
  ServerWritableStream.prototype.sendMetadata = sendMetadata;
  ServerReadableStream.prototype.sendMetadata = sendMetadata;
  ServerDuplexStream.prototype.sendMetadata = sendMetadata;
  /**
   * Get the endpoint this call/stream is connected to, as reported by the
   * underlying call object.
   * @alias grpc~ServerUnaryCall#getPeer
   * @return {string} The URI of the endpoint
   */
  function getPeer() {
    /* jshint validthis: true */
    var underlyingCall = this.call;
    return underlyingCall.getPeer();
  }

  // Every server-side call type exposes the same getPeer implementation.
  [
    ServerUnaryCall,
    ServerReadableStream,
    ServerWritableStream,
    ServerDuplexStream
  ].forEach(function(callType) {
    callType.prototype.getPeer = getPeer;
  });
  /**
   * Wait for the client to close, then emit a cancelled event if the client
   * cancelled. If the batch itself fails, the error is emitted as an 'error'
   * event and the (absent) result is not inspected.
   * @private
   */
  function waitForCancel() {
    /* jshint validthis: true */
    var self = this;
    var cancel_batch = {};
    cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
    self.call.startBatch(cancel_batch, function(err, result) {
      if (err) {
        self.emit('error', err);
        // Without this return the code below would dereference `result`,
        // which is not guaranteed to exist when the batch failed.
        return;
      }
      if (result.cancelled) {
        self.cancelled = true;
        self.emit('cancelled');
      }
    });
  }
  ServerUnaryCall.prototype.waitForCancel = waitForCancel;
  ServerReadableStream.prototype.waitForCancel = waitForCancel;
  ServerWritableStream.prototype.waitForCancel = waitForCancel;
  ServerDuplexStream.prototype.waitForCancel = waitForCancel;
  515. /**
  516. * Callback function passed to server handlers that handle methods with unary
  517. * responses.
  518. * @callback grpc.Server~sendUnaryData
  519. * @param {grpc~ServiceError} error An error, if the call failed
  520. * @param {*} value The response value. Must be a valid argument to the
  521. * `responseSerialize` method of the method that is being handled
  522. * @param {grpc.Metadata=} trailer Trailing metadata to send, if applicable
  523. * @param {grpc.writeFlags=} flags Flags to modify writing the response
  524. */
  525. /**
  526. * User-provided method to handle unary requests on a server
  527. * @callback grpc.Server~handleUnaryCall
  528. * @param {grpc~ServerUnaryCall} call The call object
  529. * @param {grpc.Server~sendUnaryData} callback The callback to call to respond
  530. * to the request
  531. */
  /**
   * Fully handle a unary call: receive the single request message, deserialize
   * it, run the user handler, and send back either the handler's response or
   * an error status. The handler is not invoked if the call was already
   * cancelled by the time the request has been deserialized.
   * @private
   * @param {grpc.internal~Call} call The call to handle
   * @param {Object} handler Request handler object for the method that was called
   * @param {grpc.Server~handleUnaryCall} handler.func The handler function
   * @param {grpc~deserialize} handler.deserialize The deserialization function
   *     for request data
   * @param {grpc~serialize} handler.serialize The serialization function for
   *     response data
   * @param {grpc.Metadata} metadata Metadata from the client
   */
  function handleUnary(call, handler, metadata) {
    var unaryCall = new ServerUnaryCall(call, metadata);
    unaryCall.on('error', function(error) {
      handleError(call, error);
    });
    unaryCall.waitForCancel();

    /**
     * Callback given to the user handler for delivering its result.
     */
    function sendUnaryData(err, value, trailer, flags) {
      if (!err) {
        sendUnaryResponse(call, value, handler.serialize, trailer, flags);
        return;
      }
      // Trailing metadata supplied with an error travels on the error itself.
      if (trailer) {
        err.metadata = trailer;
      }
      handleError(call, err);
    }

    /**
     * Runs once the request message has been received (or receiving failed).
     */
    function onRequest(err, result) {
      if (err) {
        handleError(call, err);
        return;
      }
      try {
        unaryCall.request = handler.deserialize(result.read);
      } catch (deserializeError) {
        deserializeError.code = constants.status.INTERNAL;
        handleError(call, deserializeError);
        return;
      }
      if (unaryCall.cancelled) {
        return;
      }
      handler.func(unaryCall, sendUnaryData);
    }

    var recvBatch = {};
    recvBatch[grpc.opType.RECV_MESSAGE] = true;
    call.startBatch(recvBatch, onRequest);
  }
  579. /**
  580. * User provided method to handle server streaming methods on the server.
  581. * @callback grpc.Server~handleServerStreamingCall
  582. * @param {grpc~ServerWritableStream} call The call object
  583. */
  /**
   * Fully handle a server streaming call: receive and deserialize the single
   * request message, then hand the writable stream to the user handler.
   * Receive and deserialization failures are emitted as 'error' events on the
   * stream.
   * @private
   * @param {grpc.internal~Call} call The call to handle
   * @param {Object} handler Request handler object for the method that was called
   * @param {grpc.Server~handleServerStreamingCall} handler.func The handler
   *     function
   * @param {grpc~deserialize} handler.deserialize The deserialization function
   *     for request data
   * @param {grpc~serialize} handler.serialize The serialization function for
   *     response data
   * @param {grpc.Metadata} metadata Metadata from the client
   */
  function handleServerStreaming(call, handler, metadata) {
    var responseStream =
        new ServerWritableStream(call, metadata, handler.serialize);
    responseStream.waitForCancel();

    /**
     * Runs once the request message has been received (or receiving failed).
     */
    function onRequest(err, result) {
      if (err) {
        responseStream.emit('error', err);
        return;
      }
      try {
        responseStream.request = handler.deserialize(result.read);
      } catch (deserializeError) {
        deserializeError.code = constants.status.INTERNAL;
        responseStream.emit('error', deserializeError);
        return;
      }
      handler.func(responseStream);
    }

    var recvBatch = {};
    recvBatch[grpc.opType.RECV_MESSAGE] = true;
    call.startBatch(recvBatch, onRequest);
  }
  617. /**
  618. * User provided method to handle client streaming methods on the server.
  619. * @callback grpc.Server~handleClientStreamingCall
  620. * @param {grpc~ServerReadableStream} call The call object
  621. * @param {grpc.Server~sendUnaryData} callback The callback to call to respond
  622. * to the request
  623. */
  /**
   * Fully handle a client streaming call: hand the readable request stream to
   * the user handler and, when the handler calls back, terminate the stream
   * and send either the single response or an error status.
   * @access private
   * @param {grpc.internal~Call} call The call to handle
   * @param {Object} handler Request handler object for the method that was called
   * @param {grpc.Server~handleClientStreamingCall} handler.func The handler
   *     function
   * @param {grpc~deserialize} handler.deserialize The deserialization function
   *     for request data
   * @param {grpc~serialize} handler.serialize The serialization function for
   *     response data
   * @param {grpc.Metadata} metadata Metadata from the client
   */
  function handleClientStreaming(call, handler, metadata) {
    var requestStream =
        new ServerReadableStream(call, metadata, handler.deserialize);
    requestStream.on('error', function(error) {
      handleError(call, error);
    });
    requestStream.waitForCancel();

    handler.func(requestStream, function(err, value, trailer, flags) {
      requestStream.terminate();
      if (!err) {
        sendUnaryResponse(call, value, handler.serialize, trailer, flags);
        return;
      }
      // Trailing metadata supplied with an error travels on the error itself.
      if (trailer) {
        err.metadata = trailer;
      }
      handleError(call, err);
    });
  }
  655. /**
  656. * User provided method to handle bidirectional streaming calls on the server.
  657. * @callback grpc.Server~handleBidiStreamingCall
  658. * @param {grpc~ServerDuplexStream} call The call object
  659. */
  /**
   * Fully handle a bidirectional streaming call: build the duplex stream for
   * the call, start watching for cancellation, and pass the stream to the user
   * handler.
   * @private
   * @param {grpc.internal~Call} call The call to handle
   * @param {Object} handler Request handler object for the method that was called
   * @param {grpc.Server~handleBidiStreamingCall} handler.func The handler
   *     function
   * @param {grpc~deserialize} handler.deserialize The deserialization function
   *     for request data
   * @param {grpc~serialize} handler.serialize The serialization function for
   *     response data
   * @param {Metadata} metadata Metadata from the client
   */
  function handleBidiStreaming(call, handler, metadata) {
    var serializeResponse = handler.serialize;
    var deserializeRequest = handler.deserialize;
    var duplex = new ServerDuplexStream(
        call, metadata, serializeResponse, deserializeRequest);
    duplex.waitForCancel();
    handler.func(duplex);
  }
  /* Dispatch table from a registered handler's `type` string to the function
   * that drives a call of that type. */
  var streamHandlers = {};
  streamHandlers.unary = handleUnary;
  streamHandlers.server_stream = handleServerStreaming;
  streamHandlers.client_stream = handleClientStreaming;
  streamHandlers.bidi = handleBidiStreaming;
  /**
   * Constructs a server object that stores request handlers and delegates
   * incoming requests to those handlers. The new server starts with no
   * handlers and is not started.
   * @memberof grpc
   * @constructor
   * @param {Object=} options Options that should be passed to the internal server
   *     implementation
   * @example
   * var server = new grpc.Server();
   * server.addProtoService(protobuf_service_descriptor, service_implementation);
   * server.bind('address:port', server_credential);
   * server.start();
   */
  function Server(options) {
    // Registered handlers, keyed by method name.
    this.handlers = {};
    // The internal server implementation that this object wraps.
    this._server = new grpc.Server(options);
    this.started = false;
  }
  /**
   * Start the server and begin handling requests.
   * @throws {Error} If the server has already been started
   */
  Server.prototype.start = function() {
    if (this.started) {
      throw new Error('Server is already running');
    }
    var self = this;
    this.started = true;
    this._server.start();

    /**
     * Answer a call to a method that has no registered handler with an
     * UNIMPLEMENTED status.
     * @param {grpc.internal~Call} call The call to reject
     */
    function rejectUnimplemented(call) {
      var rejectBatch = {};
      rejectBatch[grpc.opType.SEND_INITIAL_METADATA] =
          (new Metadata())._getCoreRepresentation();
      rejectBatch[grpc.opType.SEND_STATUS_FROM_SERVER] = {
        code: constants.status.UNIMPLEMENTED,
        details: '',
        metadata: {}
      };
      rejectBatch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
      call.startBatch(rejectBatch, function() {});
    }

    /**
     * Handles the SERVER_RPC_NEW event. Requests the next call first, then
     * dispatches this one to the handler registered for its method, or rejects
     * it when there is none. Stops (without requesting another call) when an
     * error is reported or the event carries a null method.
     * @param {Error} err Error reported for the request, if any
     * @param {grpc.internal~Event} event The event to handle with tag
     *     SERVER_RPC_NEW
     */
    function handleNewCall(err, event) {
      if (err) {
        return;
      }
      var details = event.new_call;
      var call = details.call;
      var method = details.method;
      var metadata = Metadata._fromCoreRepresentation(details.metadata);
      if (method === null) {
        return;
      }
      self._server.requestCall(handleNewCall);
      if (!self.handlers.hasOwnProperty(method)) {
        rejectUnimplemented(call);
        return;
      }
      var handler = self.handlers[method];
      streamHandlers[handler.type](call, handler, metadata);
    }

    this._server.requestCall(handleNewCall);
  };
  753. /**
  754. * Unified type for application handlers for all types of calls
  755. * @typedef {(grpc.Server~handleUnaryCall
  756. * |grpc.Server~handleClientStreamingCall
  757. * |grpc.Server~handleServerStreamingCall
  758. * |grpc.Server~handleBidiStreamingCall)} grpc.Server~handleCall
  759. */
  /**
   * Registers a handler to handle the named method. An existing registration
   * for the same name is never replaced.
   * @param {string} name The name of the method that the provided function should
   *     handle/respond to.
   * @param {grpc.Server~handleCall} handler Function that takes a stream of
   *     request values and returns a stream of response values
   * @param {grpc~serialize} serialize Serialization function for responses
   * @param {grpc~deserialize} deserialize Deserialization function for requests
   * @param {string} type The streaming type of method that this handles
   * @return {boolean} True if the handler was set. False if a handler was already
   *     set for that name.
   */
  Server.prototype.register = function(name, handler, serialize, deserialize,
                                       type) {
    var registry = this.handlers;
    if (registry.hasOwnProperty(name)) {
      return false;
    }
    var entry = {};
    entry.func = handler;
    entry.serialize = serialize;
    entry.deserialize = deserialize;
    entry.type = type;
    registry[name] = entry;
    return true;
  };
  786. /**
  787. * Gracefully shuts down the server. The server will stop receiving new calls,
  788. * and any pending calls will complete. The callback will be called when all
  789. * pending calls have completed and the server is fully shut down. This method
  790. * is idempotent with itself and forceShutdown.
  791. * @param {function()} callback The shutdown complete callback
  792. */
  793. Server.prototype.tryShutdown = function(callback) {
  794. this._server.tryShutdown(callback);
  795. };
  796. /**
  797. * Forcibly shuts down the server. The server will stop receiving new calls
  798. * and cancel all pending calls. When it returns, the server has shut down.
  799. * This method is idempotent with itself and tryShutdown, and it will trigger
  800. * any outstanding tryShutdown callbacks.
  801. */
  802. Server.prototype.forceShutdown = function() {
  803. this._server.forceShutdown();
  804. };
  805. var unimplementedStatusResponse = {
  806. code: constants.status.UNIMPLEMENTED,
  807. details: 'The server does not implement this method'
  808. };
  809. var defaultHandler = {
  810. unary: function(call, callback) {
  811. callback(unimplementedStatusResponse);
  812. },
  813. client_stream: function(call, callback) {
  814. callback(unimplementedStatusResponse);
  815. },
  816. server_stream: function(call) {
  817. call.emit('error', unimplementedStatusResponse);
  818. },
  819. bidi: function(call) {
  820. call.emit('error', unimplementedStatusResponse);
  821. }
  822. };
  823. /**
  824. * Add a service to the server, with a corresponding implementation.
  825. * @param {grpc~ServiceDefinition} service The service descriptor
  826. * @param {Object&lt;String, grpc.Server~handleCall>} implementation Map of method
  827. * names to method implementation for the provided service.
  828. */
  829. Server.prototype.addService = function(service, implementation) {
  830. if (!_.isObject(service) || !_.isObject(implementation)) {
  831. throw new Error('addService requires two objects as arguments');
  832. }
  833. if (_.keys(service).length === 0) {
  834. throw new Error('Cannot add an empty service to a server');
  835. }
  836. if (this.started) {
  837. throw new Error('Can\'t add a service to a started server.');
  838. }
  839. var self = this;
  840. _.forOwn(service, function(attrs, name) {
  841. var method_type;
  842. if (attrs.requestStream) {
  843. if (attrs.responseStream) {
  844. method_type = 'bidi';
  845. } else {
  846. method_type = 'client_stream';
  847. }
  848. } else {
  849. if (attrs.responseStream) {
  850. method_type = 'server_stream';
  851. } else {
  852. method_type = 'unary';
  853. }
  854. }
  855. var impl;
  856. if (implementation[name] === undefined) {
  857. /* Handle the case where the method is passed with the name exactly as
  858. written in the proto file, instead of using JavaScript function
  859. naming style */
  860. if (implementation[attrs.originalName] === undefined) {
  861. common.log(constants.logVerbosity.ERROR, 'Method handler ' + name +
  862. ' for ' + attrs.path + ' expected but not provided');
  863. impl = defaultHandler[method_type];
  864. } else {
  865. impl = _.bind(implementation[attrs.originalName], implementation);
  866. }
  867. } else {
  868. impl = _.bind(implementation[name], implementation);
  869. }
  870. var serialize = attrs.responseSerialize;
  871. var deserialize = attrs.requestDeserialize;
  872. var register_success = self.register(attrs.path, impl, serialize,
  873. deserialize, method_type);
  874. if (!register_success) {
  875. throw new Error('Method handler for ' + attrs.path +
  876. ' already provided.');
  877. }
  878. });
  879. };
  880. /**
  881. * Add a proto service to the server, with a corresponding implementation
  882. * @deprecated Use {@link grpc.Server#addService} instead
  883. * @param {Protobuf.Reflect.Service} service The proto service descriptor
  884. * @param {Object&lt;String, grpc.Server~handleCall>} implementation Map of method
  885. * names to method implementation for the provided service.
  886. */
  887. Server.prototype.addProtoService = util.deprecate(function(service,
  888. implementation) {
  889. var options;
  890. var protobuf_js_5_common = require('./protobuf_js_5_common');
  891. var protobuf_js_6_common = require('./protobuf_js_6_common');
  892. if (protobuf_js_5_common.isProbablyProtobufJs5(service)) {
  893. options = _.defaults(service.grpc_options, common.defaultGrpcOptions);
  894. this.addService(
  895. protobuf_js_5_common.getProtobufServiceAttrs(service, options),
  896. implementation);
  897. } else if (protobuf_js_6_common.isProbablyProtobufJs6(service)) {
  898. options = _.defaults(service.grpc_options, common.defaultGrpcOptions);
  899. this.addService(
  900. protobuf_js_6_common.getProtobufServiceAttrs(service, options),
  901. implementation);
  902. } else {
  903. // We assume that this is a service attributes object
  904. this.addService(service, implementation);
  905. }
  906. }, 'Server#addProtoService: Use Server#addService instead');
  907. /**
  908. * Binds the server to the given port, with SSL disabled if creds is an
  909. * insecure credentials object
  910. * @param {string} port The port that the server should bind on, in the format
  911. * "address:port"
  912. * @param {grpc.ServerCredentials} creds Server credential object to be used for
  913. * SSL. Pass an insecure credentials object for an insecure port.
  914. */
  915. Server.prototype.bind = function(port, creds) {
  916. if (this.started) {
  917. throw new Error('Can\'t bind an already running server to an address');
  918. }
  919. return this._server.addHttp2Port(port, creds);
  920. };
  921. exports.Server = Server;
  922. </code></pre>
  923. </article>
  924. </section>
  925. </div>
  926. <nav>
  927. <h2><a href="index.html">Home</a></h2><h3>Externals</h3><ul><li><a href="external-Duplex.html">Duplex</a></li><li><a href="external-EventEmitter.html">EventEmitter</a></li><li><a href="external-GoogleCredential.html">GoogleCredential</a></li><li><a href="external-Readable.html">Readable</a></li><li><a href="external-Writable.html">Writable</a></li></ul><h3>Classes</h3><ul><li><a href="grpc.Client.html">Client</a></li><li><a href="grpc.credentials-CallCredentials.html">CallCredentials</a></li><li><a href="grpc.credentials-ChannelCredentials.html">ChannelCredentials</a></li><li><a href="grpc.Metadata.html">Metadata</a></li><li><a href="grpc.Server.html">Server</a></li><li><a href="grpc.ServerCredentials.html">ServerCredentials</a></li><li><a href="grpc-ClientDuplexStream.html">ClientDuplexStream</a></li><li><a href="grpc-ClientReadableStream.html">ClientReadableStream</a></li><li><a href="grpc-ClientUnaryCall.html">ClientUnaryCall</a></li><li><a href="grpc-ClientWritableStream.html">ClientWritableStream</a></li><li><a href="grpc-ServerDuplexStream.html">ServerDuplexStream</a></li><li><a href="grpc-ServerReadableStream.html">ServerReadableStream</a></li><li><a href="grpc-ServerUnaryCall.html">ServerUnaryCall</a></li><li><a href="grpc-ServerWritableStream.html">ServerWritableStream</a></li></ul><h3>Events</h3><ul><li><a href="grpc-ClientDuplexStream.html#event:metadata">metadata</a></li><li><a href="grpc-ClientDuplexStream.html#event:status">status</a></li><li><a href="grpc-ClientReadableStream.html#event:metadata">metadata</a></li><li><a href="grpc-ClientReadableStream.html#event:status">status</a></li><li><a href="grpc-ClientUnaryCall.html#event:metadata">metadata</a></li><li><a href="grpc-ClientUnaryCall.html#event:status">status</a></li><li><a href="grpc-ClientWritableStream.html#event:metadata">metadata</a></li><li><a href="grpc-ClientWritableStream.html#event:status">status</a></li><li><a 
href="grpc-ServerDuplexStream.html#~event:cancelled">cancelled</a></li><li><a href="grpc-ServerReadableStream.html#~event:cancelled">cancelled</a></li><li><a href="grpc-ServerUnaryCall.html#~event:cancelled">cancelled</a></li><li><a href="grpc-ServerWritableStream.html#~event:cancelled">cancelled</a></li></ul><h3>Namespaces</h3><ul><li><a href="grpc.html">grpc</a></li><li><a href="grpc.credentials.html">credentials</a></li></ul>
  928. </nav>
  929. <br class="clear">
  930. <footer>
  931. Documentation generated by <a href="https://github.com/jsdoc3/jsdoc">JSDoc 3.5.3</a> on Fri Sep 08 2017 11:10:31 GMT-0700 (PDT)
  932. </footer>
  933. <script> prettyPrint(); </script>
  934. <script src="scripts/linenumber.js"> </script>
  935. </body>
  936. </html>