Browse source

Pass deadline into filters via grpc_call_element_args, so that we can
start the timer before the first op is sent down.

Mark D. Roth, 9 years ago
parent
commit
f28763c68c

+ 20 - 8
src/core/ext/client_config/client_channel.c

@@ -385,6 +385,9 @@ typedef struct client_channel_call_data {
   // stack and each has its own mutex.  If/when we have time, find a way
   // to avoid this without breaking the grpc_deadline_state abstraction.
   grpc_deadline_state deadline_state;
+  gpr_timespec deadline;
+
+  grpc_error *cancel_error;
 
   /** either 0 for no call, 1 for cancelled, or a pointer to a
       grpc_subchannel_call */
@@ -482,7 +485,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
   } else {
     grpc_subchannel_call *subchannel_call = NULL;
     grpc_error *new_error = grpc_connected_subchannel_create_call(
-        exec_ctx, calld->connected_subchannel, calld->pollent,
+        exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
         &subchannel_call);
     if (new_error != GRPC_ERROR_NONE) {
       new_error = grpc_error_add_child(new_error, error);
@@ -627,8 +630,8 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
   grpc_subchannel_call *call = GET_CALL(calld);
   GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0);
   if (call == CANCELLED_CALL) {
-    grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
-                                                 GRPC_ERROR_CANCELLED);
+    grpc_transport_stream_op_finish_with_failure(
+        exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
     GPR_TIMER_END("cc_start_transport_stream_op", 0);
     return;
   }
@@ -644,8 +647,8 @@ retry:
   call = GET_CALL(calld);
   if (call == CANCELLED_CALL) {
     gpr_mu_unlock(&calld->mu);
-    grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
-                                                 GRPC_ERROR_CANCELLED);
+    grpc_transport_stream_op_finish_with_failure(
+        exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error));
     GPR_TIMER_END("cc_start_transport_stream_op", 0);
     return;
   }
@@ -661,6 +664,12 @@ retry:
                          (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
       goto retry;
     } else {
+      // Stash a copy of cancel_error in our call data, so that we can use
+      // it for subsequent operations.  This ensures that if the call is
+      // cancelled before any ops are passed down (e.g., if the deadline
+      // is in the past when the call starts), we can return the right
+      // error to the caller when the first op does get passed down.
+      calld->cancel_error = GRPC_ERROR_REF(op->cancel_error);
       switch (calld->creation_phase) {
         case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
           fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error));
