Browse Source

Simplify call error/status aggregation

kpayson64 7 years ago
parent
commit
87daf00f43
2 changed files with 135 additions and 244 deletions
  1. 37 1
      src/core/ext/filters/message_size/message_size_filter.cc
  2. 98 243
      src/core/lib/surface/call.cc

+ 37 - 1
src/core/ext/filters/message_size/message_size_filter.cc

@@ -99,10 +99,15 @@ struct call_data {
   // recv_message_ready up-call on transport_stream_op, and remember to
   // call our next_recv_message_ready member after handling it.
   grpc_closure recv_message_ready;
+  grpc_closure recv_trailing_metadata;
+  // The error caused by a message that is too large, or GRPC_ERROR_NONE
+  grpc_error* error;
   // Used by recv_message_ready.
   grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message;
   // Original recv_message_ready callback, invoked after our own.
   grpc_closure* next_recv_message_ready;
+  // Original recv_trailing_metadata callback, invoked after our own.
+  grpc_closure* next_recv_trailing_metadata;
 };
 
 struct channel_data {
@@ -130,6 +135,8 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
     grpc_error* new_error = grpc_error_set_int(
         GRPC_ERROR_CREATE_FROM_COPIED_STRING(message_string),
         GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED);
+    GRPC_ERROR_UNREF(calld->error);
+    calld->error = GRPC_ERROR_REF(new_error);
     if (error == GRPC_ERROR_NONE) {
       error = new_error;
     } else {
@@ -144,6 +151,23 @@ static void recv_message_ready(void* user_data, grpc_error* error) {
   GRPC_CLOSURE_RUN(calld->next_recv_message_ready, error);
 }
 
+// Callback invoked on completion of recv_trailing_metadata
+// Notifies the recv_trailing_metadata batch of any message size failures
+static void recv_trailing_metadata(void* user_data, grpc_error* error) {
+  grpc_call_element* elem = (grpc_call_element*)user_data;
+  call_data* calld = (call_data*)elem->call_data;
+  if (error == GRPC_ERROR_NONE) {
+    error = calld->error;
+  } else if (calld->error == GRPC_ERROR_NONE) {
+    error = GRPC_ERROR_REF(error);
+  } else if (calld->error != error) {
+    error = grpc_error_add_child(GRPC_ERROR_REF(error), calld->error);
+  }
+  calld->error = GRPC_ERROR_NONE;
+  // Invoke the next callback.
+  GRPC_CLOSURE_RUN(calld->next_recv_trailing_metadata, error);
+}
+
 // Start transport stream op.
 static void start_transport_stream_op_batch(
     grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
@@ -172,6 +196,11 @@ static void start_transport_stream_op_batch(
     calld->recv_message = op->payload->recv_message.recv_message;
     op->payload->recv_message.recv_message_ready = &calld->recv_message_ready;
   }
+  // Inject callback for receiving trailing metadata.
+  if (op->recv_trailing_metadata) {
+    calld->next_recv_trailing_metadata = op->on_complete;
+    op->on_complete = &calld->recv_trailing_metadata;
+  }
   // Chain to the next filter.
   grpc_call_next_op(elem, op);
 }
@@ -183,8 +212,12 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
   call_data* calld = static_cast<call_data*>(elem->call_data);
   calld->call_combiner = args->call_combiner;
   calld->next_recv_message_ready = nullptr;
+  calld->next_recv_trailing_metadata = nullptr;
+  calld->error = GRPC_ERROR_NONE;
   GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem,
                     grpc_schedule_on_exec_ctx);
+  GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata, recv_trailing_metadata,
+                    elem, grpc_schedule_on_exec_ctx);
   // Get max sizes from channel data, then merge in per-method config values.
   // Note: Per-method config is only available on the client, so we
   // apply the max request size to the send limit and the max response
