AsyncStreamExtensions.cs 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. #region Copyright notice and license
  2. // Copyright 2015 gRPC authors.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. #endregion
  16. using System;
  17. using System.Collections.Generic;
  18. using System.Threading.Tasks;
  namespace Grpc.Core.Utils
  {
      /// <summary>
      /// Extension methods that simplify work with gRPC streaming calls.
      /// </summary>
      public static class AsyncStreamExtensions
      {
          /// <summary>
          /// Reads the entire stream and executes an async action for each element.
          /// Each invocation of <paramref name="asyncAction"/> is awaited before the next
          /// element is requested from the stream.
          /// </summary>
          /// <typeparam name="T">Type of the elements in the stream.</typeparam>
          /// <param name="streamReader">The stream to read from.</param>
          /// <param name="asyncAction">Action invoked with each element read from the stream.</param>
          /// <returns>A task that completes once <c>MoveNext</c> reports that there are no more elements.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="streamReader"/> is null.</exception>
          public static async Task ForEachAsync<T>(this IAsyncStreamReader<T> streamReader, Func<T, Task> asyncAction)
              where T : class
          {
              // A null reader could never work (MoveNext is always called); report it
              // with the offending parameter name instead of a NullReferenceException.
              if (streamReader == null)
              {
                  throw new ArgumentNullException("streamReader");
              }

              while (await streamReader.MoveNext().ConfigureAwait(false))
              {
                  await asyncAction(streamReader.Current).ConfigureAwait(false);
              }
          }

          /// <summary>
          /// Reads the entire stream and creates a list containing all the elements read.
          /// </summary>
          /// <typeparam name="T">Type of the elements in the stream.</typeparam>
          /// <param name="streamReader">The stream to read from.</param>
          /// <returns>A list with the elements in the order they were read; empty if the stream yielded none.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="streamReader"/> is null.</exception>
          public static async Task<List<T>> ToListAsync<T>(this IAsyncStreamReader<T> streamReader)
              where T : class
          {
              if (streamReader == null)
              {
                  throw new ArgumentNullException("streamReader");
              }

              var result = new List<T>();
              while (await streamReader.MoveNext().ConfigureAwait(false))
              {
                  result.Add(streamReader.Current);
              }
              return result;
          }

          /// <summary>
          /// Writes all elements from given enumerable to the stream.
          /// Completes the stream afterwards unless <paramref name="complete"/> is false.
          /// Each write is awaited before the next element is written.
          /// </summary>
          /// <typeparam name="T">Type of the elements to write.</typeparam>
          /// <param name="streamWriter">The client stream to write to.</param>
          /// <param name="elements">The elements to write, in enumeration order.</param>
          /// <param name="complete">
          /// When true (the default), <c>CompleteAsync</c> is called on the stream after the last element is written.
          /// </param>
          /// <returns>A task that completes when all writes (and the optional completion) have finished.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="elements"/> is null.</exception>
          public static async Task WriteAllAsync<T>(this IClientStreamWriter<T> streamWriter, IEnumerable<T> elements, bool complete = true)
              where T : class
          {
              // Enumerating a null sequence always failed; surface it as an argument error.
              if (elements == null)
              {
                  throw new ArgumentNullException("elements");
              }

              foreach (var element in elements)
              {
                  await streamWriter.WriteAsync(element).ConfigureAwait(false);
              }
              if (complete)
              {
                  await streamWriter.CompleteAsync().ConfigureAwait(false);
              }
          }

          /// <summary>
          /// Writes all elements from given enumerable to the stream.
          /// Each write is awaited before the next element is written.
          /// </summary>
          /// <typeparam name="T">Type of the elements to write.</typeparam>
          /// <param name="streamWriter">The server stream to write to.</param>
          /// <param name="elements">The elements to write, in enumeration order.</param>
          /// <returns>A task that completes when all writes have finished.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="elements"/> is null.</exception>
          public static async Task WriteAllAsync<T>(this IServerStreamWriter<T> streamWriter, IEnumerable<T> elements)
              where T : class
          {
              // Enumerating a null sequence always failed; surface it as an argument error.
              if (elements == null)
              {
                  throw new ArgumentNullException("elements");
              }

              foreach (var element in elements)
              {
                  await streamWriter.WriteAsync(element).ConfigureAwait(false);
              }
          }
      }
  }