GrpcEnvironment.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. #region Copyright notice and license
  2. // Copyright 2015 gRPC authors.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. #endregion
  16. using System;
  17. using System.Collections.Generic;
  18. using System.Linq;
  19. using System.Runtime.InteropServices;
  20. using System.Threading.Tasks;
  21. using Grpc.Core.Internal;
  22. using Grpc.Core.Logging;
  23. using Grpc.Core.Utils;
  namespace Grpc.Core
  {
      /// <summary>
      /// Encapsulates initialization and shutdown of gRPC library.
      /// </summary>
      public class GrpcEnvironment
      {
          const int MinDefaultThreadPoolSize = 4;

          // Single lock used around the static state below: the reference count, the current
          // instance, the custom settings and the channel/server registries.
          // The logger field is read and written without taking this lock.
          static readonly object staticLock = new object();
          static GrpcEnvironment instance;
          static int refCount;
          static int? customThreadPoolSize;
          static int? customCompletionQueueCount;
          static bool inlineHandlers;
          static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
          static readonly HashSet<Server> registeredServers = new HashSet<Server>();
          static ILogger logger = new NullLogger();

          readonly GrpcThreadPool threadPool;
          readonly DebugStats debugStats = new DebugStats();
          readonly AtomicCounter cqPickerCounter = new AtomicCounter();
          bool isShutdown;

          /// <summary>
          /// Returns a reference-counted instance of initialized gRPC environment.
          /// Subsequent invocations return the same instance unless reference count has dropped to zero previously.
          /// </summary>
          /// <returns>The shared environment instance, created on demand.</returns>
          internal static GrpcEnvironment AddRef()
          {
              ShutdownHooks.Register();

              lock (staticLock)
              {
                  refCount++;
                  if (instance == null)
                  {
                      try
                      {
                          instance = new GrpcEnvironment();
                      }
                      catch
                      {
                          // Construction failed, so no reference was handed out to the caller.
                          // Undo the increment; otherwise the count would stay one too high and
                          // a later, successfully created environment could never drop to zero.
                          refCount--;
                          throw;
                      }
                  }
                  return instance;
              }
          }

          /// <summary>
          /// Decrements the reference count for currently active environment and asynchronously shuts down the gRPC environment if reference count drops to zero.
          /// </summary>
          /// <returns>A task that completes once the shutdown (if any was triggered) has finished.</returns>
          internal static async Task ReleaseAsync()
          {
              GrpcEnvironment instanceToShutdown = null;
              lock (staticLock)
              {
                  GrpcPreconditions.CheckState(refCount > 0);
                  refCount--;
                  if (refCount == 0)
                  {
                      // Detach the instance while holding the lock so that a concurrent AddRef
                      // creates a fresh environment instead of reusing the one being shut down.
                      instanceToShutdown = instance;
                      instance = null;
                  }
              }

              // The actual shutdown is awaited outside of the lock.
              if (instanceToShutdown != null)
              {
                  await instanceToShutdown.ShutdownAsync().ConfigureAwait(false);
              }
          }

          /// <summary>
          /// Gets the current reference count of the environment.
          /// </summary>
          internal static int GetRefCount()
          {
              lock (staticLock)
              {
                  return refCount;
              }
          }

          /// <summary>
          /// Adds a channel to the set of registered channels (see <see cref="ShutdownChannelsAsync"/>).
          /// </summary>
          /// <param name="channel">The channel to register; checked for null.</param>
          internal static void RegisterChannel(Channel channel)
          {
              lock (staticLock)
              {
                  GrpcPreconditions.CheckNotNull(channel);
                  registeredChannels.Add(channel);
              }
          }

          /// <summary>
          /// Removes a channel from the set of registered channels.
          /// The argument check fails if the channel was not registered.
          /// </summary>
          /// <param name="channel">The channel to unregister; checked for null.</param>
          internal static void UnregisterChannel(Channel channel)
          {
              lock (staticLock)
              {
                  GrpcPreconditions.CheckNotNull(channel);
                  GrpcPreconditions.CheckArgument(registeredChannels.Remove(channel), "Channel not found in the registered channels set.");
              }
          }

          /// <summary>
          /// Adds a server to the set of registered servers (see <see cref="KillServersAsync"/>).
          /// </summary>
          /// <param name="server">The server to register; checked for null.</param>
          internal static void RegisterServer(Server server)
          {
              lock (staticLock)
              {
                  GrpcPreconditions.CheckNotNull(server);
                  registeredServers.Add(server);
              }
          }

          /// <summary>
          /// Removes a server from the set of registered servers.
          /// The argument check fails if the server was not registered.
          /// </summary>
          /// <param name="server">The server to unregister; checked for null.</param>
          internal static void UnregisterServer(Server server)
          {
              lock (staticLock)
              {
                  GrpcPreconditions.CheckNotNull(server);
                  GrpcPreconditions.CheckArgument(registeredServers.Remove(server), "Server not found in the registered servers set.");
              }
          }

          /// <summary>
          /// Requests shutdown of all channels created by the current process.
          /// </summary>
          /// <returns>A task that completes when all the requested channel shutdowns have completed.</returns>
          public static Task ShutdownChannelsAsync()
          {
              HashSet<Channel> snapshot = null;
              lock (staticLock)
              {
                  // Copy under the lock; the shutdown calls are issued on the copy after the
                  // lock has been released.
                  snapshot = new HashSet<Channel>(registeredChannels);
              }
              return Task.WhenAll(snapshot.Select((channel) => channel.ShutdownAsync()));
          }

          /// <summary>
          /// Requests immediate shutdown of all servers created by the current process.
          /// </summary>
          /// <returns>A task that completes when all the requested server kills have completed.</returns>
          public static Task KillServersAsync()
          {
              HashSet<Server> snapshot = null;
              lock (staticLock)
              {
                  // Copy under the lock; the kill calls are issued on the copy after the
                  // lock has been released.
                  snapshot = new HashSet<Server>(registeredServers);
              }
              return Task.WhenAll(snapshot.Select((server) => server.KillAsync()));
          }

          /// <summary>
          /// Gets application-wide logger used by gRPC.
          /// </summary>
          /// <value>The logger.</value>
          public static ILogger Logger
          {
              get
              {
                  return logger;
              }
          }

          /// <summary>
          /// Sets the application-wide logger that should be used by gRPC.
          /// </summary>
          /// <param name="customLogger">The logger to use; checked for null.</param>
          public static void SetLogger(ILogger customLogger)
          {
              GrpcPreconditions.CheckNotNull(customLogger, "customLogger");
              logger = customLogger;
          }

          /// <summary>
          /// Sets the number of threads in the gRPC thread pool that polls for internal RPC events.
          /// Can only be invoked before the <c>GrpcEnvironment</c> is started and cannot be changed afterwards.
          /// Setting thread pool size is an advanced setting and you should only use it if you know what you are doing.
          /// Most users should rely on the default value provided by gRPC library.
          /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
          /// </summary>
          /// <param name="threadCount">Number of threads; must be a positive number.</param>
          public static void SetThreadPoolSize(int threadCount)
          {
              lock (staticLock)
              {
                  GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
                  GrpcPreconditions.CheckArgument(threadCount > 0, "threadCount needs to be a positive number");
                  customThreadPoolSize = threadCount;
              }
          }

          /// <summary>
          /// Sets the number of completion queues in the gRPC thread pool that polls for internal RPC events.
          /// Can only be invoked before the <c>GrpcEnvironment</c> is started and cannot be changed afterwards.
          /// Setting the number of completion queues is an advanced setting and you should only use it if you know what you are doing.
          /// Most users should rely on the default value provided by gRPC library.
          /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
          /// </summary>
          /// <param name="completionQueueCount">Number of completion queues; must be a positive number.</param>
          public static void SetCompletionQueueCount(int completionQueueCount)
          {
              lock (staticLock)
              {
                  GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
                  // The message names the parameter that was actually validated.
                  GrpcPreconditions.CheckArgument(completionQueueCount > 0, "completionQueueCount needs to be a positive number");
                  customCompletionQueueCount = completionQueueCount;
              }
          }

          /// <summary>
          /// By default, gRPC's internal event handlers get offloaded to .NET default thread pool thread (<c>inlineHandlers=false</c>).
          /// Setting <c>inlineHandlers</c> to <c>true</c> will allow scheduling the event handlers directly to
          /// <c>GrpcThreadPool</c> internal threads. That can lead to significant performance gains in some situations,
          /// but requires user to never block in async code (incorrectly written code can easily lead to deadlocks).
          /// Inlining handlers is an advanced setting and you should only use it if you know what you are doing.
          /// Most users should rely on the default value provided by gRPC library.
          /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
          /// Note: <c>inlineHandlers=true</c> was the default in gRPC C# v1.4.x and earlier.
          /// </summary>
          /// <param name="inlineHandlers">Whether event handlers should be inlined.</param>
          public static void SetHandlerInlining(bool inlineHandlers)
          {
              lock (staticLock)
              {
                  GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
                  GrpcEnvironment.inlineHandlers = inlineHandlers;
              }
          }

          /// <summary>
          /// Occurs when <c>GrpcEnvironment</c> is about to start the shutdown logic.
          /// If <c>GrpcEnvironment</c> is later initialized and shutdown, the event will be fired again (unless unregistered first).
          /// </summary>
          public static event EventHandler ShuttingDown;

          /// <summary>
          /// Creates gRPC environment.
          /// Runs the native initialization and then creates and starts the thread pool using the
          /// configured (or default) thread pool size, completion queue count and handler inlining setting.
          /// </summary>
          private GrpcEnvironment()
          {
              GrpcNativeInit();
              threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault(), GetCompletionQueueCountOrDefault(), inlineHandlers);
              threadPool.Start();
          }

          /// <summary>
          /// Gets the completion queues used by this gRPC environment.
          /// </summary>
          internal IReadOnlyCollection<CompletionQueueSafeHandle> CompletionQueues
          {
              get
              {
                  return this.threadPool.CompletionQueues;
              }
          }

          /// <summary>
          /// Gets the <c>IsAlive</c> value reported by this environment's thread pool.
          /// </summary>
          internal bool IsAlive
          {
              get
              {
                  return this.threadPool.IsAlive;
              }
          }

          /// <summary>
          /// Picks a completion queue in a round-robin fashion.
          /// Shouldn't be invoked on a per-call basis (used at per-channel basis).
          /// </summary>
          internal CompletionQueueSafeHandle PickCompletionQueue()
          {
              // Increment() - 1 is the counter value before this call; the modulo maps it onto
              // a valid index into the completion queue collection.
              var cqIndex = (int) ((cqPickerCounter.Increment() - 1) % this.threadPool.CompletionQueues.Count);
              return this.threadPool.CompletionQueues.ElementAt(cqIndex);
          }

          /// <summary>
          /// Gets the debug stats object associated with this gRPC environment.
          /// </summary>
          internal DebugStats DebugStats
          {
              get
              {
                  return this.debugStats;
              }
          }

          /// <summary>
          /// Gets version of gRPC C core.
          /// </summary>
          internal static string GetCoreVersionString()
          {
              var ptr = NativeMethods.Get().grpcsharp_version_string();  // the pointer is not owned
              return Marshal.PtrToStringAnsi(ptr);
          }

          /// <summary>
          /// Invokes the native <c>grpcsharp_init</c> function.
          /// </summary>
          internal static void GrpcNativeInit()
          {
              NativeMethods.Get().grpcsharp_init();
          }

          /// <summary>
          /// Invokes the native <c>grpcsharp_shutdown</c> function.
          /// </summary>
          internal static void GrpcNativeShutdown()
          {
              NativeMethods.Get().grpcsharp_shutdown();
          }

          /// <summary>
          /// Shuts down this environment: raises <see cref="ShuttingDown"/>, stops the thread pool,
          /// runs the native shutdown and finally checks the debug stats.
          /// </summary>
          /// <exception cref="InvalidOperationException">Thrown when a previous shutdown of this instance has already completed.</exception>
          private async Task ShutdownAsync()
          {
              if (isShutdown)
              {
                  throw new InvalidOperationException("ShutdownAsync has already been called");
              }

              // The event is raised through Task.Run, i.e. on a .NET thread pool thread.
              // EventArgs.Empty is passed so that handlers never receive null event data.
              await Task.Run(() => ShuttingDown?.Invoke(this, EventArgs.Empty)).ConfigureAwait(false);

              await threadPool.StopAsync().ConfigureAwait(false);
              GrpcNativeShutdown();
              isShutdown = true;

              debugStats.CheckOK();
          }

          /// <summary>
          /// Returns the thread pool size set via <see cref="SetThreadPoolSize"/>, or the default when none was set.
          /// </summary>
          private int GetThreadPoolSizeOrDefault()
          {
              if (customThreadPoolSize.HasValue)
              {
                  return customThreadPoolSize.Value;
              }
              // In systems with many cores, use half of the cores for GrpcThreadPool
              // and the other half for .NET thread pool. This heuristic definitely needs
              // more work, but seems to work reasonably well for a start.
              return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2);
          }

          /// <summary>
          /// Returns the completion queue count set via <see cref="SetCompletionQueueCount"/>, or the default when none was set.
          /// </summary>
          private int GetCompletionQueueCountOrDefault()
          {
              if (customCompletionQueueCount.HasValue)
              {
                  return customCompletionQueueCount.Value;
              }
              // by default, create a completion queue for each thread
              return GetThreadPoolSizeOrDefault();
          }

          /// <summary>
          /// Registers, at most once, the hooks that shut down channels and kill servers
          /// when the process exits or the domain / load context is unloaded.
          /// </summary>
          private static class ShutdownHooks
          {
              // Dedicated lock for the one-time registration; distinct from GrpcEnvironment.staticLock.
              static readonly object registrationLock = new object();
              static bool hooksRegistered;

              /// <summary>
              /// Subscribes <see cref="HandleShutdown"/> to the shutdown-related events.
              /// Only the first invocation subscribes; later invocations do nothing.
              /// </summary>
              public static void Register()
              {
                  lock (registrationLock)
                  {
                      if (!hooksRegistered)
                      {
  #if NETSTANDARD1_5
                          System.Runtime.Loader.AssemblyLoadContext.Default.Unloading += (assemblyLoadContext) => { HandleShutdown(); };
  #else
                          AppDomain.CurrentDomain.ProcessExit += (sender, eventArgs) => { HandleShutdown(); };
                          AppDomain.CurrentDomain.DomainUnload += (sender, eventArgs) => { HandleShutdown(); };
  #endif
                          hooksRegistered = true;
                      }
                  }
              }

              /// <summary>
              /// Handler for AppDomain.DomainUnload, AppDomain.ProcessExit and AssemblyLoadContext.Unloading hooks.
              /// Blocks until both the channel shutdown and the server kill tasks have completed.
              /// </summary>
              private static void HandleShutdown()
              {
                  Task.WaitAll(GrpcEnvironment.ShutdownChannelsAsync(), GrpcEnvironment.KillServersAsync());
              }
          }
      }
  }