@@ -697,7 +706,7 @@ retry:
       calld->connected_subchannel != NULL) {
     grpc_subchannel_call *subchannel_call = NULL;
     grpc_error *error = grpc_connected_subchannel_create_call(
-        exec_ctx, calld->connected_subchannel, calld->pollent,
+        exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline,
         &subchannel_call);
     if (error != GRPC_ERROR_NONE) {
       subchannel_call = CANCELLED_CALL;
@@ -720,7 +729,9 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
                                      grpc_call_element *elem,
                                      grpc_call_element_args *args) {
   call_data *calld = elem->call_data;
-  grpc_deadline_state_init(&calld->deadline_state, args->call_stack);
+  grpc_deadline_state_init(exec_ctx, elem, args);
+  calld->deadline = args->deadline;
+  calld->cancel_error = GRPC_ERROR_NONE;
   gpr_atm_rel_store(&calld->subchannel_call, 0);
   gpr_mu_init(&calld->mu);
   calld->connected_subchannel = NULL;
@@ -739,7 +750,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
                                  const grpc_call_final_info *final_info,
                                  void *and_free_memory) {
   call_data *calld = elem->call_data;
-  grpc_deadline_state_destroy(exec_ctx, &calld->deadline_state);
+  grpc_deadline_state_destroy(exec_ctx, elem);
+  GRPC_ERROR_UNREF(calld->cancel_error);
   grpc_subchannel_call *call = GET_CALL(calld);
   if (call != NULL && call != CANCELLED_CALL) {
     GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call");

+ 3 - 2
src/core/ext/client_config/subchannel.c

@@ -706,14 +706,15 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
 
 grpc_error *grpc_connected_subchannel_create_call(
     grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
-    grpc_polling_entity *pollent, grpc_subchannel_call **call) {
+    grpc_polling_entity *pollent, gpr_timespec deadline,
+    grpc_subchannel_call **call) {
   grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
   *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
   grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
   (*call)->connection = con;  // Ref is added below.
   grpc_error *error =
       grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call,
-                           NULL, NULL, callstk);
+                           NULL, NULL, deadline, callstk);
   if (error != GRPC_ERROR_NONE) {
     const char *error_string = grpc_error_string(error);
     gpr_log(GPR_ERROR, "error: %s", error_string);

+ 2 - 1
src/core/ext/client_config/subchannel.h

@@ -110,7 +110,8 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
 /** construct a subchannel call */
 grpc_error *grpc_connected_subchannel_create_call(
     grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
-    grpc_polling_entity *pollent, grpc_subchannel_call **subchannel_call);
+    grpc_polling_entity *pollent, gpr_timespec deadline,
+    grpc_subchannel_call **subchannel_call);
 
 /** process a transport level op */
 void grpc_connected_subchannel_process_transport_op(

+ 6 - 7
src/core/lib/channel/channel_stack.c

@@ -157,13 +157,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
   }
 }
 
-grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
-                                 grpc_channel_stack *channel_stack,
-                                 int initial_refs, grpc_iomgr_cb_func destroy,
-                                 void *destroy_arg,
-                                 grpc_call_context_element *context,
-                                 const void *transport_server_data,
-                                 grpc_call_stack *call_stack) {
+grpc_error *grpc_call_stack_init(
+    grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+    int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
+    grpc_call_context_element *context, const void *transport_server_data,
+    gpr_timespec deadline, grpc_call_stack *call_stack) {
   grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
   grpc_call_element_args args;
   size_t count = channel_stack->count;
@@ -184,6 +182,7 @@ grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
     args.call_stack = call_stack;
     args.server_transport_data = transport_server_data;
     args.context = context;
+    args.deadline = deadline;
     call_elems[i].filter = channel_elems[i].filter;
     call_elems[i].channel_data = channel_elems[i].channel_data;
     call_elems[i].call_data = user_data;

+ 6 - 7
src/core/lib/channel/channel_stack.h

@@ -74,6 +74,7 @@ typedef struct {
   grpc_call_stack *call_stack;
   const void *server_transport_data;
   grpc_call_context_element *context;
+  gpr_timespec deadline;
 } grpc_call_element_args;
 
 typedef struct {
@@ -220,13 +221,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
 /* Initialize a call stack given a channel stack. transport_server_data is
    expected to be NULL on a client, or an opaque transport owned pointer on the
    server. */
-grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
-                                 grpc_channel_stack *channel_stack,
-                                 int initial_refs, grpc_iomgr_cb_func destroy,
-                                 void *destroy_arg,
-                                 grpc_call_context_element *context,
-                                 const void *transport_server_data,
-                                 grpc_call_stack *call_stack);
+grpc_error *grpc_call_stack_init(
+    grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack,
+    int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg,
+    grpc_call_context_element *context, const void *transport_server_data,
+    gpr_timespec deadline, grpc_call_stack *call_stack);
 /* Set a pollset or a pollset_set for a call stack: must occur before the first
  * op is started */
 void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,

+ 43 - 15
src/core/lib/channel/deadline_filter.c

@@ -34,10 +34,12 @@
 #include <stdbool.h>
 #include <string.h>
 
+#include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 #include <grpc/support/sync.h>
 #include <grpc/support/time.h>
 
+#include "src/core/lib/iomgr/exec_ctx.h"
 #include "src/core/lib/iomgr/timer.h"
 
 //
@@ -106,15 +108,49 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state,
   op->on_complete = &deadline_state->on_complete;
 }
 
-void grpc_deadline_state_init(grpc_deadline_state* deadline_state,
-                              grpc_call_stack* call_stack) {
+// Callback and associated state for starting the timer after call stack
+// initialization has been completed.
+struct start_timer_after_init_state {
+  grpc_call_element* elem;
+  gpr_timespec deadline;
+  grpc_closure closure;
+};
+static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg,
+                                   grpc_error* error) {
+  struct start_timer_after_init_state* state = arg;
+  start_timer_if_needed(exec_ctx, state->elem, state->deadline);
+  gpr_free(state);
+}
+
+void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+                              grpc_call_element_args* args) {
+  grpc_deadline_state* deadline_state = elem->call_data;
   memset(deadline_state, 0, sizeof(*deadline_state));
-  deadline_state->call_stack = call_stack;
+  deadline_state->call_stack = args->call_stack;
   gpr_mu_init(&deadline_state->timer_mu);
+  // Deadline will always be infinite on servers, so the timer will only be
+  // set on clients with a finite deadline.
+  const gpr_timespec deadline =
+      gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC);
+  if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) {
+    // When the deadline passes, we indicate the failure by sending down
+    // an op with cancel_error set.  However, we can't send down any ops
+    // until after the call stack is fully initialized.  If we start the
+    // timer here, we have no guarantee that the timer won't pop before
+    // call stack initialization is finished.  To avoid that problem, we
+    // create a closure to start the timer, and we schedule that closure
+    // to be run after call stack initialization is done.
+    struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state));
+    state->elem = elem;
+    state->deadline = deadline;
+    grpc_closure_init(&state->closure, start_timer_after_init, state);
+    grpc_exec_ctx_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE, NULL);
+  }
 }
 
 void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
