Explorar o código

Reviewer comments

Yash Tibrewal %!s(int64=7) %!d(string=hai) anos
pai
achega
4009616b58

+ 7 - 6
src/core/ext/filters/http/client/http_client_filter.cc

@@ -58,6 +58,8 @@ struct call_data {
   grpc_metadata_batch* recv_trailing_metadata;
   grpc_closure* original_recv_trailing_metadata_ready;
   grpc_closure recv_trailing_metadata_ready;
+  grpc_error* recv_trailing_metadata_error;
+  bool seen_recv_trailing_metadata_ready;
   // State for handling send_message ops.
   grpc_transport_stream_op_batch* send_message_batch;
   size_t send_message_bytes_read;
@@ -67,8 +69,6 @@ struct call_data {
   grpc_closure on_send_message_next_done;
   grpc_closure* original_send_message_on_complete;
   grpc_closure send_message_on_complete;
-  grpc_error* recv_trailing_metadata_err;
-  bool seen_recv_trailing_metadata_ready;
 };
 
 struct channel_data {
@@ -164,7 +164,7 @@ static void recv_initial_metadata_ready(void* user_data, grpc_error* error) {
   if (calld->seen_recv_trailing_metadata_ready) {
     GRPC_CALL_COMBINER_START(
         calld->call_combiner, &calld->recv_trailing_metadata_ready,
-        calld->recv_trailing_metadata_err, "continue recv trailing metadata");
+        calld->recv_trailing_metadata_error, "continue recv_trailing_metadata");
   }
   GRPC_CLOSURE_RUN(closure, error);
 }
@@ -173,12 +173,14 @@ static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
   if (calld->original_recv_initial_metadata_ready != nullptr) {
-    calld->recv_trailing_metadata_err = GRPC_ERROR_REF(error);
+    calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error);
     calld->seen_recv_trailing_metadata_ready = true;
     GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
                       recv_trailing_metadata_ready, elem,
                       grpc_schedule_on_exec_ctx);
-    GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for initial metadata");
+    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
+                            "deferring recv_trailing_metadata_ready until "
+                            "after recv_initial_metadata_ready");
     return;
   }
   if (error == GRPC_ERROR_NONE) {
@@ -445,7 +447,6 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
                                   const grpc_call_element_args* args) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
   calld->call_combiner = args->call_combiner;
-  calld->seen_recv_trailing_metadata_ready = false;
   GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
                     recv_initial_metadata_ready, elem,
                     grpc_schedule_on_exec_ctx);

+ 8 - 8
src/core/ext/filters/http/server/http_server_filter.cc

@@ -63,9 +63,9 @@ struct call_data {
   grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
   bool seen_recv_message_ready;
 
+  // State for intercepting recv_trailing_metadata
   grpc_closure recv_trailing_metadata_ready;
   grpc_closure* original_recv_trailing_metadata_ready;
-
   grpc_error* recv_trailing_metadata_ready_error;
   bool seen_recv_trailing_metadata_ready;
 };
@@ -304,15 +304,14 @@ static void hs_recv_initial_metadata_ready(void* user_data, grpc_error* err) {
   } else {
     GRPC_ERROR_REF(err);
   }
-  grpc_closure* closure = calld->original_recv_initial_metadata_ready;
-  calld->original_recv_initial_metadata_ready = nullptr;
   if (calld->seen_recv_trailing_metadata_ready) {
     GRPC_CALL_COMBINER_START(calld->call_combiner,
                              &calld->recv_trailing_metadata_ready,
                              calld->recv_trailing_metadata_ready_error,
-                             "continue recv trailing metadata");
+                             "resuming hs_recv_trailing_metadata_ready from "
+                             "hs_recv_initial_metadata_ready");
   }
