benchmark_client.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. /*
  2. *
  3. * Copyright 2015 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. /**
  19. * Benchmark client module
  20. * @module
  21. */
  22. 'use strict';
  23. var fs = require('fs');
  24. var path = require('path');
  25. var util = require('util');
  26. var EventEmitter = require('events');
  27. var async = require('async');
  28. var _ = require('lodash');
  29. var PoissonProcess = require('poisson-process');
  30. var Histogram = require('./histogram');
  31. var genericService = require('./generic_service');
  32. var grpc = require('../../../');
  33. var serviceProto = grpc.load({
  34. root: __dirname + '/../../..',
  35. file: 'src/proto/grpc/testing/services.proto'}).grpc.testing;
  /**
   * Create a buffer filled with size zeroes.
   *
   * Uses Buffer.alloc, which returns zero-initialized memory, instead of the
   * deprecated `new Buffer(size)` constructor followed by a manual fill.
   * @param {number} size The length of the buffer, in bytes
   * @return {Buffer} The new buffer, with every byte set to zero
   */
  function zeroBuffer(size) {
    return Buffer.alloc(size);
  }
  /**
   * Collapse a process.hrtime-style time difference into a single nanosecond
   * count.
   * @param {Array.<number>} time_diff The time diff, represented as
   *     [seconds, nanoseconds]
   * @return {number} The total number of nanoseconds
   */
  function timeDiffToNanos(time_diff) {
    var whole_seconds = time_diff[0];
    var extra_nanos = time_diff[1];
    return whole_seconds * 1e9 + extra_nanos;
  }
  /**
   * The BenchmarkClient class. Opens one proto client and one generic client
   * per channel, spread round-robin over the server targets, and holds the
   * histogram and counters used to record statistics about the RPCs it makes.
   * @param {Array.<string>} server_targets List of servers to connect to
   * @param {number} channels The total number of channels to open
   * @param {Object} histogram_params Options for setting up the histogram
   *     (reads `resolution` and `max_possible`)
   * @param {Object=} security_params Options for TLS setup. If absent, don't
   *     use TLS
   */
  function BenchmarkClient(server_targets, channels, histogram_params,
                           security_params) {
    var channel_options = {
      'grpc.max_receive_message_length': -1,
      'grpc.max_send_message_length': -1
    };
    var credentials;
    if (!security_params) {
      credentials = grpc.credentials.createInsecure();
    } else {
      if (security_params.use_test_ca) {
        // Trust the CA certificate read from the test data directory.
        var test_ca_file = path.join(__dirname, '../test/data/ca.pem');
        credentials = grpc.credentials.createSsl(fs.readFileSync(test_ca_file));
      } else {
        credentials = grpc.credentials.createSsl();
      }
      var override_name = security_params.server_host_override;
      if (override_name) {
        channel_options['grpc.ssl_target_name_override'] = override_name;
        channel_options['grpc.default_authority'] = override_name;
      }
    }

    this.clients = [];
    var GenericClient = grpc.makeGenericClientConstructor(genericService);
    this.genericClients = [];
    for (var channel = 0; channel < channels; channel++) {
      // Channels are assigned to the targets in round-robin order.
      var target = server_targets[channel % server_targets.length];
      this.clients.push(
          new serviceProto.BenchmarkService(target, credentials,
                                            channel_options));
      this.genericClients.push(
          new GenericClient(target, credentials, channel_options));
    }

    this.histogram = new Histogram(histogram_params.resolution,
                                   histogram_params.max_possible);
    this.running = false;
    this.pending_calls = 0;
  }

  util.inherits(BenchmarkClient, EventEmitter);
  /**
   * Wait for every client in the list to become ready, then issue
   * outstanding_rpcs_per_channel calls on each of them. If waiting fails,
   * the error is emitted on the emitter and no calls are made.
   * @param {Array<grpc.Client>} client_list The list of clients
   * @param {Number} outstanding_rpcs_per_channel The number of calls to start
   *     on each client
   * @param {function(grpc.Client)} makeCall Function to make a single call on
   *     a single client
   * @param {EventEmitter} emitter The event emitter to send errors on, if
   *     necessary
   */
  function startAllClients(client_list, outstanding_rpcs_per_channel, makeCall,
                           emitter) {
    // One task per client; each task completes when that client is ready.
    var readiness_tasks = _.map(client_list, function(client) {
      return function(done) {
        grpc.waitForClientReady(client, Infinity, done);
      };
    });

    function launchInitialCalls() {
      _.each(client_list, function(client) {
        _.times(outstanding_rpcs_per_channel, function() {
          makeCall(client);
        });
      });
    }

    async.parallel(readiness_tasks, function(err) {
      if (!err) {
        launchInitialCalls();
        return;
      }
      emitter.emit('error', err);
    });
  }
  /**
   * Start a closed-loop test. For each channel, start
   * outstanding_rpcs_per_channel RPCs. Then, whenever an RPC finishes, start
   * another one.
   *
   * An RPC failure stops the client (running becomes false) and is reported
   * as an 'error' event. The failed RPC is still removed from pending_calls,
   * so 'finished' is emitted once the remaining outstanding RPCs complete.
   * @param {number} outstanding_rpcs_per_channel Number of RPCs to start per
   *     channel
   * @param {string} rpc_type Which method to call. Should be 'UNARY' or
   *     'STREAMING'
   * @param {number} req_size The size of the payload to send with each request
   * @param {number} resp_size The size of payload to request be sent in
   *     responses
   * @param {boolean} generic Indicates that the generic (non-proto) clients
   *     should be used
   */
  BenchmarkClient.prototype.startClosedLoop = function(
      outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, generic) {
    var self = this;
    self.running = true;
    self.last_wall_time = process.hrtime();
    self.last_usage = process.cpuUsage();
    var makeCall;
    var argument;
    var client_list;
    if (generic) {
      argument = zeroBuffer(req_size);
      client_list = self.genericClients;
    } else {
      argument = {
        response_size: resp_size,
        payload: {
          body: zeroBuffer(req_size)
        }
      };
      client_list = self.clients;
    }

    // Emit 'finished' once the client is stopped and nothing is in flight.
    function emitFinishedIfDrained() {
      if (!self.running && self.pending_calls === 0) {
        self.emit('finished');
      }
    }

    // Stop issuing new RPCs and report the failure to listeners.
    function reportFailure(error) {
      self.running = false;
      self.emit('error', new Error('Client error: ' + error.message));
    }

    if (rpc_type === 'UNARY') {
      makeCall = function(client) {
        if (!self.running) {
          return;
        }
        self.pending_calls++;
        var start_time = process.hrtime();
        client.unaryCall(argument, function(error, response) {
          // The RPC is complete whether it succeeded or failed; without this
          // a failed RPC would keep pending_calls above zero forever.
          self.pending_calls--;
          if (error) {
            reportFailure(error);
          } else {
            self.histogram.add(timeDiffToNanos(process.hrtime(start_time)));
            // No-op when the client has been stopped in the meantime.
            makeCall(client);
          }
          emitFinishedIfDrained();
        });
      };
    } else {
      makeCall = function(client) {
        if (!self.running) {
          return;
        }
        self.pending_calls++;
        // True while this stream has a request counted in pending_calls.
        var awaiting_response = true;
        var call = client.streamingCall();
        var start_time = process.hrtime();
        call.write(argument);
        call.on('data', function() {
          self.histogram.add(timeDiffToNanos(process.hrtime(start_time)));
          self.pending_calls--;
          if (self.running) {
            self.pending_calls++;
            start_time = process.hrtime();
            call.write(argument);
          } else {
            awaiting_response = false;
            call.end();
            emitFinishedIfDrained();
          }
        });
        call.on('error', function(error) {
          // Only release the pending slot if this stream still holds one; an
          // error that arrives after the stream was ended must not be
          // counted twice.
          var held_pending_slot = awaiting_response;
          awaiting_response = false;
          if (held_pending_slot) {
            self.pending_calls--;
          }
          reportFailure(error);
          if (held_pending_slot) {
            emitFinishedIfDrained();
          }
        });
      };
    }
    startAllClients(client_list, outstanding_rpcs_per_channel, makeCall, self);
  };
  /**
   * Start a poisson test. For each channel, this initiates a number of Poisson
   * processes equal to outstanding_rpcs_per_channel, where each Poisson process
   * has the load parameter offered_load.
   *
   * Only unary RPCs are supported; any other rpc_type emits an 'error' event
   * and leaves the client stopped. An RPC failure stops the client and is
   * reported as an 'error' event. The failed RPC is still removed from
   * pending_calls, so 'finished' is emitted once the remaining outstanding
   * RPCs complete.
   * @param {number} outstanding_rpcs_per_channel Number of RPCs to start per
   *     channel
   * @param {string} rpc_type Which method to call. Should be 'UNARY' or
   *     'STREAMING'
   * @param {number} req_size The size of the payload to send with each request
   * @param {number} resp_size The size of payload to request be sent in
   *     responses
   * @param {number} offered_load The load parameter for the Poisson process
   * @param {boolean} generic Indicates that the generic (non-proto) clients
   *     should be used
   */
  BenchmarkClient.prototype.startPoisson = function(
      outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, offered_load,
      generic) {
    var self = this;
    self.running = true;
    self.last_wall_time = process.hrtime();
    self.last_usage = process.cpuUsage();
    if (rpc_type !== 'UNARY') {
      // Nothing was started, so do not leave the client marked as running.
      self.running = false;
      self.emit('error', new Error('Streaming Poisson benchmarks not supported'));
      return;
    }
    var argument;
    var client_list;
    if (generic) {
      argument = zeroBuffer(req_size);
      client_list = self.genericClients;
    } else {
      argument = {
        response_size: resp_size,
        payload: {
          body: zeroBuffer(req_size)
        }
      };
      client_list = self.clients;
    }
    var makeCall = function(client, poisson) {
      if (!self.running) {
        // The client was stopped: shut down the process driving this call.
        poisson.stop();
        return;
      }
      self.pending_calls++;
      var start_time = process.hrtime();
      client.unaryCall(argument, function(error, response) {
        // The RPC is complete whether it succeeded or failed; without this a
        // failed RPC would keep pending_calls above zero forever.
        self.pending_calls--;
        if (error) {
          self.running = false;
          self.emit('error', new Error('Client error: ' + error.message));
        } else {
          self.histogram.add(timeDiffToNanos(process.hrtime(start_time)));
        }
        if (!self.running && self.pending_calls === 0) {
          self.emit('finished');
        }
      });
    };
    // offered_load is divided into 1 and scaled by 1000 to give the interval
    // handed to the Poisson process.
    var averageIntervalMs = (1 / offered_load) * 1000;
    startAllClients(client_list, outstanding_rpcs_per_channel, function(client) {
      var p = PoissonProcess.create(averageIntervalMs, function() {
        makeCall(client, p);
      });
      p.start();
    }, self);
  };
  /**
   * Return current statistics for the client: the latency histogram contents
   * plus the wall-clock and CPU time measured since the last recorded
   * starting point. If reset is set, restart statistic collection with a new
   * starting point and an empty histogram of the same configuration.
   * @param {boolean} reset Indicates that statistics should be reset
   * @return {object} Client statistics
   */
  BenchmarkClient.prototype.mark = function(reset) {
    var elapsed = process.hrtime(this.last_wall_time);
    var cpu_delta = process.cpuUsage(this.last_usage);
    // Report from the histogram that was live when mark was called, even if
    // it is about to be replaced.
    var snapshot = this.histogram;
    if (reset) {
      this.last_wall_time = process.hrtime();
      this.last_usage = process.cpuUsage();
      this.histogram = new Histogram(snapshot.resolution,
                                     snapshot.max_possible);
    }
    var latencies = {
      bucket: snapshot.getContents(),
      min_seen: snapshot.minimum(),
      max_seen: snapshot.maximum(),
      sum: snapshot.getSum(),
      sum_of_squares: snapshot.sumOfSquares(),
      count: snapshot.getCount()
    };
    var stats = {
      latencies: latencies,
      // hrtime gives [seconds, nanoseconds]; report fractional seconds.
      time_elapsed: elapsed[0] + elapsed[1] / 1e9,
      // cpuUsage reports microseconds; report seconds.
      time_user: cpu_delta.user / 1000000,
      time_system: cpu_delta.system / 1000000
    };
    return stats;
  };
  /**
   * Stop the clients. No new RPCs are started after this call.
   *
   * If RPCs are still outstanding, the callback runs once, when 'finished' is
   * emitted. If nothing is outstanding, no 'finished' event will ever be
   * emitted, so the callback is scheduled directly instead of waiting on it.
   * @param {function} callback Called when the clients have finished shutting
   *     down
   */
  BenchmarkClient.prototype.stop = function(callback) {
    this.running = false;
    if (this.pending_calls === 0) {
      // Deferred so the callback is always invoked asynchronously.
      process.nextTick(callback);
      return;
    }
    // `once` keeps the listener from firing again after a later restart.
    this.once('finished', callback);
  };
  327. module.exports = BenchmarkClient;