浏览代码

Fix inproc transport bugs on client WritesDone or Read after status

Vijay Pai 5 年之前
父节点
当前提交
5e569a5d37
共有 1 个文件被更改,包括 32 次插入15 次删除
  1. 32 15
      src/core/ext/transport/inproc/inproc_transport.cc

+ 32 - 15
src/core/ext/transport/inproc/inproc_transport.cc

@@ -267,6 +267,11 @@ struct inproc_stream {
   bool trailing_md_sent = false;
   bool initial_md_recvd = false;
   bool trailing_md_recvd = false;
+  // The following tracks if the server-side only pretends to have received
+  // trailing metadata since it no longer cares about the RPC. If that is the
+  // case, it is still ok for the client to send trailing metadata (in which
+  // case it will be ignored).
+  bool trailing_md_recvd_implicit_only = false;
 
   bool closed = false;
 
@@ -617,7 +622,7 @@ void op_state_machine_locked(inproc_stream* s, grpc_error* error) {
       s->send_message_op->payload->send_message.send_message.reset();
       complete_if_batch_end_locked(
           s, GRPC_ERROR_NONE, s->send_message_op,
-          "op_state_machine scheduling send-message-on-complete");
+          "op_state_machine scheduling send-message-on-complete case 1");
       s->send_message_op = nullptr;
     }
   }
@@ -739,15 +744,25 @@ void op_state_machine_locked(inproc_stream* s, grpc_error* error) {
   }
   if (s->to_read_trailing_md_filled) {
     if (s->trailing_md_recvd) {
-      new_err =
-          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
-      INPROC_LOG(
-          GPR_INFO,
-          "op_state_machine %p scheduling on_complete errors for already "
-          "recvd trailing md %p",
-          s, new_err);
-      fail_helper_locked(s, GRPC_ERROR_REF(new_err));
-      goto done;
+      if (s->trailing_md_recvd_implicit_only) {
+        INPROC_LOG(GPR_INFO,
+                   "op_state_machine %p already implicitly received trailing "
+                   "metadata, so ignoring new trailing metadata from client",
+                   s);
+        grpc_metadata_batch_clear(&s->to_read_trailing_md);
+        s->to_read_trailing_md_filled = false;
+        s->trailing_md_recvd_implicit_only = false;
+      } else {
+        new_err =
+            GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
+        INPROC_LOG(
+            GPR_INFO,
+            "op_state_machine %p scheduling on_complete errors for already "
+            "recvd trailing md %p",
+            s, new_err);
+        fail_helper_locked(s, GRPC_ERROR_REF(new_err));
+        goto done;
+      }
     }
     if (s->recv_message_op != nullptr) {
       // This message needs to be wrapped up because it will never be
@@ -770,7 +785,7 @@ void op_state_machine_locked(inproc_stream* s, grpc_error* error) {
       s->send_message_op->payload->send_message.stream_write_closed = true;
       complete_if_batch_end_locked(
           s, new_err, s->send_message_op,
-          "op_state_machine scheduling send-message-on-complete");
+          "op_state_machine scheduling send-message-on-complete case 2");
       s->send_message_op = nullptr;
     }
     if (s->recv_trailing_md_op != nullptr) {
@@ -809,7 +824,7 @@ void op_state_machine_locked(inproc_stream* s, grpc_error* error) {
                    "trailing-md-on-complete %p",
                    s, new_err);
       }
-    } else {
+    } else if (!s->trailing_md_recvd) {
       INPROC_LOG(
           GPR_INFO,
           "op_state_machine %p has trailing md but not yet waiting for it", s);
@@ -832,6 +847,9 @@ void op_state_machine_locked(inproc_stream* s, grpc_error* error) {
         "op_state_machine scheduling recv-trailing-md-on-complete");
     s->trailing_md_recvd = true;
     s->recv_trailing_md_op = nullptr;
+    // Since we are only pretending to have received the trailing MD, it would
+    // be ok (not an error) if the client actually sends it later.
+    s->trailing_md_recvd_implicit_only = true;
   }
   if (s->trailing_md_recvd && s->recv_message_op) {
     // No further message will come on this stream, so finish off the
@@ -847,14 +865,13 @@ void op_state_machine_locked(inproc_stream* s, grpc_error* error) {
         "op_state_machine scheduling recv-message-on-complete");
     s->recv_message_op = nullptr;
   }
-  if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) &&
-      s->send_message_op) {
+  if (s->trailing_md_recvd && s->send_message_op && s->t->is_client) {
     // Nothing further will try to receive from this stream, so finish off
     // any outstanding send_message op
     s->send_message_op->payload->send_message.send_message.reset();
     complete_if_batch_end_locked(
         s, new_err, s->send_message_op,
-        "op_state_machine scheduling send-message-on-complete");
+        "op_state_machine scheduling send-message-on-complete case 3");
     s->send_message_op = nullptr;
   }
   if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||