surface_server.js 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. /*
  2. *
  3. * Copyright 2014, Google Inc.
  4. * All rights reserved.
  5. *
  6. * Redistribution and use in source and binary forms, with or without
  7. * modification, are permitted provided that the following conditions are
  8. * met:
  9. *
  10. * * Redistributions of source code must retain the above copyright
  11. * notice, this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above
  13. * copyright notice, this list of conditions and the following disclaimer
  14. * in the documentation and/or other materials provided with the
  15. * distribution.
  16. * * Neither the name of Google Inc. nor the names of its
  17. * contributors may be used to endorse or promote products derived from
  18. * this software without specific prior written permission.
  19. *
  20. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  21. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  22. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  23. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  24. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  25. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  26. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  27. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  28. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  29. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  30. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  31. *
  32. */
  33. var _ = require('underscore');
  34. var capitalize = require('underscore.string/capitalize');
  35. var decapitalize = require('underscore.string/decapitalize');
  36. var Server = require('./server.js');
  37. var stream = require('stream');
  38. var Readable = stream.Readable;
  39. var Writable = stream.Writable;
  40. var Duplex = stream.Duplex;
  41. var util = require('util');
  42. var common = require('./common.js');
  43. util.inherits(ServerReadableObjectStream, Readable);
  44. /**
  45. * Class for representing a gRPC client streaming call as a Node stream on the
  46. * server side. Extends from stream.Readable.
  47. * @constructor
  48. * @param {stream} stream Underlying binary Duplex stream for the call
  49. */
  50. function ServerReadableObjectStream(stream) {
  51. var options = {objectMode: true};
  52. Readable.call(this, options);
  53. this._stream = stream;
  54. Object.defineProperty(this, 'cancelled', {
  55. get: function() { return stream.cancelled; }
  56. });
  57. var self = this;
  58. this._stream.on('data', function forwardData(chunk) {
  59. if (!self.push(chunk)) {
  60. self._stream.pause();
  61. }
  62. });
  63. this._stream.on('end', function forwardEnd() {
  64. self.push(null);
  65. });
  66. this._stream.pause();
  67. }
  68. /**
  69. * _read implementation for both types of streams that allow reading.
  70. * @this {ServerReadableObjectStream|ServerBidiObjectStream}
  71. * @param {number} size Ignored
  72. */
  73. function _read(size) {
  74. this._stream.resume();
  75. }
  76. /**
  77. * See docs for _read
  78. */
  79. ServerReadableObjectStream.prototype._read = _read;
  80. util.inherits(ServerWritableObjectStream, Writable);
  81. /**
  82. * Class for representing a gRPC server streaming call as a Node stream on the
  83. * server side. Extends from stream.Writable.
  84. * @constructor
  85. * @param {stream} stream Underlying binary Duplex stream for the call
  86. */
  87. function ServerWritableObjectStream(stream) {
  88. var options = {objectMode: true};
  89. Writable.call(this, options);
  90. this._stream = stream;
  91. this.on('finish', function() {
  92. this._stream.end();
  93. });
  94. }
  95. /**
  96. * _write implementation for both types of streams that allow writing
  97. * @this {ServerWritableObjectStream}
  98. * @param {*} chunk The value to write to the stream
  99. * @param {string} encoding Ignored
  100. * @param {function(Error)} callback Callback to call when finished writing
  101. */
  102. function _write(chunk, encoding, callback) {
  103. this._stream.write(chunk, encoding, callback);
  104. }
  105. /**
  106. * See docs for _write
  107. */
  108. ServerWritableObjectStream.prototype._write = _write;
  109. /**
  110. * Creates a binary stream handler function from a unary handler function
  111. * @param {function(Object, function(Error, *))} handler Unary call handler
  112. * @return {function(stream)} Binary stream handler
  113. */
  114. function makeUnaryHandler(handler) {
  115. /**
  116. * Handles a stream by reading a single data value, passing it to the handler,
  117. * and writing the response back to the stream.
  118. * @param {stream} stream Binary data stream
  119. */
  120. return function handleUnaryCall(stream) {
  121. stream.on('data', function handleUnaryData(value) {
  122. var call = {request: value};
  123. Object.defineProperty(call, 'cancelled', {
  124. get: function() { return stream.cancelled;}
  125. });
  126. handler(call, function sendUnaryData(err, value) {
  127. if (err) {
  128. stream.emit('error', err);
  129. } else {
  130. stream.write(value);
  131. stream.end();
  132. }
  133. });
  134. });
  135. };
  136. }
  137. /**
  138. * Creates a binary stream handler function from a client stream handler
  139. * function
  140. * @param {function(Readable, function(Error, *))} handler Client stream call
  141. * handler
  142. * @return {function(stream)} Binary stream handler
  143. */
  144. function makeClientStreamHandler(handler) {
  145. /**
  146. * Handles a stream by passing a deserializing stream to the handler and
  147. * writing the response back to the stream.
  148. * @param {stream} stream Binary data stream
  149. */
  150. return function handleClientStreamCall(stream) {
  151. var object_stream = new ServerReadableObjectStream(stream);
  152. handler(object_stream, function sendClientStreamData(err, value) {
  153. if (err) {
  154. stream.emit('error', err);
  155. } else {
  156. stream.write(value);
  157. stream.end();
  158. }
  159. });
  160. };
  161. }
  162. /**
  163. * Creates a binary stream handler function from a server stream handler
  164. * function
  165. * @param {function(Writable)} handler Server stream call handler
  166. * @return {function(stream)} Binary stream handler
  167. */
  168. function makeServerStreamHandler(handler) {
  169. /**
  170. * Handles a stream by attaching it to a serializing stream, and passing it to
  171. * the handler.
  172. * @param {stream} stream Binary data stream
  173. */
  174. return function handleServerStreamCall(stream) {
  175. stream.on('data', function handleClientData(value) {
  176. var object_stream = new ServerWritableObjectStream(stream);
  177. object_stream.request = value;
  178. handler(object_stream);
  179. });
  180. };
  181. }
  182. /**
  183. * Creates a binary stream handler function from a bidi stream handler function
  184. * @param {function(Duplex)} handler Unary call handler
  185. * @return {function(stream)} Binary stream handler
  186. */
  187. function makeBidiStreamHandler(handler) {
  188. return handler;
  189. }
  190. /**
  191. * Map with short names for each of the handler maker functions. Used in
  192. * makeServerConstructor
  193. */
  194. var handler_makers = {
  195. unary: makeUnaryHandler,
  196. server_stream: makeServerStreamHandler,
  197. client_stream: makeClientStreamHandler,
  198. bidi: makeBidiStreamHandler
  199. };
  200. /**
  201. * Creates a constructor for servers with a service defined by the methods
  202. * object. The methods object has string keys and values of this form:
  203. * {serialize: function, deserialize: function, client_stream: bool,
  204. * server_stream: bool}
  205. * @param {Object} methods Method descriptor for each method the server should
  206. * expose
  207. * @param {string} prefix The prefex to prepend to each method name
  208. * @return {function(Object, Object)} New server constructor
  209. */
  210. function makeServerConstructor(services) {
  211. var qual_names = [];
  212. _.each(services, function(service) {
  213. _.each(service.children, function(method) {
  214. var name = common.fullyQualifiedName(method);
  215. if (_.indexOf(qual_names, name) !== -1) {
  216. throw new Error('Method ' + name + ' exposed by more than one service');
  217. }
  218. qual_names.push(name);
  219. });
  220. });
  221. /**
  222. * Create a server with the given handlers for all of the methods.
  223. * @constructor
  224. * @param {Object} service_handlers Map from service names to map from method
  225. * names to handlers
  226. * @param {Object} options Options to pass to the underlying server
  227. */
  228. function SurfaceServer(service_handlers, options) {
  229. var server = new Server(options);
  230. this.inner_server = server;
  231. _.each(services, function(service) {
  232. var service_name = common.fullyQualifiedName(service);
  233. if (service_handlers[service_name] === undefined) {
  234. throw new Error('Handlers for service ' +
  235. service_name + ' not provided.');
  236. }
  237. var prefix = '/' + common.fullyQualifiedName(service) + '/';
  238. _.each(service.children, function(method) {
  239. var method_type;
  240. if (method.requestStream) {
  241. if (method.responseStream) {
  242. method_type = 'bidi';
  243. } else {
  244. method_type = 'client_stream';
  245. }
  246. } else {
  247. if (method.responseStream) {
  248. method_type = 'server_stream';
  249. } else {
  250. method_type = 'unary';
  251. }
  252. }
  253. if (service_handlers[service_name][decapitalize(method.name)] ===
  254. undefined) {
  255. throw new Error('Method handler for ' +
  256. common.fullyQualifiedName(method) + ' not provided.');
  257. }
  258. var binary_handler = handler_makers[method_type](
  259. service_handlers[service_name][decapitalize(method.name)]);
  260. var serialize = common.serializeCls(
  261. method.resolvedResponseType.build());
  262. var deserialize = common.deserializeCls(
  263. method.resolvedRequestType.build());
  264. server.register(prefix + capitalize(method.name), binary_handler,
  265. serialize, deserialize);
  266. });
  267. }, this);
  268. }
  269. /**
  270. * Binds the server to the given port, with SSL enabled if secure is specified
  271. * @param {string} port The port that the server should bind on, in the format
  272. * "address:port"
  273. * @param {boolean=} secure Whether the server should open a secure port
  274. * @return {SurfaceServer} this
  275. */
  276. SurfaceServer.prototype.bind = function(port, secure) {
  277. return this.inner_server.bind(port, secure);
  278. };
  279. /**
  280. * Starts the server listening on any bound ports
  281. * @return {SurfaceServer} this
  282. */
  283. SurfaceServer.prototype.listen = function() {
  284. this.inner_server.start();
  285. return this;
  286. };
  287. /**
  288. * Shuts the server down; tells it to stop listening for new requests and to
  289. * kill old requests.
  290. */
  291. SurfaceServer.prototype.shutdown = function() {
  292. this.inner_server.shutdown();
  293. };
  294. return SurfaceServer;
  295. }
  296. /**
  297. * See documentation for makeServerConstructor
  298. */
  299. exports.makeServerConstructor = makeServerConstructor;