浏览代码

Be cautious and wait for possible error causing callbacks before we treat trailing metadata

Yash Tibrewal 7 年之前
父节点
当前提交
86f1c7a5df

+ 17 - 1
src/core/ext/filters/http/client/http_client_filter.cc

@@ -67,6 +67,8 @@ struct call_data {
   grpc_closure on_send_message_next_done;
   grpc_closure* original_send_message_on_complete;
   grpc_closure send_message_on_complete;
+  grpc_error* recv_trailing_metadata_err;
+  bool seen_recv_trailing_metadata_ready;
 };
 
 struct channel_data {
@@ -157,12 +159,25 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
   } else {
     GRPC_ERROR_REF(error);
   }
-  GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, error);
+  grpc_closure* closure = calld->original_recv_initial_metadata_ready;
+  calld->original_recv_initial_metadata_ready = nullptr;
+  if (calld->seen_recv_trailing_metadata_ready) {
+    GRPC_CALL_COMBINER_START(
+        calld->call_combiner, &calld->recv_trailing_metadata_ready,
+        calld->recv_trailing_metadata_err, "continue recv trailing metadata");
+  }
+  GRPC_CLOSURE_RUN(closure, error);
 }
 
 static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
+  if (calld->original_recv_initial_metadata_ready != nullptr) {
+    calld->recv_trailing_metadata_err = GRPC_ERROR_REF(error);
+    calld->seen_recv_trailing_metadata_ready = true;
+    GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for initial metadata");
+    return;
+  }
   if (error == GRPC_ERROR_NONE) {
     error =
         client_filter_incoming_metadata(elem, calld->recv_trailing_metadata);
@@ -427,6 +442,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
                                   const grpc_call_element_args* args) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
   calld->call_combiner = args->call_combiner;
+  calld->seen_recv_trailing_metadata_ready = false;
   GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
                     recv_initial_metadata_ready, elem,
                     grpc_schedule_on_exec_ctx);

+ 19 - 1
src/core/ext/filters/http/server/http_server_filter.cc

@@ -64,6 +64,9 @@ struct call_data {
 
   grpc_closure recv_trailing_metadata_ready;
   grpc_closure* original_recv_trailing_metadata_ready;
+
+  grpc_error* recv_trailing_metadata_ready_error;
+  bool seen_recv_trailing_metadata_ready;
 };
 
 }  // namespace
@@ -291,7 +294,15 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) {
   } else {
     GRPC_ERROR_REF(err);
   }
-  GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, err);
+  grpc_closure* closure = calld->original_recv_initial_metadata_ready;
+  calld->original_recv_initial_metadata_ready = nullptr;
+  if (calld->seen_recv_trailing_metadata_ready) {
+    GRPC_CALL_COMBINER_START(calld->call_combiner,
+                             &calld->recv_trailing_metadata_ready,
+                             calld->recv_trailing_metadata_ready_error,
+                             "continue recv trailing metadata");
+  }
+  GRPC_CLOSURE_RUN(closure, err);
 }
 
 static void hs_recv_message_ready(void* user_data, grpc_error* err) {
@@ -321,6 +332,12 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) {
 static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) {
   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
+  if (calld->original_recv_initial_metadata_ready) {
+    calld->recv_trailing_metadata_ready_error = GRPC_ERROR_REF(err);
+    calld->seen_recv_trailing_metadata_ready = true;
+    GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for initial metadata");
+    return;
+  }
   err = grpc_error_add_child(
       GRPC_ERROR_REF(err),
       GRPC_ERROR_REF(calld->recv_initial_metadata_ready_error));
@@ -405,6 +422,7 @@ static grpc_error* hs_init_call_elem(grpc_call_element* elem,
                                      const grpc_call_element_args* args) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
   calld->call_combiner = args->call_combiner;
+  calld->seen_recv_trailing_metadata_ready = false;
   GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
                     hs_recv_initial_metadata_ready, elem,
                     grpc_schedule_on_exec_ctx);

+ 17 - 1
src/core/ext/filters/message_size/message_size_filter.cc

