benchmark_client_express.js 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. /*
  2. *
  3. * Copyright 2015 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. /**
  19. * Benchmark client module
  20. * @module
  21. */
  22. 'use strict';
  23. var fs = require('fs');
  24. var path = require('path');
  25. var util = require('util');
  26. var EventEmitter = require('events');
  27. var http = require('http');
  28. var https = require('https');
  29. var async = require('async');
  30. var _ = require('lodash');
  31. var PoissonProcess = require('poisson-process');
  32. var Histogram = require('./histogram');
  /**
   * Collapse a [seconds, nanoseconds] pair, in the format produced by
   * process.hrtime, into a single nanosecond count.
   * @param {Array.<number>} time_diff The time diff, represented as
   *     [seconds, nanoseconds]
   * @return {number} The total number of nanoseconds
   */
  function timeDiffToNanos(time_diff) {
    const NANOS_PER_SECOND = 1e9;
    const wholeSeconds = time_diff[0];
    const extraNanos = time_diff[1];
    return wholeSeconds * NANOS_PER_SECOND + extraNanos;
  }
  /**
   * Split a "host:port" target string into the pieces http.request expects.
   * The port is whatever follows the LAST colon, so a bracketed IPv6 literal
   * such as "[::1]:8080" is handled; the brackets are stripped from the
   * hostname. A target without any colon yields a NaN port, as before.
   * @param {string} target Server target of the form "host:port"
   * @return {{hostname: string, port: number}} The parsed target
   */
  function parseServerTarget(target) {
    var separator = target.lastIndexOf(':');
    if (separator === -1) {
      return {hostname: target, port: NaN};
    }
    var hostname = target.slice(0, separator);
    if (hostname.charAt(0) === '[' &&
        hostname.charAt(hostname.length - 1) === ']') {
      hostname = hostname.slice(1, -1);
    }
    return {hostname: hostname, port: +target.slice(separator + 1)};
  }

  /**
   * Benchmark client that issues JSON PUT requests over HTTP or HTTPS.
   * Emits 'error' and 'finished' events.
   * @constructor
   * @param {Array.<string>} server_targets "host:port" strings; channels are
   *     assigned to the targets round-robin
   * @param {number} channels Number of per-channel request option objects to
   *     create
   * @param {Object} histogram_params Provides the `resolution` and
   *     `max_possible` values passed to the Histogram constructor
   * @param {Object=} security_params When truthy, HTTPS is used. If
   *     `use_test_ca` is set, the test CA certificate is read from
   *     ../test/data/ca.pem; if `server_host_override` is set, it is used as
   *     the TLS servername
   */
  function BenchmarkClient(server_targets, channels, histogram_params,
                           security_params) {
    // Initialize the EventEmitter state explicitly instead of relying on the
    // emitter's lazy initialization.
    EventEmitter.call(this);

    var options = {
      method: 'PUT',
      headers: {
        'Content-Type': 'application/json'
      }
    };
    var protocol = http;
    if (security_params) {
      protocol = https;
      if (security_params.use_test_ca) {
        options.ca = fs.readFileSync(
            path.join(__dirname, '../test/data/ca.pem'));
      }
      if (security_params.server_host_override) {
        options.servername = security_params.server_host_override;
      }
    }
    this.request = protocol.request.bind(protocol);

    this.client_options = [];
    for (var i = 0; i < channels; i++) {
      var target = parseServerTarget(server_targets[i % server_targets.length]);
      // Each channel gets its own shallow copy of the shared options.
      this.client_options[i] = Object.assign(
          {hostname: target.hostname, port: target.port}, options);
    }

    this.histogram = new Histogram(histogram_params.resolution,
                                   histogram_params.max_possible);
    this.running = false;
    this.pending_calls = 0;
  }
  util.inherits(BenchmarkClient, EventEmitter);
  /**
   * Kick off the initial calls for every channel: makeCall is invoked
   * outstanding_rpcs_per_channel times for each entry of client_options_list,
   * receiving that entry as its only argument.
   * @param {Array.<Object>} client_options_list One options object per channel
   * @param {number} outstanding_rpcs_per_channel Number of makeCall
   *     invocations per channel
   * @param {function(Object)} makeCall Invoked with a channel's options
   * @param {EventEmitter} emitter Currently unused
   */
  function startAllClients(client_options_list, outstanding_rpcs_per_channel,
                           makeCall, emitter) {
    _.forEach(client_options_list, (channelOptions) => {
      const launchOne = () => {
        makeCall(channelOptions);
      };
      _.times(outstanding_rpcs_per_channel, launchOne);
    });
  }
  /**
   * Start a closed-loop benchmark. For every channel,
   * outstanding_rpcs_per_channel request chains are started; each chain
   * issues its next request as soon as the previous one completes, until
   * `running` is cleared. Successful call latencies are recorded in the
   * histogram. 'finished' is emitted once the client is no longer running and
   * no calls are pending.
   *
   * If rpc_type or generic is unsupported, an 'error' event is emitted and
   * the benchmark is NOT started.
   * @param {number} outstanding_rpcs_per_channel Concurrent request chains
   *     per channel
   * @param {string} rpc_type Only 'UNARY' is supported
   * @param {number} req_size Length of the request payload body
   * @param {number} resp_size Sent to the server as `response_size`
   * @param {boolean} generic Must be falsy; generic clients are not supported
   */
  BenchmarkClient.prototype.startClosedLoop = function(
      outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, generic) {
    var self = this;
    // Validate before touching any state so that an unsupported configuration
    // does not start sending requests.
    if (rpc_type !== 'UNARY') {
      self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type));
      return;
    }
    if (generic) {
      self.emit('error', new Error('Generic client not supported'));
      return;
    }
    var options = {
      path: '/serviceProto.BenchmarkService.service/unaryCall'
    };
    self.running = true;
    self.last_wall_time = process.hrtime();
    self.last_usage = process.cpuUsage();
    var argument = {
      response_size: resp_size,
      payload: {
        body: '0'.repeat(req_size)
      }
    };

    function makeCall(client_options) {
      if (!self.running) {
        return;
      }
      self.pending_calls++;
      var start_time = process.hrtime();
      // A request can report both a response and a later socket error; make
      // sure each call is accounted for exactly once.
      var settled = false;

      // Completes this call and chains the next one on the same channel.
      function finishCall(success) {
        if (settled) {
          return;
        }
        settled = true;
        if (success) {
          var time_diff = process.hrtime(start_time);
          self.histogram.add(timeDiffToNanos(time_diff));
        }
        // No-op when the client has been stopped.
        makeCall(client_options);
        self.pending_calls--;
        if ((!self.running) && self.pending_calls === 0) {
          self.emit('finished');
        }
      }

      // Fatal failure: stop the benchmark and release this call's slot so
      // that 'finished' can still be emitted.
      function failCall(error) {
        if (settled) {
          return;
        }
        settled = true;
        self.running = false;
        self.pending_calls--;
        self.emit('error', new Error('Client error: ' + error.message));
        if (self.pending_calls === 0) {
          self.emit('finished');
        }
      }

      var req = self.request(client_options, function(res) {
        var res_data = '';
        // Decode as a stream so multi-byte characters split across chunks
        // are not corrupted by per-chunk string conversion.
        res.setEncoding('utf8');
        res.on('data', function(data) {
          res_data += data;
        });
        res.on('end', function() {
          try {
            JSON.parse(res_data);
          } catch (parse_error) {
            failCall(parse_error);
            return;
          }
          finishCall(true);
        });
      });
      req.on('error', function(error) {
        // Connection resets and timeouts count as a failed call, not as a
        // fatal client error.
        if (error.code === 'ECONNRESET' || error.code === 'ETIMEDOUT') {
          finishCall(false);
          return;
        }
        failCall(error);
      });
      req.write(JSON.stringify(argument));
      req.end();
    }

    // Build a fresh options object per channel. Merging into the shared
    // `options` object would hand every channel the same object, carrying
    // the hostname/port of whichever channel was merged last.
    var per_channel_options = self.client_options.map(
        function(channel_options) {
          return Object.assign({}, channel_options, options);
        });
    startAllClients(per_channel_options, outstanding_rpcs_per_channel,
                    makeCall, self);
  };
  /**
   * Start an open-loop benchmark in which requests are triggered by Poisson
   * processes. For every channel, outstanding_rpcs_per_channel Poisson
   * processes are created, each with a mean interval of
   * (1 / offered_load) * 1000 milliseconds. Successful call latencies are
   * recorded in the histogram. 'finished' is emitted once the client is no
   * longer running and no calls are pending.
   *
   * If rpc_type or generic is unsupported, an 'error' event is emitted and
   * the benchmark is NOT started.
   * @param {number} outstanding_rpcs_per_channel Poisson processes per channel
   * @param {string} rpc_type Only 'UNARY' is supported
   * @param {number} req_size Length of the request payload body
   * @param {number} resp_size Sent to the server as `response_size`
   * @param {number} offered_load Used to derive the mean interval between
   *     requests of one Poisson process
   * @param {boolean} generic Must be falsy; generic clients are not supported
   */
  BenchmarkClient.prototype.startPoisson = function(
      outstanding_rpcs_per_channel, rpc_type, req_size, resp_size, offered_load,
      generic) {
    var self = this;
    // Validate before touching any state so that an unsupported configuration
    // does not start sending requests.
    if (rpc_type !== 'UNARY') {
      self.emit('error', new Error('Unsupported rpc_type: ' + rpc_type));
      return;
    }
    if (generic) {
      self.emit('error', new Error('Generic client not supported'));
      return;
    }
    var options = {
      path: '/serviceProto.BenchmarkService.service/unaryCall'
    };
    self.running = true;
    self.last_wall_time = process.hrtime();
    self.last_usage = process.cpuUsage();
    var argument = {
      response_size: resp_size,
      payload: {
        body: '0'.repeat(req_size)
      }
    };

    function makeCall(client_options, poisson) {
      if (!self.running) {
        // The client was stopped: shut down the process that triggered us.
        poisson.stop();
        return;
      }
      self.pending_calls++;
      var start_time = process.hrtime();
      // Guards against a call being accounted for twice (for example a
      // response followed by a socket error).
      var settled = false;

      // Releases this call's slot; a non-null error is fatal and stops the
      // benchmark.
      function settle(error) {
        if (settled) {
          return;
        }
        settled = true;
        if (error) {
          self.running = false;
        } else {
          var time_diff = process.hrtime(start_time);
          self.histogram.add(timeDiffToNanos(time_diff));
        }
        self.pending_calls--;
        if (error) {
          self.emit('error', new Error('Client error: ' + error.message));
        }
        if ((!self.running) && self.pending_calls === 0) {
          self.emit('finished');
        }
      }

      var req = self.request(client_options, function(res) {
        var res_data = '';
        // Decode as a stream so multi-byte characters split across chunks
        // are not corrupted by per-chunk string conversion.
        res.setEncoding('utf8');
        res.on('data', function(data) {
          res_data += data;
        });
        res.on('end', function() {
          try {
            JSON.parse(res_data);
          } catch (parse_error) {
            settle(parse_error);
            return;
          }
          settle(null);
        });
      });
      req.on('error', function(error) {
        settle(error);
      });
      req.write(JSON.stringify(argument));
      req.end();
    }

    var averageIntervalMs = (1 / offered_load) * 1000;
    // Build a fresh options object per channel. Merging into the shared
    // `options` object would hand every channel the same object, carrying
    // the hostname/port of whichever channel was merged last.
    var per_channel_options = self.client_options.map(
        function(channel_options) {
          return Object.assign({}, channel_options, options);
        });
    startAllClients(per_channel_options, outstanding_rpcs_per_channel,
                    function(opts) {
                      var p = PoissonProcess.create(averageIntervalMs,
                                                    function() {
                                                      makeCall(opts, p);
                                                    });
                      p.start();
                    }, self);
  };
  /**
   * Return current statistics for the client: the latency histogram contents
   * plus the wall-clock and CPU time measured since the stored
   * last_wall_time / last_usage values. If reset is set, those reference
   * values are refreshed and a new, empty histogram with the same resolution
   * and max_possible replaces the current one; the returned statistics still
   * describe the histogram that was in place before the reset.
   * @param {boolean} reset Indicates that statistics should be reset
   * @return {object} Client statistics
   */
  BenchmarkClient.prototype.mark = function(reset) {
    const elapsed = process.hrtime(this.last_wall_time);
    const cpu = process.cpuUsage(this.last_usage);
    const snapshot = this.histogram;

    if (reset) {
      this.last_wall_time = process.hrtime();
      this.last_usage = process.cpuUsage();
      this.histogram = new Histogram(snapshot.resolution,
                                     snapshot.max_possible);
    }

    const latencies = {
      bucket: snapshot.getContents(),
      min_seen: snapshot.minimum(),
      max_seen: snapshot.maximum(),
      sum: snapshot.getSum(),
      sum_of_squares: snapshot.sumOfSquares(),
      count: snapshot.getCount()
    };
    const seconds = elapsed[0];
    const nanos = elapsed[1];

    return {
      latencies: latencies,
      time_elapsed: seconds + nanos / 1e9,
      time_user: cpu.user / 1000000,
      time_system: cpu.system / 1000000
    };
  };
  /**
   * Stop the clients. No new calls are started after this; the callback is
   * invoked once all pending calls have completed. If no calls are pending
   * at the time of the call, no 'finished' event would ever be emitted, so
   * the callback is scheduled directly instead.
   * @param {function} callback Called (with no arguments) when the clients
   *     have finished shutting down
   */
  BenchmarkClient.prototype.stop = function(callback) {
    this.running = false;
    if (this.pending_calls === 0) {
      // Nothing in flight: 'finished' will not fire. Invoke the callback
      // asynchronously, matching the event-driven path.
      setImmediate(callback);
      return;
    }
    // `once` so the callback runs a single time and the listener does not
    // accumulate across repeated stop() calls.
    this.once('finished', callback);
  };
  250. module.exports = BenchmarkClient;