-  GRPC_CLOSURE_RUN(closure, err);
+  GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready, err);
 }
 
 static void hs_recv_message_ready(void* user_data, grpc_error* err) {
@@ -342,13 +341,15 @@ static void hs_recv_message_ready(void* user_data, grpc_error* err) {
 static void hs_recv_trailing_metadata_ready(void* user_data, grpc_error* err) {
   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
-  if (calld->original_recv_initial_metadata_ready) {
+  if (!calld->seen_recv_initial_metadata_ready) {
     calld->recv_trailing_metadata_ready_error = GRPC_ERROR_REF(err);
     calld->seen_recv_trailing_metadata_ready = true;
     GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
                       hs_recv_trailing_metadata_ready, elem,
                       grpc_schedule_on_exec_ctx);
-    GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for initial metadata");
+    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
+                            "deferring hs_recv_trailing_metadata_ready until "
+                            "ater hs_recv_initial_metadata_ready");
     return;
   }
   err = grpc_error_add_child(
@@ -435,7 +436,6 @@ static grpc_error* hs_init_call_elem(grpc_call_element* elem,
                                      const grpc_call_element_args* args) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
   calld->call_combiner = args->call_combiner;
-  calld->seen_recv_trailing_metadata_ready = false;
   GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
                     hs_recv_initial_metadata_ready, elem,
                     grpc_schedule_on_exec_ctx);

+ 8 - 6
src/core/ext/filters/message_size/message_size_filter.cc

@@ -152,9 +152,10 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
   grpc_closure* closure = calld->next_recv_message_ready;
   calld->next_recv_message_ready = nullptr;
   if (calld->seen_recv_trailing_metadata) {
-    GRPC_CALL_COMBINER_START(
-        calld->call_combiner, &calld->recv_trailing_metadata_ready,
-        calld->recv_trailing_metadata_error, "continue recv trailing metadata");
+    GRPC_CALL_COMBINER_START(calld->call_combiner,
+                             &calld->recv_trailing_metadata_ready,
+                             calld->recv_trailing_metadata_error,
+                             "continue recv_trailing_metadata_ready");
   }
   GRPC_CLOSURE_RUN(closure, error);
 }
@@ -164,13 +165,15 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
 static void recv_trailing_metadata_ready(void* user_data, grpc_error* error) {
   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
-  if (calld->next_recv_message_ready) {
+  if (calld->next_recv_message_ready != nullptr) {
     calld->seen_recv_trailing_metadata = true;
     calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error);
     GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
                       recv_trailing_metadata_ready, elem,
                       grpc_schedule_on_exec_ctx);
-    GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for recv message");
+    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
+                            "deferring recv_trailing_metadata_ready until "
+                            "after recv_message_ready");
     return;
   }
   error =
@@ -227,7 +230,6 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
   calld->next_recv_message_ready = nullptr;
   calld->original_recv_trailing_metadata_ready = nullptr;
   calld->error = GRPC_ERROR_NONE;
-  calld->seen_recv_trailing_metadata = false;
   GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem,
                     grpc_schedule_on_exec_ctx);
   GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,

+ 19 - 15
src/core/lib/security/transport/server_auth_filter.cc

@@ -41,16 +41,16 @@ struct call_data {
   grpc_transport_stream_op_batch* recv_initial_metadata_batch;
   grpc_closure* original_recv_initial_metadata_ready;
   grpc_closure recv_initial_metadata_ready;
-  grpc_error* error;
+  grpc_error* recv_initial_metadata_error;
   grpc_closure recv_trailing_metadata_ready;
   grpc_closure* original_recv_trailing_metadata_ready;
+  grpc_error* recv_trailing_metadata_error;
+  bool seen_recv_trailing_ready;
   grpc_metadata_array md;
   const grpc_metadata* consumed_md;
   size_t num_consumed_md;
   grpc_closure cancel_closure;
   gpr_atm state;  // async_state
-  grpc_error* recv_trailing_metadata_error;
-  bool seen_recv_trailing_ready;
 };
 
 struct channel_data {
@@ -116,13 +116,14 @@ static void on_md_processing_done_inner(grpc_call_element* elem,
         batch->payload->recv_initial_metadata.recv_initial_metadata,
         remove_consumed_md, elem, "Response metadata filtering error");
   }
-  calld->error = GRPC_ERROR_REF(error);
+  calld->recv_initial_metadata_error = GRPC_ERROR_REF(error);
   grpc_closure* closure = calld->original_recv_initial_metadata_ready;
   calld->original_recv_initial_metadata_ready = nullptr;
   if (calld->seen_recv_trailing_ready) {
-    GRPC_CALL_COMBINER_START(
-        calld->call_combiner, &calld->recv_trailing_metadata_ready,
-        calld->recv_trailing_metadata_error, "continue recv trailing metadata");
+    GRPC_CALL_COMBINER_START(calld->call_combiner,
+                             &calld->recv_trailing_metadata_ready,
+                             calld->recv_trailing_metadata_error,
+                             "continue recv_trailing_metadata_ready");
   }
   GRPC_CLOSURE_SCHED(closure, error);
 }
@@ -196,9 +197,10 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
   grpc_closure* closure = calld->original_recv_initial_metadata_ready;
   calld->original_recv_initial_metadata_ready = nullptr;
   if (calld->seen_recv_trailing_ready) {
-    GRPC_CALL_COMBINER_START(
-        calld->call_combiner, &calld->recv_trailing_metadata_ready,
-        calld->recv_trailing_metadata_error, "continue recv trailing metadata");
+    GRPC_CALL_COMBINER_START(calld->call_combiner,
+                             &calld->recv_trailing_metadata_ready,
+                             calld->recv_trailing_metadata_error,
+                             "continue recv_trailing_metadata_ready");
   }
   GRPC_CLOSURE_RUN(closure, GRPC_ERROR_REF(error));
 }