@@ -108,6 +108,8 @@ struct call_data {
   grpc_closure* next_recv_message_ready;
   // Original recv_trailing_metadata callback, invoked after our own.
   grpc_closure* original_recv_trailing_metadata_ready;
+  bool seen_recv_trailing_metadata;
+  grpc_error* recv_trailing_metadata_error;
 };
 
 struct channel_data {
@@ -147,7 +149,14 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
     GRPC_ERROR_REF(error);
   }
   // Invoke the next callback.
-  GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error);
+  grpc_closure* closure = calld->next_recv_message_ready;
+  calld->next_recv_message_ready = nullptr;
+  if (calld->seen_recv_trailing_metadata) {
+    GRPC_CALL_COMBINER_START(
+        calld->call_combiner, &calld->recv_trailing_metadata_ready,
+        calld->recv_trailing_metadata_error, "continue recv trailing metadata");
+  }
+  GRPC_CLOSURE_RUN(closure, error);
 }
 
 // Callback invoked on completion of recv_trailing_metadata
@@ -155,6 +164,12 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
 static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
+  if (calld->next_recv_message_ready) {
+    calld->seen_recv_trailing_metadata = true;
+    calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error);
+    GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for recv message");
+    return;
+  }
   error =
       grpc_error_add_child(GRPC_ERROR_REF(error), GRPC_ERROR_REF(calld->error));
   // Invoke the next callback.
@@ -209,6 +224,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
   calld->next_recv_message_ready = nullptr;
   calld->original_recv_trailing_metadata_ready = nullptr;
   calld->error = GRPC_ERROR_NONE;
+  calld->seen_recv_trailing_metadata = false;
   GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem,
                     grpc_schedule_on_exec_ctx);
   GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,

+ 24 - 3
src/core/lib/security/transport/server_auth_filter.cc

@@ -49,6 +49,8 @@ struct call_data {
   size_t num_consumed_md;
   grpc_closure cancel_closure;
   gpr_atm state;  // async_state
+  grpc_error* recv_trailing_metadata_error;
+  bool seen_recv_trailing_ready;
 };
 
 struct channel_data {
@@ -115,7 +117,14 @@ static void on_md_processing_done_inner(grpc_call_element* elem,
         remove_consumed_md, elem, "Response metadata filtering error");
   }
   calld->error = GRPC_ERROR_REF(error);
-  GRPC_CLOSURE_SCHED(calld->original_recv_initial_metadata_ready, error);
+  grpc_closure* closure = calld->original_recv_initial_metadata_ready;
+  calld->original_recv_initial_metadata_ready = nullptr;
+  if (calld->seen_recv_trailing_ready) {
+    GRPC_CALL_COMBINER_START(
+        calld->call_combiner, &calld->recv_trailing_metadata_ready,
+        calld->recv_trailing_metadata_error, "continue recv trailing metadata");
+  }
+  GRPC_CLOSURE_SCHED(closure, error);
 }
 
 // Called from application code.
@@ -184,13 +193,24 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
       return;
     }
   }
-  GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready,
-                   GRPC_ERROR_REF(error));
+  grpc_closure* closure = calld->original_recv_initial_metadata_ready;
+  calld->original_recv_initial_metadata_ready = nullptr;
+  if (calld->seen_recv_trailing_ready) {
+    GRPC_CALL_COMBINER_START(
+        calld->call_combiner, &calld->recv_trailing_metadata_ready,
+        calld->recv_trailing_metadata_error, "continue recv trailing metadata");
+  }
+  GRPC_CLOSURE_RUN(closure, GRPC_ERROR_REF(error));
 }
 
 static void recv_trailing_metadata_ready(void* user_data, grpc_error* err) {
   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
+  if (calld->original_recv_initial_metadata_ready) {
+    calld->recv_trailing_metadata_error = GRPC_ERROR_REF(err);
+    calld->seen_recv_trailing_ready = true;
+    GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for initial metadata");
+  }
   err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error));
   GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
 }
@@ -228,6 +248,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
   GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
                     recv_trailing_metadata_ready, elem,
                     grpc_schedule_on_exec_ctx);
+  calld->seen_recv_trailing_ready = false;
   // Create server security context.  Set its auth context from channel
   // data and save it in the call context.
   grpc_server_security_context* server_ctx =