CommonThreadedBase.cs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. #region Copyright notice and license
  2. // Copyright 2015 gRPC authors.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. #endregion
  16. using System;
  17. using System.Collections.Concurrent;
  18. using System.Collections.Generic;
  19. using System.Threading;
  20. using System.Threading.Tasks;
  21. using BenchmarkDotNet.Attributes;
  22. using Grpc.Core;
  23. namespace Grpc.Microbenchmarks
  24. {
  // Common base type for benchmarks that need to run with some level of concurrency.
  // There is nothing *special* about this type - it only exists to save boilerplate:
  // Setup starts ThreadCount dedicated worker threads (each with its own dispatch
  // queue), RunConcurrent fans an operation out to those workers, and Cleanup stops
  // the workers again.
  [ClrJob, CoreJob] // test .NET Core and .NET Framework
  [MemoryDiagnoser] // allocations
  public abstract class CommonThreadedBase
  {
    /// <summary>
    /// Whether <see cref="Setup"/> should acquire a <see cref="GrpcEnvironment"/>
    /// reference via <c>GrpcEnvironment.AddRef()</c>. Returns <c>true</c> unless overridden.
    /// </summary>
    protected virtual bool NeedsEnvironment => true;

    /// <summary>
    /// Number of worker threads started by <see cref="Setup"/>.
    /// </summary>
    [Params(1, 2, 4, 6)]
    public int ThreadCount { get; set; }

    /// <summary>
    /// The environment acquired by <see cref="Setup"/> when <see cref="NeedsEnvironment"/>
    /// is <c>true</c>; <c>null</c> otherwise and after <see cref="Cleanup"/>.
    /// </summary>
    protected GrpcEnvironment Environment { get; private set; }

    // workers[i] consumes work items from dispatchQueues[i]; both lists are
    // created by Setup and reset to null by Cleanup.
    private List<Thread> workers;
    private List<BlockingCollection<Action>> dispatchQueues;

    /// <summary>
    /// Starts <see cref="ThreadCount"/> worker threads, each with its own dispatch queue,
    /// and acquires the gRPC environment when <see cref="NeedsEnvironment"/> is <c>true</c>.
    /// </summary>
    [GlobalSetup]
    public virtual void Setup()
    {
      dispatchQueues = new List<BlockingCollection<Action>>();
      workers = new List<Thread>();

      for (int i = 0; i < ThreadCount; i++)
      {
        var dispatchQueue = new BlockingCollection<Action>();
        var thread = new Thread(() => WorkerThreadBody(dispatchQueue))
        {
          Name = $"threaded benchmark worker {i}",
          // Background threads cannot keep the process alive if Cleanup is never
          // reached (for example when a derived Setup throws after calling this one).
          IsBackground = true,
        };
        thread.Start();

        workers.Add(thread);
        dispatchQueues.Add(dispatchQueue);
      }

      if (NeedsEnvironment)
      {
        Environment = GrpcEnvironment.AddRef();
      }
    }

    /// <summary>
    /// Stops and joins every worker thread, disposes the dispatch queues and releases
    /// the gRPC environment if one was acquired. Calling this method without a
    /// preceding <see cref="Setup"/>, or calling it twice, is a no-op.
    /// </summary>
    [GlobalCleanup]
    public virtual void Cleanup()
    {
      var queues = dispatchQueues;
      var threads = workers;
      dispatchQueues = null;
      workers = null;

      if (queues != null)
      {
        // A null action requests termination of the worker thread.
        foreach (var queue in queues)
        {
          queue.Add(null);
        }
      }

      if (threads != null)
      {
        foreach (var thread in threads)
        {
          thread.Join();
        }
      }

      if (queues != null)
      {
        // Only dispose once the consumers have exited, so that no thread is
        // still blocked in Take() on a disposed collection.
        foreach (var queue in queues)
        {
          queue.Dispose();
        }
      }

      if (Environment != null)
      {
        Environment = null;
        GrpcEnvironment.ReleaseAsync().Wait();
      }
    }

    /// <summary>
    /// Runs the operation in parallel (once on each worker thread) and blocks until
    /// every invocation has finished.
    /// This method tries to incur as little
    /// overhead as possible, but there is some inherent overhead
    /// that is hard to avoid (thread hop etc.). Therefore it is strongly
    /// recommended that the benchmarked operation runs long enough to
    /// make this overhead negligible.
    /// </summary>
    /// <param name="operation">The action to invoke on every worker thread.</param>
    /// <exception cref="ArgumentNullException"><paramref name="operation"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">
    /// The workers are not running (<see cref="Setup"/> has not been called, or
    /// <see cref="Cleanup"/> has already been called).
    /// </exception>
    /// <exception cref="AggregateException">One or more invocations of the operation threw.</exception>
    protected void RunConcurrent(Action operation)
    {
      if (operation == null)
      {
        throw new ArgumentNullException(nameof(operation));
      }

      var queues = dispatchQueues;
      if (queues == null)
      {
        // Without running workers the queued items would never be consumed and
        // the wait below would block forever.
        throw new InvalidOperationException("Setup() must be called before RunConcurrent().");
      }

      // Size by the queues that actually exist rather than by ThreadCount, which
      // is a settable property and may no longer match what Setup created.
      var workItemTasks = new Task[queues.Count];
      for (int i = 0; i < queues.Count; i++)
      {
        var tcs = new TaskCompletionSource<object>();
        Action workItem = () =>
        {
          // Exceptions are captured into the task so that they surface on the
          // calling thread instead of tearing down the worker thread.
          try
          {
            operation();
            tcs.SetResult(null);
          }
          catch (Exception e)
          {
            tcs.SetException(e);
          }
        };

        workItemTasks[i] = tcs.Task;
        queues[i].Add(workItem);
      }

      Task.WaitAll(workItemTasks);
    }

    /// <summary>
    /// Worker loop: executes queued work items one at a time until a <c>null</c>
    /// item is received, which signals the worker to stop.
    /// </summary>
    private static void WorkerThreadBody(BlockingCollection<Action> dispatchQueue)
    {
      while (true)
      {
        var workItem = dispatchQueue.Take();
        if (workItem == null)
        {
          // stop the worker if null action was provided
          break;
        }

        workItem();
      }
    }
  }
  113. }