|  | @@ -32,12 +32,7 @@
 | 
	
		
			
				|  |  |  #endregion
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  using System;
 | 
	
		
			
				|  |  | -using System.Diagnostics;
 | 
	
		
			
				|  |  | -using System.Runtime.CompilerServices;
 | 
	
		
			
				|  |  | -using System.Runtime.InteropServices;
 | 
	
		
			
				|  |  | -using System.Threading;
 | 
	
		
			
				|  |  |  using System.Threading.Tasks;
 | 
	
		
			
				|  |  | -using Grpc.Core.Internal;
 | 
	
		
			
				|  |  |  using Grpc.Core.Logging;
 | 
	
		
			
				|  |  |  using Grpc.Core.Profiling;
 | 
	
		
			
				|  |  |  using Grpc.Core.Utils;
 | 
	
	
		
			
				|  | @@ -57,9 +52,11 @@ namespace Grpc.Core.Internal
 | 
	
		
			
				|  |  |          // Completion of a pending unary response if not null.
 | 
	
		
			
				|  |  |          TaskCompletionSource<TResponse> unaryResponseTcs;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        // TODO(jtattermusch): this field doesn't need to be initialized for unary response calls.
 | 
	
		
			
				|  |  |          // Indicates that response streaming call has finished.
 | 
	
		
			
				|  |  |          TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        // TODO(jtattermusch): this field could be lazy-initialized (only if someone requests the response headers).
 | 
	
		
			
				|  |  |          // Response headers set here once received.
 | 
	
		
			
				|  |  |          TaskCompletionSource<Metadata> responseHeadersTcs = new TaskCompletionSource<Metadata>();
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -67,7 +64,7 @@ namespace Grpc.Core.Internal
 | 
	
		
			
				|  |  |          ClientSideStatus? finishedStatus;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          public AsyncCall(CallInvocationDetails<TRequest, TResponse> callDetails)
 | 
	
		
			
				|  |  | -            : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer, callDetails.Channel.Environment)
 | 
	
		
			
				|  |  | +            : base(callDetails.RequestMarshaller.Serializer, callDetails.ResponseMarshaller.Deserializer)
 | 
	
		
			
				|  |  |          {
 | 
	
		
			
				|  |  |              this.details = callDetails.WithOptions(callDetails.Options.Normalize());
 | 
	
		
			
				|  |  |              this.initialMetadataSent = true;  // we always send metadata at the very beginning of the call.
 | 
	
	
		
			
				|  | @@ -144,7 +141,7 @@ namespace Grpc.Core.Internal
 | 
	
		
			
				|  |  |                  GrpcPreconditions.CheckState(!started);
 | 
	
		
			
				|  |  |                  started = true;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                Initialize(environment.CompletionQueue);
 | 
	
		
			
				|  |  | +                Initialize(details.Channel.CompletionQueue);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |                  halfcloseRequested = true;
 | 
	
		
			
				|  |  |                  readingDone = true;
 | 
	
	
		
			
				|  | @@ -171,7 +168,7 @@ namespace Grpc.Core.Internal
 | 
	
		
			
				|  |  |                  GrpcPreconditions.CheckState(!started);
 | 
	
		
			
				|  |  |                  started = true;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                Initialize(environment.CompletionQueue);
 | 
	
		
			
				|  |  | +                Initialize(details.Channel.CompletionQueue);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |                  readingDone = true;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -195,7 +192,7 @@ namespace Grpc.Core.Internal
 | 
	
		
			
				|  |  |                  GrpcPreconditions.CheckState(!started);
 | 
	
		
			
				|  |  |                  started = true;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                Initialize(environment.CompletionQueue);
 | 
	
		
			
				|  |  | +                Initialize(details.Channel.CompletionQueue);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |                  halfcloseRequested = true;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -220,7 +217,7 @@ namespace Grpc.Core.Internal
 | 
	
		
			
				|  |  |                  GrpcPreconditions.CheckState(!started);
 | 
	
		
			
				|  |  |                  started = true;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                Initialize(environment.CompletionQueue);
 | 
	
		
			
				|  |  | +                Initialize(details.Channel.CompletionQueue);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |                  using (var metadataArray = MetadataArraySafeHandle.Create(details.Options.Headers))
 | 
	
		
			
				|  |  |                  {
 | 
	
	
		
			
				|  | @@ -232,11 +229,10 @@ namespace Grpc.Core.Internal
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          /// <summary>
 | 
	
		
			
				|  |  |          /// Sends a streaming request. Only one pending send action is allowed at any given time.
 | 
	
		
			
				|  |  | -        /// completionDelegate is called when the operation finishes.
 | 
	
		
			
				|  |  |          /// </summary>
 | 
	
		
			
				|  |  | -        public void StartSendMessage(TRequest msg, WriteFlags writeFlags, AsyncCompletionDelegate<object> completionDelegate)
 | 
	
		
			
				|  |  | +        public Task SendMessageAsync(TRequest msg, WriteFlags writeFlags)
 | 
	
		
			
				|  |  |          {
 | 
	
		
			
				|  |  | -            StartSendMessageInternal(msg, writeFlags, completionDelegate);
 | 
	
		
			
				|  |  | +            return SendMessageInternalAsync(msg, writeFlags);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          /// <summary>
 | 
	
	
		
			
				|  | @@ -250,29 +246,32 @@ namespace Grpc.Core.Internal
 | 
	
		
			
				|  |  |          /// <summary>
 | 
	
		
			
				|  |  |          /// Sends halfclose, indicating client is done with streaming requests.
 | 
	
		
			
				|  |  |          /// Only one pending send action is allowed at any given time.
 | 
	
		
			
				|  |  | -        /// completionDelegate is called when the operation finishes.
 | 
	
		
			
				|  |  |          /// </summary>
 | 
	
		
			
				|  |  | -        public void StartSendCloseFromClient(AsyncCompletionDelegate<object> completionDelegate)
 | 
	
		
			
				|  |  | +        public Task SendCloseFromClientAsync()
 | 
	
		
			
				|  |  |          {
 | 
	
		
			
				|  |  |              lock (myLock)
 | 
	
		
			
				|  |  |              {
 | 
	
		
			
				|  |  | -                GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
 | 
	
		
			
				|  |  | -                CheckSendingAllowed(allowFinished: true);
 | 
	
		
			
				|  |  | +                GrpcPreconditions.CheckState(started);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -                if (!disposed && !finished)
 | 
	
		
			
				|  |  | +                var earlyResult = CheckSendPreconditionsClientSide();
 | 
	
		
			
				|  |  | +                if (earlyResult != null)
 | 
	
		
			
				|  |  |                  {
 | 
	
		
			
				|  |  | -                    call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
 | 
	
		
			
				|  |  | +                    return earlyResult;
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | -                else
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +                if (disposed || finished)
 | 
	
		
			
				|  |  |                  {
 | 
	
		
			
				|  |  |                      // In case the call has already been finished by the serverside,
 | 
	
		
			
				|  |  | -                    // the halfclose has already been done implicitly, so we only
 | 
	
		
			
				|  |  | -                    // emit the notification for the completion delegate.
 | 
	
		
			
				|  |  | -                    Task.Run(() => HandleSendCloseFromClientFinished(true));
 | 
	
		
			
				|  |  | +                    // the halfclose has already been done implicitly, so just return
 | 
	
		
			
				|  |  | +                    // completed task here.
 | 
	
		
			
				|  |  | +                    halfcloseRequested = true;
 | 
	
		
			
				|  |  | +                    return Task.FromResult<object>(null);
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | +                call.StartSendCloseFromClient(HandleSendCloseFromClientFinished);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |                  halfcloseRequested = true;
 | 
	
		
			
				|  |  | -                sendCompletionDelegate = completionDelegate;
 | 
	
		
			
				|  |  | +                streamingWriteTcs = new TaskCompletionSource<object>();
 | 
	
		
			
				|  |  | +                return streamingWriteTcs.Task;
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -342,6 +341,45 @@ namespace Grpc.Core.Internal
 | 
	
		
			
				|  |  |              get { return true; }
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        protected override Task CheckSendAllowedOrEarlyResult()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            var earlyResult = CheckSendPreconditionsClientSide();
 | 
	
		
			
				|  |  | +            if (earlyResult != null)
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                return earlyResult;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            if (finishedStatus.HasValue)
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                // throwing RpcException if we already received status on client
 | 
	
		
			
				|  |  | +                // side makes the most sense.
 | 
	
		
			
				|  |  | +                // Note that this throws even for StatusCode.OK.
 | 
	
		
			
				|  |  | +                // Writing after the call has finished is not a programming error because server can close
 | 
	
		
			
				|  |  | +                // the call anytime, so don't throw directly, but let the write task finish with an error.
 | 
	
		
			
				|  |  | +                var tcs = new TaskCompletionSource<object>();
 | 
	
		
			
				|  |  | +                tcs.SetException(new RpcException(finishedStatus.Value.Status));
 | 
	
		
			
				|  |  | +                return tcs.Task;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            return null;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        private Task CheckSendPreconditionsClientSide()
 | 
	
		
			
				|  |  | +        {
 | 
	
		
			
				|  |  | +            GrpcPreconditions.CheckState(!halfcloseRequested, "Request stream has already been completed.");
 | 
	
		
			
				|  |  | +            GrpcPreconditions.CheckState(streamingWriteTcs == null, "Only one write can be pending at a time.");
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            if (cancelRequested)
 | 
	
		
			
				|  |  | +            {
 | 
	
		
			
				|  |  | +                // Return a cancelled task.
 | 
	
		
			
				|  |  | +                var tcs = new TaskCompletionSource<object>();
 | 
	
		
			
				|  |  | +                tcs.SetCanceled();
 | 
	
		
			
				|  |  | +                return tcs.Task;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            return null;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          private void Initialize(CompletionQueueSafeHandle cq)
 | 
	
		
			
				|  |  |          {
 | 
	
		
			
				|  |  |              using (Profilers.ForCurrentThread().NewScope("AsyncCall.Initialize"))
 | 
	
	
		
			
				|  | @@ -368,7 +406,7 @@ namespace Grpc.Core.Internal
 | 
	
		
			
				|  |  |                  var credentials = details.Options.Credentials;
 | 
	
		
			
				|  |  |                  using (var nativeCredentials = credentials != null ? credentials.ToNativeCredentials() : null)
 | 
	
		
			
				|  |  |                  {
 | 
	
		
			
				|  |  | -                    var result = details.Channel.Handle.CreateCall(environment.CompletionRegistry,
 | 
	
		
			
				|  |  | +                    var result = details.Channel.Handle.CreateCall(
 | 
	
		
			
				|  |  |                                   parentCall, ContextPropagationToken.DefaultMask, cq,
 | 
	
		
			
				|  |  |                                   details.Method, details.Host, Timespec.FromDateTime(details.Options.Deadline.Value), nativeCredentials);
 | 
	
		
			
				|  |  |                      return result;
 | 
	
	
		
			
				|  | @@ -400,6 +438,7 @@ namespace Grpc.Core.Internal
 | 
	
		
			
				|  |  |          /// </summary>
 | 
	
		
			
				|  |  |          private void HandleReceivedResponseHeaders(bool success, Metadata responseHeaders)
 | 
	
		
			
				|  |  |          {
 | 
	
		
			
				|  |  | +            // TODO(jtattermusch): handle success==false
 | 
	
		
			
				|  |  |              responseHeadersTcs.SetResult(responseHeaders);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -443,19 +482,6 @@ namespace Grpc.Core.Internal
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        protected override void CheckSendingAllowed(bool allowFinished)
 | 
	
		
			
				|  |  | -        {
 | 
	
		
			
				|  |  | -            base.CheckSendingAllowed(true);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -            // throwing RpcException if we already received status on client
 | 
	
		
			
				|  |  | -            // side makes the most sense.
 | 
	
		
			
				|  |  | -            // Note that this throws even for StatusCode.OK.
 | 
	
		
			
				|  |  | -            if (!allowFinished && finishedStatus.HasValue)
 | 
	
		
			
				|  |  | -            {
 | 
	
		
			
				|  |  | -                throw new RpcException(finishedStatus.Value.Status);
 | 
	
		
			
				|  |  | -            }
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |          /// <summary>
 | 
	
		
			
				|  |  |          /// Handles receive status completion for calls with streaming response.
 | 
	
		
			
				|  |  |          /// </summary>
 |