@@ -206,15 +208,18 @@ static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
 static void recv_trailing_metadata_ready(void* user_data, grpc_error* err) {
   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
-  if (calld->original_recv_initial_metadata_ready) {
+  if (calld->original_recv_initial_metadata_ready != nullptr) {
     calld->recv_trailing_metadata_error = GRPC_ERROR_REF(err);
     calld->seen_recv_trailing_ready = true;
     GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
                       recv_trailing_metadata_ready, elem,
                       grpc_schedule_on_exec_ctx);
-    GRPC_CALL_COMBINER_STOP(calld->call_combiner, "wait for initial metadata");
+    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
+                            "deferring recv_trailing_metadata_ready until "
+                            "after recv_initial_metadata_ready");
   }
-  err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error));
+  err = grpc_error_add_child(
+      GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->recv_initial_metadata_error));
   GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
 }
 
@@ -251,7 +256,6 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
   GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
                     recv_trailing_metadata_ready, elem,
                     grpc_schedule_on_exec_ctx);
-  calld->seen_recv_trailing_ready = false;
   // Create server security context.  Set its auth context from channel
   // data and save it in the call context.
   grpc_server_security_context* server_ctx =
@@ -273,7 +277,7 @@ static void destroy_call_elem(grpc_call_element* elem,
                               const grpc_call_final_info* final_info,
                               grpc_closure* ignored) {
   call_data* calld = static_cast<call_data*>(elem->call_data);
-  GRPC_ERROR_UNREF(calld->error);
+  GRPC_ERROR_UNREF(calld->recv_initial_metadata_error);
 }
 
 /* Constructor for channel_data */

+ 34 - 9
src/core/lib/surface/server.cc

@@ -150,12 +150,15 @@ struct call_data {
   grpc_closure kill_zombie_closure;
   grpc_closure* on_done_recv_initial_metadata;
   grpc_closure recv_trailing_metadata_ready;
-  grpc_error* error;
+  grpc_error* recv_initial_metadata_error;
   grpc_closure* original_recv_trailing_metadata_ready;
+  grpc_error* recv_trailing_metadata_error;
+  bool seen_recv_trailing_metadata_ready;
 
   grpc_closure publish;
 
   call_data* pending_next;
+  grpc_call_combiner* call_combiner;
 };
 
 struct request_matcher {
@@ -730,18 +733,39 @@ static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {
     grpc_error* src_error = error;
     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
         "Missing :authority or :path", &error, 1);
-    GRPC_ERROR_UNREF(src_error);
-  }
-
-  GRPC_CLOSURE_RUN(calld->on_done_recv_initial_metadata, error);
+    /* Pass the src_error reference to calld->recv_initial_metadata_error */
+    calld->recv_initial_metadata_error = src_error;
+  }
+  grpc_closure* closure = calld->on_done_recv_initial_metadata;
+  calld->on_done_recv_initial_metadata = nullptr;
+  if (calld->seen_recv_trailing_metadata_ready) {
+    GRPC_CALL_COMBINER_START(calld->call_combiner,
+                             &calld->recv_trailing_metadata_ready,
+                             calld->recv_trailing_metadata_error,
+                             "continue server_recv_trailing_metadata_ready");
+  }
+  GRPC_CLOSURE_RUN(closure, error);
 }
 
 static void server_recv_trailing_metadata_ready(void* user_data,
-                                                grpc_error* err) {
+                                                grpc_error* error) {
   grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
   call_data* calld = static_cast<call_data*>(elem->call_data);
-  err = grpc_error_add_child(GRPC_ERROR_REF(err), GRPC_ERROR_REF(calld->error));
-  GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, err);
+  if (calld->on_done_recv_initial_metadata != nullptr) {
+    calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error);
+    calld->seen_recv_trailing_metadata_ready = true;
+    GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,
+                      server_recv_trailing_metadata_ready, elem,
+                      grpc_schedule_on_exec_ctx);
+    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
+                            "deferring server_recv_trailing_metadata_ready "
+                            "until after server_on_recv_initial_metadata");
+    return;
+  }
+  error =
+      grpc_error_add_child(GRPC_ERROR_REF(error),
+                           GRPC_ERROR_REF(calld->recv_initial_metadata_error));
+  GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);
 }
 
 static void server_mutate_op(grpc_call_element* elem,
@@ -845,6 +869,7 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
   memset(calld, 0, sizeof(call_data));
   calld->deadline = GRPC_MILLIS_INF_FUTURE;
   calld->call = grpc_call_from_top_element(elem);
+  calld->call_combiner = args->call_combiner;
 
   GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata,
                     server_on_recv_initial_metadata, elem,
@@ -863,7 +888,7 @@ static void destroy_call_elem(grpc_call_element* elem,
   call_data* calld = static_cast<call_data*>(elem->call_data);
 
   GPR_ASSERT(calld->state != PENDING);
-  GRPC_ERROR_UNREF(calld->error);
+  GRPC_ERROR_UNREF(calld->recv_initial_metadata_error);
   if (calld->host_set) {
     grpc_slice_unref_internal(calld->host);
   }