@@ -213,7 +246,10 @@ static grpc_error* init_call_elem(grpc_call_element* elem,
 // Destructor for call_data.
 static void destroy_call_elem(grpc_call_element* elem,
                               const grpc_call_final_info* final_info,
-                              grpc_closure* ignored) {}
+                              grpc_closure* ignored) {
+  call_data* calld = (call_data*)elem->call_data;
+  GRPC_ERROR_UNREF(calld->error);
+}
 
 static int default_size(const grpc_channel_args* args,
                         int without_minimal_stack) {

+ 98 - 243
src/core/lib/surface/call.cc

@@ -70,46 +70,6 @@
 // Used to create arena for the first call.
 #define ESTIMATED_MDELEM_COUNT 16
 
-/* Status data for a request can come from several sources; this
-   enumerates them all, and acts as a priority sorting for which
-   status to return to the application - earlier entries override
-   later ones */
-typedef enum {
-  /* Status came from the application layer overriding whatever
-     the wire says */
-  STATUS_FROM_API_OVERRIDE = 0,
-  /* Status came from 'the wire' - or somewhere below the surface
-     layer */
-  STATUS_FROM_WIRE,
-  /* Status was created by some internal channel stack operation: must come via
-     add_batch_error */
-  STATUS_FROM_CORE,
-  /* Status was created by some surface error */
-  STATUS_FROM_SURFACE,
-  /* Status came from the server sending status */
-  STATUS_FROM_SERVER_STATUS,
-  STATUS_SOURCE_COUNT
-} status_source;
-
-typedef struct {
-  bool is_set;
-  grpc_error* error;
-} received_status;
-
-static gpr_atm pack_received_status(received_status r) {
-  return r.is_set ? (1 | (gpr_atm)r.error) : 0;
-}
-
-static received_status unpack_received_status(gpr_atm atm) {
-  if ((atm & 1) == 0) {
-    return {false, GRPC_ERROR_NONE};
-  } else {
-    return {true, (grpc_error*)(atm & ~static_cast<gpr_atm>(1))};
-  }
-}
-
-#define MAX_ERRORS_PER_BATCH 4
-
 typedef struct batch_control {
   grpc_call* call;
   /* Share memory for cq_completion and notify_tag as they are never needed
@@ -134,10 +94,7 @@ typedef struct batch_control {
   grpc_closure start_batch;
   grpc_closure finish_batch;
   gpr_refcount steps_to_complete;
-
-  grpc_error* errors[MAX_ERRORS_PER_BATCH];
-  gpr_atm num_errors;
-
+  grpc_error* batch_error;
   grpc_transport_stream_op_batch op;
 } batch_control;
 
@@ -200,9 +157,6 @@ struct grpc_call {
   // A char* indicating the peer name.
   gpr_atm peer_string;
 
-  /* Packed received call statuses from various sources */
-  gpr_atm status[STATUS_SOURCE_COUNT];
-
   /* Call data useful used for reporting. Only valid after the call has
    * completed */
   grpc_call_final_info final_info;
@@ -234,6 +188,7 @@ struct grpc_call {
   grpc_closure receiving_stream_ready;
   grpc_closure receiving_initial_metadata_ready;
   uint32_t test_only_last_message_flags;
+  bool cancelled;
 
   grpc_closure release_call;
 
@@ -247,6 +202,7 @@ struct grpc_call {
       int* cancelled;
     } server;
   } final_op;
+  grpc_error* status_error;
 
   /* recv_state can contain one of the following values:
      RECV_NONE :                 :  no initial metadata and messages received
@@ -279,23 +235,15 @@ grpc_core::TraceFlag grpc_compression_trace(false, "compression");
 
 static void execute_batch(grpc_call* call, grpc_transport_stream_op_batch* op,
                           grpc_closure* start_batch_closure);
-static void cancel_with_status(grpc_call* c, status_source source,
-                               grpc_status_code status,
+
+static void cancel_with_status(grpc_call* c, grpc_status_code status,
                                const char* description);
-static void cancel_with_error(grpc_call* c, status_source source,
-                              grpc_error* error);
+static void cancel_with_error(grpc_call* c, grpc_error* error);
 static void destroy_call(void* call_stack, grpc_error* error);
 static void receiving_slice_ready(void* bctlp, grpc_error* error);
-static void get_final_status(
-    grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data),
-    void* set_value_user_data, grpc_slice* details, const char** error_string);
-static void set_status_value_directly(grpc_status_code status, void* dest);
-static void set_status_from_error(grpc_call* call, status_source source,
-                                  grpc_error* error);
+static void set_final_status(grpc_call* call, grpc_error* error);
 static void process_data_after_md(batch_control* bctl);
 static void post_batch_completion(batch_control* bctl);
-static void add_batch_error(batch_control* bctl, grpc_error* error,
-                            bool has_cancelled);
 
 static void add_init_error(grpc_error** composite, grpc_error* new_err) {
   if (new_err == GRPC_ERROR_NONE) return;
@@ -456,10 +404,10 @@ grpc_error* grpc_call_create(const grpc_call_create_args* args,
     gpr_mu_unlock(&pc->child_list_mu);
   }
   if (error != GRPC_ERROR_NONE) {
-    cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
+    cancel_with_error(call, GRPC_ERROR_REF(error));
   }
   if (immediately_cancel) {
-    cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
+    cancel_with_error(call, GRPC_ERROR_CANCELLED);
   }
   if (args->cq != nullptr) {
     GPR_ASSERT(args->pollset_set_alternative == nullptr &&
@@ -520,7 +468,6 @@ static void release_call(void* call, grpc_error* error) {
   GRPC_CHANNEL_INTERNAL_UNREF(channel, "call");
 }
 
-static void set_status_value_directly(grpc_status_code status, void* dest);
 static void destroy_call(void* call, grpc_error* error) {
   GPR_TIMER_SCOPE("destroy_call", 0);
   size_t i;
@@ -547,16 +494,14 @@ static void destroy_call(void* call, grpc_error* error) {
     GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
   }
 
-  get_final_status(c, set_status_value_directly, &c->final_info.final_status,
-                   nullptr, c->final_info.error_string);
+  grpc_slice slice = grpc_empty_slice();
+  grpc_error_get_status(c->status_error, c->send_deadline,
+                        &c->final_info.final_status, &slice, nullptr,
+                        c->final_info.error_string);
+  GRPC_ERROR_UNREF(c->status_error);
   c->final_info.stats.latency =
       gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), c->start_time);
 
-  for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
-    GRPC_ERROR_UNREF(
-        unpack_received_status(gpr_atm_acq_load(&c->status[i])).error);
-  }
-
   grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c), &c->final_info,
                           GRPC_CLOSURE_INIT(&c->release_call, release_call, c,
                                             grpc_schedule_on_exec_ctx));
@@ -594,7 +539,7 @@ void grpc_call_unref(grpc_call* c) {
   bool cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) != 0 &&
                 gpr_atm_acq_load(&c->received_final_op_atm) == 0;
   if (cancel) {
-    cancel_with_error(c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
+    cancel_with_error(c, GRPC_ERROR_CANCELLED);
   } else {
     // Unset the call combiner cancellation closure.  This has the
     // effect of scheduling the previously set cancellation closure, if
@@ -609,8 +554,7 @@ grpc_call_error grpc_call_cancel(grpc_call* call, void* reserved) {
   GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
   GPR_ASSERT(!reserved);
   grpc_core::ExecCtx exec_ctx;
-  cancel_with_error(call, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED);
-
+  cancel_with_error(call, GRPC_ERROR_CANCELLED);
   return GRPC_CALL_OK;
 }
 
@@ -664,8 +608,7 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call* c,
       "c=%p, status=%d, description=%s, reserved=%p)",
       4, (c, (int)status, description, reserved));
   GPR_ASSERT(reserved == nullptr);
-  cancel_with_status(c, STATUS_FROM_API_OVERRIDE, status, description);
-
+  cancel_with_status(c, status, description);
   return GRPC_CALL_OK;
 }
 
@@ -685,15 +628,18 @@ static void done_termination(void* arg, grpc_error* error) {
   gpr_free(state);
 }
 
-static void cancel_with_error(grpc_call* c, status_source source,
-                              grpc_error* error) {
+static void cancel_with_error(grpc_call* c, grpc_error* error) {
+  if (c->cancelled) {
+    GRPC_ERROR_UNREF(error);
+    return;
+  }
+  c->cancelled = true;
   GRPC_CALL_INTERNAL_REF(c, "termination");
   // Inform the call combiner of the cancellation, so that it can cancel
   // any in-flight asynchronous actions that may be holding the call
   // combiner.  This ensures that the cancel_stream batch can be sent
   // down the filter stack in a timely manner.
   grpc_call_combiner_cancel(&c->call_combiner, GRPC_ERROR_REF(error));
-  set_status_from_error(c, source, GRPC_ERROR_REF(error));
   cancel_state* state = static_cast<cancel_state*>(gpr_malloc(sizeof(*state)));
   state->call = c;
   GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
@@ -716,90 +662,28 @@ static grpc_error* error_from_status(grpc_status_code status,
       GRPC_ERROR_INT_GRPC_STATUS, status);
 }
 
-static void cancel_with_status(grpc_call* c, status_source source,
-                               grpc_status_code status,
+static void cancel_with_status(grpc_call* c, grpc_status_code status,
                                const char* description) {
-  cancel_with_error(c, source, error_from_status(status, description));
+  cancel_with_error(c, error_from_status(status, description));
 }
 
-/*******************************************************************************
- * FINAL STATUS CODE MANIPULATION
- */
-
-static bool get_final_status_from(
-    grpc_call* call, grpc_error* error, bool allow_ok_status,
-    void (*set_value)(grpc_status_code code, void* user_data),
-    void* set_value_user_data, grpc_slice* details, const char** error_string) {
-  grpc_status_code code;
-  grpc_slice slice = grpc_empty_slice();
-  grpc_error_get_status(error, call->send_deadline, &code, &slice, nullptr,
-                        error_string);
-  if (code == GRPC_STATUS_OK && !allow_ok_status) {
-    return false;
-  }
-
-  set_value(code, set_value_user_data);
-  if (details != nullptr) {
-    *details = grpc_slice_ref_internal(slice);
-  }
-  return true;
-}
-
-static void get_final_status(
-    grpc_call* call, void (*set_value)(grpc_status_code code, void* user_data),
-    void* set_value_user_data, grpc_slice* details, const char** error_string) {
-  int i;
-  received_status status[STATUS_SOURCE_COUNT];
-  for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
-    status[i] = unpack_received_status(gpr_atm_acq_load(&call->status[i]));
-  }
+static void set_final_status(grpc_call* call, grpc_error* error) {
   if (grpc_call_error_trace.enabled()) {
-    gpr_log(GPR_INFO, "get_final_status %s", call->is_client ? "CLI" : "SVR");
-    for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
-      if (status[i].is_set) {
-        gpr_log(GPR_INFO, "  %d: %s", i, grpc_error_string(status[i].error));
-      }
-    }
+    gpr_log(GPR_DEBUG, "set_final_status %s", call->is_client ? "CLI" : "SVR");
+    gpr_log(GPR_DEBUG, "%s", grpc_error_string(error));
   }
-  /* first search through ignoring "OK" statuses: if something went wrong,
-   * ensure we report it */
-  for (int allow_ok_status = 0; allow_ok_status < 2; allow_ok_status++) {
-    /* search for the best status we can present: ideally the error we use has a
-       clearly defined grpc-status, and we'll prefer that. */
-    for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
-      if (status[i].is_set &&
-          grpc_error_has_clear_grpc_status(status[i].error)) {
-        if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
-                                  set_value, set_value_user_data, details,
-                                  error_string)) {
-          return;
-        }
-      }
-    }
-    /* If no clearly defined status exists, search for 'anything' */
-    for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
-      if (status[i].is_set) {
-        if (get_final_status_from(call, status[i].error, allow_ok_status != 0,
-                                  set_value, set_value_user_data, details,
-                                  error_string)) {
-          return;
-        }
-      }
-    }
-  }
-  /* If nothing exists, set some default */
   if (call->is_client) {
-    set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
+    const char** error_string = call->final_op.client.error_string;
+    grpc_status_code code;
+    grpc_slice slice = grpc_empty_slice();
+    grpc_error_get_status(error, call->send_deadline, &code, &slice, nullptr,
+                          error_string);
+    *call->final_op.client.status = code;
+    *call->final_op.client.status_details = grpc_slice_ref_internal(slice);
+    call->status_error = error;
   } else {
-    set_value(GRPC_STATUS_OK, set_value_user_data);
-  }
-}
-
-static void set_status_from_error(grpc_call* call, status_source source,
-                                  grpc_error* error) {
-  if (!gpr_atm_rel_cas(&call->status[source],
-                       pack_received_status({false, GRPC_ERROR_NONE}),
-                       pack_received_status({true, error}))) {
+    *call->final_op.server.cancelled =
+        error != GRPC_ERROR_NONE || call->status_error != GRPC_ERROR_NONE;
     GRPC_ERROR_UNREF(error);
   }
 }