-                                 grpc_deadline_state* deadline_state) {
+                                 grpc_call_element* elem) {
+  grpc_deadline_state* deadline_state = elem->call_data;
   cancel_timer_if_needed(exec_ctx, deadline_state);
   gpr_mu_destroy(&deadline_state->timer_mu);
 }
@@ -127,12 +163,6 @@ void grpc_deadline_state_client_start_transport_stream_op(
       op->close_error != GRPC_ERROR_NONE) {
     cancel_timer_if_needed(exec_ctx, deadline_state);
   } else {
-    // If we're sending initial metadata, get the deadline from the metadata
-    // and start the timer if needed.
-    if (op->send_initial_metadata != NULL) {
-      start_timer_if_needed(exec_ctx, elem,
-                            op->send_initial_metadata->deadline);
-    }
     // Make sure we know when the call is complete, so that we can cancel
     // the timer.
     if (op->recv_trailing_metadata != NULL) {
@@ -177,10 +207,9 @@ typedef struct server_call_data {
 static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
                                   grpc_call_element* elem,
                                   grpc_call_element_args* args) {
-  base_call_data* calld = elem->call_data;
   // Note: size of call data is different between client and server.
-  memset(calld, 0, elem->filter->sizeof_call_data);
-  grpc_deadline_state_init(&calld->deadline_state, args->call_stack);
+  memset(elem->call_data, 0, elem->filter->sizeof_call_data);
+  grpc_deadline_state_init(exec_ctx, elem, args);
   return GRPC_ERROR_NONE;
 }
 
@@ -188,8 +217,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx,
 static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
                               const grpc_call_final_info* final_info,
                               void* and_free_memory) {
-  base_call_data* calld = elem->call_data;
-  grpc_deadline_state_destroy(exec_ctx, &calld->deadline_state);
+  grpc_deadline_state_destroy(exec_ctx, elem);
 }
 
 // Method for starting a call op for client filter.

+ 12 - 10
src/core/lib/channel/deadline_filter.h

@@ -36,7 +36,7 @@
 #include "src/core/lib/iomgr/timer.h"
 
 // State used for filters that enforce call deadlines.
-// Should be the first field in the filter's call_data.
+// Must be the first field in the filter's call_data.
 typedef struct grpc_deadline_state {
   // We take a reference to the call stack for the timer callback.
   grpc_call_stack* call_stack;
@@ -54,16 +54,18 @@ typedef struct grpc_deadline_state {
   grpc_closure* next_on_complete;
 } grpc_deadline_state;
 
-void grpc_deadline_state_init(grpc_deadline_state* call_data,
-                              grpc_call_stack* call_stack);
-void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
-                                 grpc_deadline_state* call_data);
-
-// To be used in a filter's start_transport_stream_op() method to
-// enforce call deadlines.
-// It is the caller's responsibility to chain to the next filter if
-// necessary after this function returns.
+// To be used in a filter's init_call_elem(), destroy_call_elem(), and
+// start_transport_stream_op() methods to enforce call deadlines.
+//
 // REQUIRES: The first field in elem->call_data is a grpc_deadline_state.
+//
+// For grpc_deadline_state_client_start_transport_stream_op(), it is the
+// caller's responsibility to chain to the next filter if necessary
+// after the function returns.
+void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
+                              grpc_call_element_args* args);
+void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx,
+                                 grpc_call_element* elem);
 void grpc_deadline_state_client_start_transport_stream_op(
     grpc_exec_ctx* exec_ctx, grpc_call_element* elem,
     grpc_transport_stream_op* op);

