ClientRunners.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. #region Copyright notice and license
  2. // Copyright 2015, Google Inc.
  3. // All rights reserved.
  4. //
  5. // Redistribution and use in source and binary forms, with or without
  6. // modification, are permitted provided that the following conditions are
  7. // met:
  8. //
  9. // * Redistributions of source code must retain the above copyright
  10. // notice, this list of conditions and the following disclaimer.
  11. // * Redistributions in binary form must reproduce the above
  12. // copyright notice, this list of conditions and the following disclaimer
  13. // in the documentation and/or other materials provided with the
  14. // distribution.
  15. // * Neither the name of Google Inc. nor the names of its
  16. // contributors may be used to endorse or promote products derived from
  17. // this software without specific prior written permission.
  18. //
  19. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  20. // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  21. // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  22. // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  23. // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  24. // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  25. // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  26. // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  27. // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  28. // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  29. // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  30. #endregion
  31. using System;
  32. using System.Collections.Generic;
  33. using System.Diagnostics;
  34. using System.IO;
  35. using System.Linq;
  36. using System.Text.RegularExpressions;
  37. using System.Threading;
  38. using System.Threading.Tasks;
  39. using Google.Protobuf;
  40. using Grpc.Core;
  41. using Grpc.Core.Logging;
  42. using Grpc.Core.Utils;
  43. using NUnit.Framework;
  44. using Grpc.Testing;
  45. namespace Grpc.IntegrationTesting
  46. {
  47. /// <summary>
  48. /// Helper methods to start client runners for performance testing.
  49. /// </summary>
  50. public class ClientRunners
  51. {
  52. static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<ClientRunners>();
  53. /// <summary>
  54. /// Creates a started client runner.
  55. /// </summary>
  56. public static IClientRunner CreateStarted(ClientConfig config)
  57. {
  58. Logger.Debug("ClientConfig: {0}", config);
  59. if (config.AsyncClientThreads != 0)
  60. {
  61. Logger.Warning("ClientConfig.AsyncClientThreads is not supported for C#. Ignoring the value");
  62. }
  63. if (config.CoreLimit != 0)
  64. {
  65. Logger.Warning("ClientConfig.CoreLimit is not supported for C#. Ignoring the value");
  66. }
  67. if (config.CoreList.Count > 0)
  68. {
  69. Logger.Warning("ClientConfig.CoreList is not supported for C#. Ignoring the value");
  70. }
  71. var channels = CreateChannels(config.ClientChannels, config.ServerTargets, config.SecurityParams);
  72. return new ClientRunnerImpl(channels,
  73. config.ClientType,
  74. config.RpcType,
  75. config.OutstandingRpcsPerChannel,
  76. config.LoadParams,
  77. config.PayloadConfig,
  78. config.HistogramParams);
  79. }
  80. private static List<Channel> CreateChannels(int clientChannels, IEnumerable<string> serverTargets, SecurityParams securityParams)
  81. {
  82. GrpcPreconditions.CheckArgument(clientChannels > 0, "clientChannels needs to be at least 1.");
  83. GrpcPreconditions.CheckArgument(serverTargets.Count() > 0, "at least one serverTarget needs to be specified.");
  84. var credentials = securityParams != null ? TestCredentials.CreateSslCredentials() : ChannelCredentials.Insecure;
  85. List<ChannelOption> channelOptions = null;
  86. if (securityParams != null && securityParams.ServerHostOverride != "")
  87. {
  88. channelOptions = new List<ChannelOption>
  89. {
  90. new ChannelOption(ChannelOptions.SslTargetNameOverride, securityParams.ServerHostOverride)
  91. };
  92. }
  93. var result = new List<Channel>();
  94. for (int i = 0; i < clientChannels; i++)
  95. {
  96. var target = serverTargets.ElementAt(i % serverTargets.Count());
  97. var channel = new Channel(target, credentials, channelOptions);
  98. result.Add(channel);
  99. }
  100. return result;
  101. }
  102. }
  103. public class ClientRunnerImpl : IClientRunner
  104. {
  105. const double SecondsToNanos = 1e9;
  106. readonly List<Channel> channels;
  107. readonly ClientType clientType;
  108. readonly RpcType rpcType;
  109. readonly PayloadConfig payloadConfig;
  110. readonly Histogram histogram;
  111. readonly List<Task> runnerTasks;
  112. readonly CancellationTokenSource stoppedCts = new CancellationTokenSource();
  113. readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch();
  114. public ClientRunnerImpl(List<Channel> channels, ClientType clientType, RpcType rpcType, int outstandingRpcsPerChannel, LoadParams loadParams, PayloadConfig payloadConfig, HistogramParams histogramParams)
  115. {
  116. GrpcPreconditions.CheckArgument(outstandingRpcsPerChannel > 0, "outstandingRpcsPerChannel");
  117. this.channels = new List<Channel>(channels);
  118. this.clientType = clientType;
  119. this.rpcType = rpcType;
  120. this.payloadConfig = payloadConfig;
  121. this.histogram = new Histogram(histogramParams.Resolution, histogramParams.MaxPossible);
  122. this.runnerTasks = new List<Task>();
  123. foreach (var channel in this.channels)
  124. {
  125. for (int i = 0; i < outstandingRpcsPerChannel; i++)
  126. {
  127. var timer = CreateTimer(loadParams, 1.0 / this.channels.Count / outstandingRpcsPerChannel);
  128. var threadBody = GetThreadBody(channel, timer);
  129. this.runnerTasks.Add(Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning));
  130. }
  131. }
  132. }
  133. public ClientStats GetStats(bool reset)
  134. {
  135. var histogramData = histogram.GetSnapshot(reset);
  136. var secondsElapsed = wallClockStopwatch.GetElapsedSnapshot(reset).TotalSeconds;
  137. // TODO: populate user time and system time
  138. return new ClientStats
  139. {
  140. Latencies = histogramData,
  141. TimeElapsed = secondsElapsed,
  142. TimeUser = 0,
  143. TimeSystem = 0
  144. };
  145. }
  146. public async Task StopAsync()
  147. {
  148. stoppedCts.Cancel();
  149. foreach (var runnerTask in runnerTasks)
  150. {
  151. await runnerTask;
  152. }
  153. foreach (var channel in channels)
  154. {
  155. await channel.ShutdownAsync();
  156. }
  157. }
  158. private void RunUnary(Channel channel, IInterarrivalTimer timer)
  159. {
  160. var client = BenchmarkService.NewClient(channel);
  161. var request = CreateSimpleRequest();
  162. var stopwatch = new Stopwatch();
  163. while (!stoppedCts.Token.IsCancellationRequested)
  164. {
  165. stopwatch.Restart();
  166. client.UnaryCall(request);
  167. stopwatch.Stop();
  168. // spec requires data point in nanoseconds.
  169. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
  170. timer.WaitForNext();
  171. }
  172. }
  173. private async Task RunUnaryAsync(Channel channel, IInterarrivalTimer timer)
  174. {
  175. var client = BenchmarkService.NewClient(channel);
  176. var request = CreateSimpleRequest();
  177. var stopwatch = new Stopwatch();
  178. while (!stoppedCts.Token.IsCancellationRequested)
  179. {
  180. stopwatch.Restart();
  181. await client.UnaryCallAsync(request);
  182. stopwatch.Stop();
  183. // spec requires data point in nanoseconds.
  184. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
  185. await timer.WaitForNextAsync();
  186. }
  187. }
  188. private async Task RunStreamingPingPongAsync(Channel channel, IInterarrivalTimer timer)
  189. {
  190. var client = BenchmarkService.NewClient(channel);
  191. var request = CreateSimpleRequest();
  192. var stopwatch = new Stopwatch();
  193. using (var call = client.StreamingCall())
  194. {
  195. while (!stoppedCts.Token.IsCancellationRequested)
  196. {
  197. stopwatch.Restart();
  198. await call.RequestStream.WriteAsync(request);
  199. await call.ResponseStream.MoveNext();
  200. stopwatch.Stop();
  201. // spec requires data point in nanoseconds.
  202. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
  203. await timer.WaitForNextAsync();
  204. }
  205. // finish the streaming call
  206. await call.RequestStream.CompleteAsync();
  207. Assert.IsFalse(await call.ResponseStream.MoveNext());
  208. }
  209. }
  210. private async Task RunGenericStreamingAsync(Channel channel, IInterarrivalTimer timer)
  211. {
  212. var request = CreateByteBufferRequest();
  213. var stopwatch = new Stopwatch();
  214. var callDetails = new CallInvocationDetails<byte[], byte[]>(channel, GenericService.StreamingCallMethod, new CallOptions());
  215. using (var call = Calls.AsyncDuplexStreamingCall(callDetails))
  216. {
  217. while (!stoppedCts.Token.IsCancellationRequested)
  218. {
  219. stopwatch.Restart();
  220. await call.RequestStream.WriteAsync(request);
  221. await call.ResponseStream.MoveNext();
  222. stopwatch.Stop();
  223. // spec requires data point in nanoseconds.
  224. histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos);
  225. await timer.WaitForNextAsync();
  226. }
  227. // finish the streaming call
  228. await call.RequestStream.CompleteAsync();
  229. Assert.IsFalse(await call.ResponseStream.MoveNext());
  230. }
  231. }
  232. private Action GetThreadBody(Channel channel, IInterarrivalTimer timer)
  233. {
  234. if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams)
  235. {
  236. GrpcPreconditions.CheckArgument(clientType == ClientType.ASYNC_CLIENT, "Generic client only supports async API");
  237. GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls");
  238. return () =>
  239. {
  240. RunGenericStreamingAsync(channel, timer).Wait();
  241. };
  242. }
  243. GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
  244. if (clientType == ClientType.SYNC_CLIENT)
  245. {
  246. GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#");
  247. return () => RunUnary(channel, timer);
  248. }
  249. else if (clientType == ClientType.ASYNC_CLIENT)
  250. {
  251. switch (rpcType)
  252. {
  253. case RpcType.UNARY:
  254. return () =>
  255. {
  256. RunUnaryAsync(channel, timer).Wait();
  257. };
  258. case RpcType.STREAMING:
  259. return () =>
  260. {
  261. RunStreamingPingPongAsync(channel, timer).Wait();
  262. };
  263. }
  264. }
  265. throw new ArgumentException("Unsupported configuration.");
  266. }
  267. private SimpleRequest CreateSimpleRequest()
  268. {
  269. GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
  270. return new SimpleRequest
  271. {
  272. Payload = CreateZerosPayload(payloadConfig.SimpleParams.ReqSize),
  273. ResponseSize = payloadConfig.SimpleParams.RespSize
  274. };
  275. }
  276. private byte[] CreateByteBufferRequest()
  277. {
  278. return new byte[payloadConfig.BytebufParams.ReqSize];
  279. }
  280. private static Payload CreateZerosPayload(int size)
  281. {
  282. return new Payload { Body = ByteString.CopyFrom(new byte[size]) };
  283. }
  284. private static IInterarrivalTimer CreateTimer(LoadParams loadParams, double loadMultiplier)
  285. {
  286. switch (loadParams.LoadCase)
  287. {
  288. case LoadParams.LoadOneofCase.ClosedLoop:
  289. return new ClosedLoopInterarrivalTimer();
  290. case LoadParams.LoadOneofCase.Poisson:
  291. return new PoissonInterarrivalTimer(loadParams.Poisson.OfferedLoad * loadMultiplier);
  292. default:
  293. throw new ArgumentException("Unknown load type");
  294. }
  295. }
  296. }
  297. }