@@ -1018,6 +902,7 @@ static grpc_stream_compression_algorithm decode_stream_compression(
 static void publish_app_metadata(grpc_call* call, grpc_metadata_batch* b,
                                  int is_trailing) {
   if (b->list.count == 0) return;
+  if (!call->is_client && is_trailing) return;
   if (is_trailing && call->buffered_metadata[1] == nullptr) return;
   GPR_TIMER_SCOPE("publish_app_metadata", 0);
   grpc_metadata_array* dest;
@@ -1071,9 +956,12 @@ static void recv_initial_filter(grpc_call* call, grpc_metadata_batch* b) {
   publish_app_metadata(call, b, false);
 }
 
-static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
+static void recv_trailing_filter(void* args, grpc_metadata_batch* b,
+                                 grpc_error* batch_error) {
   grpc_call* call = static_cast<grpc_call*>(args);
-  if (b->idx.named.grpc_status != nullptr) {
+  if (batch_error != GRPC_ERROR_NONE) {
+    set_final_status(call, GRPC_ERROR_REF(batch_error));
+  } else if (b->idx.named.grpc_status != nullptr) {
     grpc_status_code status_code =
         grpc_get_status_code_from_metadata(b->idx.named.grpc_status->md);
     grpc_error* error =
@@ -1092,10 +980,16 @@ static void recv_trailing_filter(void* args, grpc_metadata_batch* b) {
       error = grpc_error_set_str(error, GRPC_ERROR_STR_GRPC_MESSAGE,
                                  grpc_empty_slice());
     }
-    set_status_from_error(call, STATUS_FROM_WIRE, error);
+    set_final_status(call, GRPC_ERROR_REF(error));
     grpc_metadata_batch_remove(b, b->idx.named.grpc_status);
+    GRPC_ERROR_UNREF(error);
+  } else {
+    // TODO(kpayson) batch completed successfully w/no error + no status, should
+    // we assert instead?
+    set_final_status(call, GRPC_ERROR_NONE);
   }
   publish_app_metadata(call, b, true);
+  GRPC_ERROR_UNREF(batch_error);
 }
 
 grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
@@ -1106,14 +1000,6 @@ grpc_call_stack* grpc_call_get_call_stack(grpc_call* call) {
  * BATCH API IMPLEMENTATION
  */
 
-static void set_status_value_directly(grpc_status_code status, void* dest) {
-  *static_cast<grpc_status_code*>(dest) = status;
-}
-
-static void set_cancelled_value(grpc_status_code status, void* dest) {
-  *static_cast<int*>(dest) = (status != GRPC_STATUS_OK);
-}
-
 static bool are_write_flags_valid(uint32_t flags) {
   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
   const uint32_t allowed_write_positions =
@@ -1181,31 +1067,15 @@ static void finish_batch_completion(void* user_data,
   GRPC_CALL_INTERNAL_UNREF(call, "completion");
 }
 
-static grpc_error* consolidate_batch_errors(batch_control* bctl) {
-  size_t n = static_cast<size_t>(gpr_atm_acq_load(&bctl->num_errors));
-  if (n == 0) {
-    return GRPC_ERROR_NONE;
-  } else if (n == 1) {
-    /* Skip creating a composite error in the case that only one error was
-       logged */
-    grpc_error* e = bctl->errors[0];
-    bctl->errors[0] = nullptr;
-    return e;
-  } else {
-    grpc_error* error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
-        "Call batch failed", bctl->errors, n);
-    for (size_t i = 0; i < n; i++) {
-      GRPC_ERROR_UNREF(bctl->errors[i]);
-      bctl->errors[i] = nullptr;
-    }
-    return error;
-  }
+static void reset_batch_errors(batch_control* bctl) {
+  GRPC_ERROR_UNREF(bctl->batch_error);
+  bctl->batch_error = GRPC_ERROR_NONE;
 }
 
 static void post_batch_completion(batch_control* bctl) {
   grpc_call* next_child_call;
   grpc_call* call = bctl->call;
-  grpc_error* error = consolidate_batch_errors(bctl);
+  grpc_error* error = GRPC_ERROR_REF(bctl->batch_error);
 
   if (bctl->op.send_initial_metadata) {
     grpc_metadata_batch_destroy(
@@ -1223,7 +1093,7 @@ static void post_batch_completion(batch_control* bctl) {
   if (bctl->op.recv_trailing_metadata) {
     grpc_metadata_batch* md =
         &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
-    recv_trailing_filter(call, md);
+    recv_trailing_filter(call, md, GRPC_ERROR_REF(bctl->batch_error));
 
     /* propagate cancellation to any interested children */
     gpr_atm_rel_store(&call->received_final_op_atm, 1);
@@ -1237,8 +1107,7 @@ static void post_batch_completion(batch_control* bctl) {
           next_child_call = child->child->sibling_next;
           if (child->cancellation_is_inherited) {
             GRPC_CALL_INTERNAL_REF(child, "propagate_cancel");
-            cancel_with_error(child, STATUS_FROM_API_OVERRIDE,
-                              GRPC_ERROR_CANCELLED);
+            cancel_with_error(child, GRPC_ERROR_CANCELLED);
             GRPC_CALL_INTERNAL_UNREF(child, "propagate_cancel");
           }
           child = next_child_call;
@@ -1247,16 +1116,6 @@ static void post_batch_completion(batch_control* bctl) {
       gpr_mu_unlock(&pc->child_list_mu);
     }
 
-    if (call->is_client) {
-      get_final_status(call, set_status_value_directly,
-                       call->final_op.client.status,
-                       call->final_op.client.status_details,
-                       call->final_op.client.error_string);
-    } else {
-      get_final_status(call, set_cancelled_value,
-                       call->final_op.server.cancelled, nullptr, nullptr);
-    }
-
     GRPC_ERROR_UNREF(error);
     error = GRPC_ERROR_NONE;
   }
@@ -1265,9 +1124,10 @@ static void post_batch_completion(batch_control* bctl) {
     grpc_byte_buffer_destroy(*call->receiving_buffer);
     *call->receiving_buffer = nullptr;
   }
+  reset_batch_errors(bctl);
 
   if (bctl->completion_data.notify_tag.is_closure) {
-    /* unrefs bctl->error */
+    /* unrefs error */
     bctl->call = nullptr;
     /* This closure may be meant to be run within some combiner. Since we aren't
      * running in any combiner here, we need to use GRPC_CLOSURE_SCHED instead
@@ -1277,7 +1137,7 @@ static void post_batch_completion(batch_control* bctl) {
                        error);
     GRPC_CALL_INTERNAL_UNREF(call, "completion");
   } else {
-    /* unrefs bctl->error */
+    /* unrefs error */
     grpc_cq_end_op(bctl->call->cq, bctl->completion_data.notify_tag.tag, error,
                    finish_batch_completion, bctl,
                    &bctl->completion_data.cq_completion);
@@ -1386,8 +1246,10 @@ static void receiving_stream_ready(void* bctlp, grpc_error* error) {
   grpc_call* call = bctl->call;
   if (error != GRPC_ERROR_NONE) {
     call->receiving_stream.reset();
-    add_batch_error(bctl, GRPC_ERROR_REF(error), true);
-    cancel_with_error(call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error));
+    if (bctl->batch_error == GRPC_ERROR_NONE) {
+      bctl->batch_error = GRPC_ERROR_REF(error);
+    }
+    cancel_with_error(call, GRPC_ERROR_REF(error));
   }
   /* If recv_state is RECV_NONE, we will save the batch_control
    * object with rel_cas, and will not use it after the cas. Its corresponding
@@ -1423,8 +1285,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
                  call->incoming_stream_compression_algorithm,
                  call->incoming_message_compression_algorithm);
     gpr_log(GPR_ERROR, "%s", error_msg);
-    cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL,
-                       error_msg);
+    cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
     gpr_free(error_msg);
   } else if (
       grpc_compression_algorithm_from_message_stream_compression_algorithm(
@@ -1436,8 +1297,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
                  "compression (%d).",
                  call->incoming_stream_compression_algorithm,
                  call->incoming_message_compression_algorithm);
-    cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_INTERNAL,
-                       error_msg);
+    cancel_with_status(call, GRPC_STATUS_INTERNAL, error_msg);
     gpr_free(error_msg);
   } else {
     char* error_msg = nullptr;
@@ -1447,8 +1307,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
       gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.",
                    compression_algorithm);
       gpr_log(GPR_ERROR, "%s", error_msg);
-      cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
-                         error_msg);
+      cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
     } else if (grpc_compression_options_is_algorithm_enabled(
                    &compression_options, compression_algorithm) == 0) {
       /* check if algorithm is supported by current channel config */
@@ -1457,8 +1316,7 @@ static void validate_filtered_metadata(batch_control* bctl) {
       gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
                    algo_name);
       gpr_log(GPR_ERROR, "%s", error_msg);
-      cancel_with_status(call, STATUS_FROM_SURFACE, GRPC_STATUS_UNIMPLEMENTED,
-                         error_msg);
+      cancel_with_status(call, GRPC_STATUS_UNIMPLEMENTED, error_msg);
     }
     gpr_free(error_msg);
 
@@ -1476,23 +1334,12 @@ static void validate_filtered_metadata(batch_control* bctl) {
   }
 }
 
-static void add_batch_error(batch_control* bctl, grpc_error* error,
-                            bool has_cancelled) {
-  if (error == GRPC_ERROR_NONE) return;
-  int idx = static_cast<int>(gpr_atm_full_fetch_add(&bctl->num_errors, 1));
-  if (idx == 0 && !has_cancelled) {
-    cancel_with_error(bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error));
-  }
-  bctl->errors[idx] = error;
-}
-
 static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
   batch_control* bctl = static_cast<batch_control*>(bctlp);
   grpc_call* call = bctl->call;
 
   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "recv_initial_metadata_ready");
 
-  add_batch_error(bctl, GRPC_ERROR_REF(error), false);
   if (error == GRPC_ERROR_NONE) {
     grpc_metadata_batch* md =
         &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
@@ -1505,6 +1352,11 @@ static void receiving_initial_metadata_ready(void* bctlp, grpc_error* error) {
     if (md->deadline != GRPC_MILLIS_INF_FUTURE && !call->is_client) {
       call->send_deadline = md->deadline;
     }
+  } else {
+    if (bctl->batch_error == GRPC_ERROR_NONE) {
+      bctl->batch_error = GRPC_ERROR_REF(error);
+    }
+    cancel_with_error(call, GRPC_ERROR_REF(error));
   }
 
   grpc_closure* saved_rsr_closure = nullptr;
@@ -1542,7 +1394,12 @@ static void finish_batch(void* bctlp, grpc_error* error) {
   batch_control* bctl = static_cast<batch_control*>(bctlp);
   grpc_call* call = bctl->call;
   GRPC_CALL_COMBINER_STOP(&call->call_combiner, "on_complete");
-  add_batch_error(bctl, GRPC_ERROR_REF(error), false);
+  if (bctl->batch_error == GRPC_ERROR_NONE) {
+    bctl->batch_error = GRPC_ERROR_REF(error);
+  }
+  if (error != GRPC_ERROR_NONE) {
+    cancel_with_error(call, GRPC_ERROR_REF(error));
+  }
   finish_batch_step(bctl);
 }
 
@@ -1740,28 +1597,26 @@ static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
         call->send_extra_metadata_count = 1;
         call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
             call->channel, op->data.send_status_from_server.status);
-        {
-          grpc_error* override_error = GRPC_ERROR_NONE;
-          if (op->data.send_status_from_server.status != GRPC_STATUS_OK) {
-            override_error =
-                error_from_status(op->data.send_status_from_server.status,
-                                  "Returned non-ok status");
-          }
-          if (op->data.send_status_from_server.status_details != nullptr) {
-            call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
-                GRPC_MDSTR_GRPC_MESSAGE,
-                grpc_slice_ref_internal(
-                    *op->data.send_status_from_server.status_details));
-            call->send_extra_metadata_count++;
-            char* msg = grpc_slice_to_c_string(
-                GRPC_MDVALUE(call->send_extra_metadata[1].md));
-            override_error =
-                grpc_error_set_str(override_error, GRPC_ERROR_STR_GRPC_MESSAGE,
-                                   grpc_slice_from_copied_string(msg));
-            gpr_free(msg);
-          }
-          set_status_from_error(call, STATUS_FROM_API_OVERRIDE, override_error);
+        if (op->data.send_status_from_server.status_details != nullptr) {
+          call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
+              GRPC_MDSTR_GRPC_MESSAGE,
+              grpc_slice_ref_internal(
+                  *op->data.send_status_from_server.status_details));
+          call->send_extra_metadata_count++;
+          char* msg = grpc_slice_to_c_string(
+              GRPC_MDVALUE(call->send_extra_metadata[1].md));
+          gpr_free(msg);
         }
+        grpc_error* status_error =
+            op->data.send_status_from_server.status == GRPC_STATUS_OK
+                ? GRPC_ERROR_NONE
+                : grpc_error_set_int(
+                      GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+                          "Server returned error"),
+                      GRPC_ERROR_INT_GRPC_STATUS,
+                      static_cast<intptr_t>(
+                          op->data.send_status_from_server.status));
+        call->status_error = status_error;
         if (!prepare_application_metadata(
                 call,
                 static_cast<int>(