+ 42 - 30
src/core/lib/surface/call.c

@@ -254,34 +254,7 @@ grpc_call *grpc_call_create(
     }
   }
   send_deadline = gpr_convert_clock_type(send_deadline, GPR_CLOCK_MONOTONIC);
-  GRPC_CHANNEL_INTERNAL_REF(channel, "call");
-  /* initial refcount dropped by grpc_call_destroy */
-  grpc_error *error = grpc_call_stack_init(
-      &exec_ctx, channel_stack, 1, destroy_call, call, call->context,
-      server_transport_data, CALL_STACK_FROM_CALL(call));
-  if (error != GRPC_ERROR_NONE) {
-    grpc_status_code status;
-    const char *error_str;
-    grpc_error_get_status(error, &status, &error_str);
-    close_with_status(&exec_ctx, call, status, error_str);
-    GRPC_ERROR_UNREF(error);
-  }
-  if (cq != NULL) {
-    GPR_ASSERT(
-        pollset_set_alternative == NULL &&
-        "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
-    GRPC_CQ_INTERNAL_REF(cq, "bind");
-    call->pollent =
-        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
-  }
-  if (pollset_set_alternative != NULL) {
-    call->pollent =
-        grpc_polling_entity_create_from_pollset_set(pollset_set_alternative);
-  }
-  if (!grpc_polling_entity_is_empty(&call->pollent)) {
-    grpc_call_stack_set_pollset_or_pollset_set(
-        &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
-  }
+
   if (parent_call != NULL) {
     GRPC_CALL_INTERNAL_REF(parent_call, "child");
     GPR_ASSERT(call->is_client);
@@ -323,7 +296,38 @@ grpc_call *grpc_call_create(
 
     gpr_mu_unlock(&parent_call->mu);
   }
+
   call->send_deadline = send_deadline;
+
+  GRPC_CHANNEL_INTERNAL_REF(channel, "call");
+  /* initial refcount dropped by grpc_call_destroy */
+  grpc_error *error = grpc_call_stack_init(
+      &exec_ctx, channel_stack, 1, destroy_call, call, call->context,
+      server_transport_data, send_deadline, CALL_STACK_FROM_CALL(call));
+  if (error != GRPC_ERROR_NONE) {
+    grpc_status_code status;
+    const char *error_str;
+    grpc_error_get_status(error, &status, &error_str);
+    close_with_status(&exec_ctx, call, status, error_str);
+    GRPC_ERROR_UNREF(error);
+  }
+  if (cq != NULL) {
+    GPR_ASSERT(
+        pollset_set_alternative == NULL &&
+        "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
+    GRPC_CQ_INTERNAL_REF(cq, "bind");
+    call->pollent =
+        grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
+  }
+  if (pollset_set_alternative != NULL) {
+    call->pollent =
+        grpc_polling_entity_create_from_pollset_set(pollset_set_alternative);
+  }
+  if (!grpc_polling_entity_is_empty(&call->pollent)) {
+    grpc_call_stack_set_pollset_or_pollset_set(
+        &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
+  }
+
   grpc_exec_ctx_finish(&exec_ctx);
   GPR_TIMER_END("grpc_call_create", 0);
   return call;
@@ -1220,8 +1224,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
     if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
             0 &&
         !call->is_client) {
-      call->send_deadline = gpr_convert_clock_type(md->deadline,
-                                                   GPR_CLOCK_MONOTONIC);
+      call->send_deadline =
+          gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC);
     }
   }
 
@@ -1250,6 +1254,14 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
   GRPC_ERROR_REF(error);
 
   gpr_mu_lock(&call->mu);
+
+  // If the error has an associated status code, set the call's status.
+  intptr_t status;
+  if (error != GRPC_ERROR_NONE &&
+      grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) {
+    set_status_from_error(call, STATUS_FROM_CORE, error);
+  }
+
   if (bctl->send_initial_metadata) {
     if (error != GRPC_ERROR_NONE) {
       set_status_from_error(call, STATUS_FROM_CORE, error);

+ 0 - 10
test/core/end2end/invalid_call_argument_test.c

@@ -304,11 +304,6 @@ static void test_receive_initial_metadata_twice_at_client() {
   grpc_op *op;
   prepare_test(1);
   op = g_state.ops;
-  op->op = GRPC_OP_SEND_INITIAL_METADATA;
-  op->data.send_initial_metadata.count = 0;
-  op->flags = 0;
-  op->reserved = NULL;
-  op++;
   op->op = GRPC_OP_RECV_INITIAL_METADATA;
   op->data.recv_initial_metadata = &g_state.initial_metadata_recv;
   op->flags = 0;
@@ -397,11 +392,6 @@ static void test_recv_status_on_client_twice() {
   prepare_test(1);
 
   op = g_state.ops;
-  op->op = GRPC_OP_SEND_INITIAL_METADATA;
-  op->data.send_initial_metadata.count = 0;
-  op->flags = 0;
-  op->reserved = NULL;
-  op++;
   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
   op->data.recv_status_on_client.trailing_metadata =
       &g_state.trailing_metadata_recv;

+ 6 - 10
test/core/end2end/tests/negative_deadline.c

@@ -122,11 +122,6 @@ static void simple_request_body(grpc_end2end_test_fixture f, size_t num_ops) {
 
   memset(ops, 0, sizeof(ops));
   op = ops;
-  op->op = GRPC_OP_SEND_INITIAL_METADATA;
-  op->data.send_initial_metadata.count = 0;
-  op->flags = 0;
-  op->reserved = NULL;
-  op++;
   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
   op->data.recv_status_on_client.status = &status;
@@ -140,14 +135,15 @@ static void simple_request_body(grpc_end2end_test_fixture f, size_t num_ops) {
   op->flags = 0;
   op->reserved = NULL;
   op++;
+  op->op = GRPC_OP_SEND_INITIAL_METADATA;
+  op->data.send_initial_metadata.count = 0;
+  op->flags = 0;
+  op->reserved = NULL;
+  op++;
   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
   op->flags = 0;
   op->reserved = NULL;
   op++;
-  // Need to send at least the SEND_INITIAL_METADATA and
-  // RECV_STATUS_ON_CLIENT ops, since the former allows the client to set
-  // the deadline timer, and the latter returns status to the test.
-  GPR_ASSERT(num_ops >= 2);
   GPR_ASSERT(num_ops <= (size_t)(op - ops));
   error = grpc_call_start_batch(c, ops, num_ops, tag(1), NULL);
   GPR_ASSERT(GRPC_CALL_OK == error);
@@ -178,7 +174,7 @@ static void test_invoke_simple_request(grpc_end2end_test_config config,
 
 void negative_deadline(grpc_end2end_test_config config) {
   size_t i;
-  for (i = 2; i <= 4; i++) {
+  for (i = 1; i <= 4; i++) {
     test_invoke_simple_request(config, i);
   }
 }