| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168 | 
							- /*
 
-  *
 
-  * Copyright 2014, Google Inc.
 
-  * All rights reserved.
 
-  *
 
-  * Redistribution and use in source and binary forms, with or without
 
-  * modification, are permitted provided that the following conditions are
 
-  * met:
 
-  *
 
-  *     * Redistributions of source code must retain the above copyright
 
-  * notice, this list of conditions and the following disclaimer.
 
-  *     * Redistributions in binary form must reproduce the above
 
-  * copyright notice, this list of conditions and the following disclaimer
 
-  * in the documentation and/or other materials provided with the
 
-  * distribution.
 
-  *     * Neither the name of Google Inc. nor the names of its
 
-  * contributors may be used to endorse or promote products derived from
 
-  * this software without specific prior written permission.
 
-  *
 
-  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
-  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
-  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 
-  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 
-  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 
-  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 
-  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 
-  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 
-  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 
-  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 
-  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-  *
 
-  */
 
- #include "src/core/surface/call.h"
 
- #include "src/core/channel/channel_stack.h"
 
- #include "src/core/channel/metadata_buffer.h"
 
- #include "src/core/iomgr/alarm.h"
 
- #include "src/core/support/string.h"
 
- #include "src/core/surface/byte_buffer_queue.h"
 
- #include "src/core/surface/channel.h"
 
- #include "src/core/surface/completion_queue.h"
 
- #include <grpc/support/alloc.h>
 
- #include <grpc/support/log.h>
 
- #include <stdio.h>
 
- #include <stdlib.h>
 
- #include <string.h>
 
/* Evaluates to 1 when bit number `op` is set in `mask`, 0 otherwise.
   The shift is performed on an unsigned constant so that op == 31 does not
   overflow a signed int. */
#define OP_IN_MASK(op, mask) (((1u << (op)) & (mask)) != 0)
 
- typedef struct {
 
-   gpr_uint8 md_out_buffer;
 
-   size_t md_out_count[2];
 
-   size_t md_out_capacity[2];
 
-   grpc_metadata *md_out[2];
 
-   grpc_byte_buffer *msg_out;
 
-   /* input buffers */
 
-   grpc_metadata_array initial_md_in;
 
-   grpc_metadata_array trailing_md_in;
 
-   grpc_recv_status status_in;
 
-   size_t msg_in_read_idx;
 
-   grpc_byte_buffer *msg_in;
 
-   gpr_uint8 got_status;
 
-   void *finished_tag;
 
- } legacy_state;
 
/* Three-step request state; REQ_INITIAL is the zero value. */
typedef enum {
  REQ_INITIAL = 0,
  REQ_READY,
  REQ_DONE
} req_state;
 
/* The next outbound step, as picked by choose_send_action() and carried out
   by enact_send_action(). */
typedef enum {
  SEND_NOTHING = 0,
  SEND_INITIAL_METADATA = 1,
  SEND_MESSAGE = 2,
  SEND_TRAILING_METADATA = 3,
  SEND_STATUS = 4,
  SEND_FINISH = 5
} send_action;
 
- typedef struct {
 
-   grpc_ioreq_completion_func on_complete;
 
-   void *user_data;
 
-   grpc_op_error status;
 
- } completed_request;
 
/* Sentinel values for reqinfo.set (see the description on that field). */
/* No application request is currently attached to the slot. */
#define REQSET_EMPTY 255
/* The slot has been satisfied for the lifetime of the call. */
#define REQSET_DONE 254
 
- /* The state of an ioreq */
 
- typedef struct reqinfo {
 
-   /* User supplied parameters */
 
-   grpc_ioreq_data data;
 
-   /* In which set is this ioreq?
 
-      This value could be:
 
-        - an element of grpc_ioreq_op enumeration, in which case
 
-          it designates the master ioreq in a set of requests
 
-        - REQSET_EMPTY, in which case this reqinfo type has no application
 
-          request against it
 
-        - REQSET_DONE, in which case this reqinfo has been satisfied for
 
-          all time for this call, and no further use will be made of it */
 
-   gpr_uint8 set;
 
-   grpc_op_error status;
 
-   grpc_ioreq_completion_func on_complete;
 
-   void *user_data;
 
-   gpr_uint32 need_mask;
 
-   gpr_uint32 complete_mask;
 
- } reqinfo;
 
/* Where a final status came from. get_final_status() scans the sources in
   ascending order and uses the first one that is set. */
typedef enum {
  STATUS_FROM_API_OVERRIDE = 0,
  STATUS_FROM_WIRE = 1,
  STATUS_SOURCE_COUNT = 2 /* number of sources; keep last */
} status_source;
 
- typedef struct {
 
-   gpr_uint8 set;
 
-   grpc_status_code code;
 
-   grpc_mdstr *details;
 
- } received_status;
 
- struct grpc_call {
 
-   grpc_completion_queue *cq;
 
-   grpc_channel *channel;
 
-   grpc_mdctx *metadata_context;
 
-   /* TODO(ctiller): share with cq if possible? */
 
-   gpr_mu mu;
 
-   gpr_uint8 is_client;
 
-   gpr_uint8 got_initial_metadata;
 
-   gpr_uint8 have_alarm;
 
-   gpr_uint8 read_closed;
 
-   gpr_uint8 stream_closed;
 
-   gpr_uint8 got_status_code;
 
-   gpr_uint8 sending;
 
-   gpr_uint8 num_completed_requests;
 
-   gpr_uint8 need_more_data;
 
-   reqinfo requests[GRPC_IOREQ_OP_COUNT];
 
-   completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
 
-   grpc_byte_buffer_queue incoming_queue;
 
-   grpc_metadata_array buffered_initial_metadata;
 
-   grpc_metadata_array buffered_trailing_metadata;
 
-   grpc_mdelem **owned_metadata;
 
-   size_t owned_metadata_count;
 
-   size_t owned_metadata_capacity;
 
-   received_status status[STATUS_SOURCE_COUNT];
 
-   grpc_alarm alarm;
 
-   gpr_refcount internal_refcount;
 
-   legacy_state *legacy_state;
 
- };
 
/* Conversions between a grpc_call and the call stack that is laid out
   immediately after it in the same allocation. */

/* grpc_call* -> grpc_call_stack* (the memory right after the call struct) */
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))

/* grpc_call_stack* -> grpc_call* (inverse of CALL_STACK_FROM_CALL) */
#define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)

/* The idx'th element of the call's stack */
#define CALL_ELEM_FROM_CALL(call, idx) \
  grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)

/* Top call element -> the grpc_call that owns it */
#define CALL_FROM_TOP_ELEM(top_elem) \
  CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
 
/* Exchanges the values of the lvalues `x` and `y`, both of type `type`.
   The arguments are parenthesized, and the temporary uses a name unlikely to
   collide with caller variables (the previous name `temp` made
   SWAP(T, temp, y) a silent no-op on `temp`). Each argument is evaluated
   twice, so do not pass expressions with side effects. */
#define SWAP(type, x, y)      \
  do {                        \
    type swap_tmp_ = (x);     \
    (x) = (y);                \
    (y) = swap_tmp_;          \
  } while (0)
 
- static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
 
- static send_action choose_send_action(grpc_call *call);
 
- static void enact_send_action(grpc_call *call, send_action sa);
 
- grpc_call *grpc_call_create(grpc_channel *channel,
 
-                             const void *server_transport_data) {
 
-   size_t i;
 
-   grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
 
-   grpc_call *call =
 
-       gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
 
-   memset(call, 0, sizeof(grpc_call));
 
-   gpr_mu_init(&call->mu);
 
-   call->channel = channel;
 
-   call->is_client = server_transport_data == NULL;
 
-   for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
 
-     call->requests[i].set = REQSET_EMPTY;
 
-   }
 
-   if (call->is_client) {
 
-     call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set = REQSET_DONE;
 
-   }
 
-   grpc_channel_internal_ref(channel);
 
-   call->metadata_context = grpc_channel_get_metadata_context(channel);
 
-   /* one ref is dropped in response to destroy, the other in
 
-      stream_closed */
 
-   gpr_ref_init(&call->internal_refcount, 2);
 
-   grpc_call_stack_init(channel_stack, server_transport_data,
 
-                        CALL_STACK_FROM_CALL(call));
 
-   return call;
 
- }
 
- legacy_state *get_legacy_state(grpc_call *call) {
 
-   if (call->legacy_state == NULL) {
 
-     call->legacy_state = gpr_malloc(sizeof(legacy_state));
 
-     memset(call->legacy_state, 0, sizeof(legacy_state));
 
-   }
 
-   return call->legacy_state;
 
- }
 
- void grpc_call_internal_ref(grpc_call *c) { gpr_ref(&c->internal_refcount); }
 
- static void destroy_call(void *call, int ignored_success) {
 
-   size_t i, j;
 
-   grpc_call *c = call;
 
-   grpc_call_stack_destroy(CALL_STACK_FROM_CALL(c));
 
-   grpc_channel_internal_unref(c->channel);
 
-   gpr_mu_destroy(&c->mu);
 
-   for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
 
-     if (c->status[i].details) {
 
-       grpc_mdstr_unref(c->status[i].details);
 
-     }
 
-   }
 
-   for (i = 0; i < c->owned_metadata_count; i++) {
 
-     grpc_mdelem_unref(c->owned_metadata[i]);
 
-   }
 
-   gpr_free(c->owned_metadata);
 
-   gpr_free(c->buffered_initial_metadata.metadata);
 
-   gpr_free(c->buffered_trailing_metadata.metadata);
 
-   if (c->legacy_state) {
 
-     for (i = 0; i < 2; i++) {
 
-       for (j = 0; j < c->legacy_state->md_out_count[i]; j++) {
 
-         gpr_free(c->legacy_state->md_out[i][j].key);
 
-         gpr_free(c->legacy_state->md_out[i][j].value);
 
-       }
 
-       gpr_free(c->legacy_state->md_out[i]);
 
-     }
 
-     gpr_free(c->legacy_state->initial_md_in.metadata);
 
-     gpr_free(c->legacy_state->trailing_md_in.metadata);
 
-     gpr_free(c->legacy_state);
 
-   }
 
-   gpr_free(c);
 
- }
 
- void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
 
-   if (gpr_unref(&c->internal_refcount)) {
 
-     if (allow_immediate_deletion) {
 
-       destroy_call(c, 1);
 
-     } else {
 
-       grpc_iomgr_add_callback(destroy_call, c);
 
-     }
 
-   }
 
- }
 
- static void set_status_code(grpc_call *call, status_source source,
 
-                             gpr_uint32 status) {
 
-   call->status[source].set = 1;
 
-   call->status[source].code = status;
 
- }
 
- static void set_status_details(grpc_call *call, status_source source,
 
-                                grpc_mdstr *status) {
 
-   if (call->status[source].details != NULL) {
 
-     grpc_mdstr_unref(call->status[source].details);
 
-   }
 
-   call->status[source].details = status;
 
- }
 
- static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) {
 
-   if (call->cq) return GRPC_CALL_ERROR_ALREADY_INVOKED;
 
-   call->cq = cq;
 
-   return GRPC_CALL_OK;
 
- }
 
- static void request_more_data(grpc_call *call) {
 
-   grpc_call_op op;
 
-   /* call down */
 
-   op.type = GRPC_REQUEST_DATA;
 
-   op.dir = GRPC_CALL_DOWN;
 
-   op.flags = 0;
 
-   op.done_cb = do_nothing;
 
-   op.user_data = NULL;
 
-   grpc_call_execute_op(call, &op);
 
- }
 
- static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
 
- static void unlock(grpc_call *call) {
 
-   send_action sa = SEND_NOTHING;
 
-   completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
 
-   int num_completed_requests = call->num_completed_requests;
 
-   int need_more_data =
 
-       call->need_more_data &&
 
-       call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set == REQSET_DONE;
 
-   int i;
 
-   if (need_more_data) {
 
-     call->need_more_data = 0;
 
-   }
 
-   if (num_completed_requests != 0) {
 
-     memcpy(completed_requests, call->completed_requests,
 
-            sizeof(completed_requests));
 
-     call->num_completed_requests = 0;
 
-   }
 
-   if (!call->sending) {
 
-     sa = choose_send_action(call);
 
-     if (sa != SEND_NOTHING) {
 
-       call->sending = 1;
 
-       grpc_call_internal_ref(call);
 
-     }
 
-   }
 
-   gpr_mu_unlock(&call->mu);
 
-   if (need_more_data) {
 
-     request_more_data(call);
 
-   }
 
-   if (sa != SEND_NOTHING) {
 
-     enact_send_action(call, sa);
 
-   }
 
-   for (i = 0; i < num_completed_requests; i++) {
 
-     completed_requests[i].on_complete(call, completed_requests[i].status,
 
-                                       completed_requests[i].user_data);
 
-   }
 
- }
 
- static void get_final_status(grpc_call *call, grpc_recv_status_args args) {
 
-   int i;
 
-   for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
 
-     if (call->status[i].set) {
 
-       *args.code = call->status[i].code;
 
-       if (call->status[i].details) {
 
-         gpr_slice details = call->status[i].details->slice;
 
-         size_t len = GPR_SLICE_LENGTH(details);
 
-         if (len + 1 > *args.details_capacity) {
 
-           *args.details_capacity = GPR_MAX(len + 1, *args.details_capacity * 3 / 2);
 
-           *args.details = gpr_realloc(*args.details, *args.details_capacity);
 
-         }
 
-         memcpy(*args.details, GPR_SLICE_START_PTR(details), len);
 
-         (*args.details)[len] = 0;
 
-       } else {
 
-         goto no_details;
 
-       }
 
-       return;
 
-     }
 
-   }
 
-   *args.code = GRPC_STATUS_UNKNOWN;
 
- no_details:
 
-   if (0 == *args.details_capacity) {
 
-     *args.details_capacity = 8;
 
-     *args.details = gpr_malloc(*args.details_capacity);
 
-   }
 
-   **args.details = 0;
 
- }
 
- static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
 
-                             grpc_op_error status) {
 
-   completed_request *cr;
 
-   size_t i;
 
-   if (call->requests[op].set < GRPC_IOREQ_OP_COUNT) {
 
-     reqinfo *master = &call->requests[call->requests[op].set];
 
-     /* ioreq is live: we need to do something */
 
-     master->complete_mask |= 1 << op;
 
-     call->requests[op].set =
 
-         (op == GRPC_IOREQ_SEND_MESSAGE || op == GRPC_IOREQ_RECV_MESSAGE)
 
-             ? REQSET_EMPTY
 
-             : REQSET_DONE;
 
-     if (master->complete_mask == master->need_mask ||
 
-         status == GRPC_OP_ERROR) {
 
-       if (OP_IN_MASK(GRPC_IOREQ_RECV_STATUS, master->need_mask)) {
 
-         get_final_status(
 
-             call,
 
-             call->requests[GRPC_IOREQ_RECV_STATUS].data.recv_status);
 
-       }
 
-       for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
 
-         if (call->requests[i].set == op) {
 
-           if (call->requests[i].status != GRPC_OP_OK) {
 
-             status = GRPC_OP_ERROR;
 
-           }
 
-           call->requests[i].set = REQSET_EMPTY;
 
-         }
 
-       }
 
-       cr = &call->completed_requests[call->num_completed_requests++];
 
-       cr->status = status;
 
-       cr->on_complete = master->on_complete;
 
-       cr->user_data = master->user_data;
 
-     }
 
-   }
 
- }
 
- static void finish_send_op(grpc_call *call, grpc_ioreq_op op, grpc_op_error error) {
 
-   lock(call);
 
-   finish_ioreq_op(call, op, error);
 
-   call->sending = 0;
 
-   unlock(call);
 
-   grpc_call_internal_unref(call, 0);
 
- }
 
- static void finish_write_step(void *pc, grpc_op_error error) {
 
-   finish_send_op(pc, GRPC_IOREQ_SEND_MESSAGE, error);
 
- }
 
- static void finish_finish_step(void *pc, grpc_op_error error) {
 
-   finish_send_op(pc, GRPC_IOREQ_SEND_CLOSE, error);
 
- }
 
- static void finish_start_step(void *pc, grpc_op_error error) {
 
-   finish_send_op(pc, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
 
- }
 
- static send_action choose_send_action(grpc_call *call) {
 
-   switch (call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].set) {
 
-     case REQSET_EMPTY:
 
-       return SEND_NOTHING;
 
-     default:
 
-       return SEND_INITIAL_METADATA;
 
-     case REQSET_DONE:
 
-       break;
 
-   }
 
-   switch (call->requests[GRPC_IOREQ_SEND_MESSAGE].set) {
 
-     case REQSET_EMPTY:
 
-       return SEND_NOTHING;
 
-     default:
 
-       return SEND_MESSAGE;
 
-     case REQSET_DONE:
 
-       break;
 
-   }
 
-   switch (call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].set) {
 
-     case REQSET_EMPTY:
 
-       return SEND_NOTHING;
 
-     default:
 
-       finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_OK);
 
-       return SEND_TRAILING_METADATA;
 
-     case REQSET_DONE:
 
-       break;
 
-   }
 
-   switch (call->requests[GRPC_IOREQ_SEND_STATUS].set) {
 
-     case REQSET_EMPTY:
 
-       return SEND_NOTHING;
 
-     default:
 
-       finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_OK);
 
-       return SEND_STATUS;
 
-     case REQSET_DONE:
 
-       break;
 
-   }
 
-   switch (call->requests[GRPC_IOREQ_SEND_CLOSE].set) {
 
-     case REQSET_EMPTY:
 
-     case REQSET_DONE:
 
-       return SEND_NOTHING;
 
-     default:
 
-       return SEND_FINISH;
 
-   }
 
- }
 
- static void send_metadata(grpc_call *call, grpc_mdelem *elem) {
 
-   grpc_call_op op;
 
-   op.type = GRPC_SEND_METADATA;
 
-   op.dir = GRPC_CALL_DOWN;
 
-   op.flags = 0;
 
-   op.data.metadata = elem;
 
-   op.done_cb = do_nothing;
 
-   op.user_data = NULL;
 
-   grpc_call_execute_op(call, &op);
 
- }
 
- static void enact_send_action(grpc_call *call, send_action sa) {
 
-   grpc_ioreq_data data;
 
-   grpc_call_op op;
 
-   size_t i;
 
-   char status_str[GPR_LTOA_MIN_BUFSIZE];
 
-   switch (sa) {
 
-     case SEND_NOTHING:
 
-       abort();
 
-       break;
 
-     case SEND_INITIAL_METADATA:
 
-       data = call->requests[GRPC_IOREQ_SEND_INITIAL_METADATA].data;
 
-       for (i = 0; i < data.send_metadata.count; i++) {
 
-         const grpc_metadata *md = &data.send_metadata.metadata[i];
 
-         send_metadata(call,
 
-                       grpc_mdelem_from_string_and_buffer(
 
-                           call->metadata_context, md->key,
 
-                           (const gpr_uint8 *)md->value, md->value_length));
 
-       }
 
-       op.type = GRPC_SEND_START;
 
-       op.dir = GRPC_CALL_DOWN;
 
-       op.flags = 0;
 
-       op.data.start.pollset = grpc_cq_pollset(call->cq);
 
-       op.done_cb = finish_start_step;
 
-       op.user_data = call;
 
-       grpc_call_execute_op(call, &op);
 
-       break;
 
-     case SEND_MESSAGE:
 
-       data = call->requests[GRPC_IOREQ_SEND_MESSAGE].data;
 
-       op.type = GRPC_SEND_MESSAGE;
 
-       op.dir = GRPC_CALL_DOWN;
 
-       op.flags = 0;
 
-       op.data.message = data.send_message;
 
-       op.done_cb = finish_write_step;
 
-       op.user_data = call;
 
-       grpc_call_execute_op(call, &op);
 
-       break;
 
-     case SEND_TRAILING_METADATA:
 
-       data = call->requests[GRPC_IOREQ_SEND_TRAILING_METADATA].data;
 
-       for (i = 0; i < data.send_metadata.count; i++) {
 
-         const grpc_metadata *md = &data.send_metadata.metadata[i];
 
-         send_metadata(call,
 
-                       grpc_mdelem_from_string_and_buffer(
 
-                           call->metadata_context, md->key,
 
-                           (const gpr_uint8 *)md->value, md->value_length));
 
-       }
 
-       lock(call);
 
-       call->sending = 0;
 
-       unlock(call);
 
-       grpc_call_internal_unref(call, 0);
 
-       break;
 
-     case SEND_STATUS:
 
-       /* TODO(ctiller): cache common status values */
 
-       data = call->requests[GRPC_IOREQ_SEND_CLOSE].data;
 
-       gpr_ltoa(data.send_status.code, status_str);
 
-       send_metadata(
 
-           call,
 
-           grpc_mdelem_from_metadata_strings(
 
-               call->metadata_context,
 
-               grpc_mdstr_ref(grpc_channel_get_status_string(call->channel)),
 
-               grpc_mdstr_from_string(call->metadata_context, status_str)));
 
-       if (data.send_status.details) {
 
-         send_metadata(call,
 
-                       grpc_mdelem_from_metadata_strings(
 
-                           call->metadata_context,
 
-                           grpc_mdstr_ref(
 
-                               grpc_channel_get_message_string(call->channel)),
 
-                           grpc_mdstr_from_string(call->metadata_context,
 
-                                                  data.send_status.details)));
 
-       }
 
-       break;
 
-     case SEND_FINISH:
 
-       op.type = GRPC_SEND_FINISH;
 
-       op.dir = GRPC_CALL_DOWN;
 
-       op.flags = 0;
 
-       op.done_cb = finish_finish_step;
 
-       op.user_data = call;
 
-       grpc_call_execute_op(call, &op);
 
-       break;
 
-   }
 
- }
 
- static grpc_call_error start_ioreq_error(grpc_call *call,
 
-                                          gpr_uint32 mutated_ops,
 
-                                          grpc_call_error ret) {
 
-   size_t i;
 
-   for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
 
-     if (mutated_ops & (1 << i)) {
 
-       call->requests[i].set = REQSET_EMPTY;
 
-     }
 
-   }
 
-   return ret;
 
- }
 
- static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
 
-                                    size_t nreqs,
 
-                                    grpc_ioreq_completion_func completion,
 
-                                    void *user_data) {
 
-   size_t i;
 
-   gpr_uint32 have_ops = 0;
 
-   grpc_ioreq_op op;
 
-   reqinfo *requests = call->requests;
 
-   reqinfo *master;
 
-   grpc_ioreq_data data;
 
-   gpr_uint8 set;
 
-   if (nreqs == 0) {
 
-     return GRPC_CALL_OK;
 
-   }
 
-   set = reqs[0].op;
 
-   master = &requests[set];
 
-   for (i = 0; i < nreqs; i++) {
 
-     op = reqs[i].op;
 
-     if (requests[op].set < GRPC_IOREQ_OP_COUNT) {
 
-       return start_ioreq_error(call, have_ops,
 
-                                GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
 
-     } else if (requests[op].set == REQSET_DONE) {
 
-       return start_ioreq_error(call, have_ops,
 
-                                GRPC_CALL_ERROR_ALREADY_INVOKED);
 
-     }
 
-     have_ops |= 1 << op;
 
-     data = reqs[i].data;
 
-     requests[op].data = data;
 
-     requests[op].set = set;
 
-   }
 
-   GPR_ASSERT(master != NULL);
 
-   master->need_mask = have_ops;
 
-   master->complete_mask = 0;
 
-   master->on_complete = completion;
 
-   master->user_data = user_data;
 
-   for (i = 0; i < nreqs; i++) {
 
-     op = reqs[i].op;
 
-     data = reqs[i].data;
 
-     switch (op) {
 
-       default:
 
-         break;
 
-       case GRPC_IOREQ_RECV_MESSAGE:
 
-         *data.recv_message = grpc_bbq_pop(&call->incoming_queue);
 
-         if (*data.recv_message) {
 
-           finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
 
-         } else {
 
-           call->need_more_data = 1;
 
-         }
 
-         if (call->stream_closed) {
 
-           finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
 
-         }
 
-         break;
 
-       case GRPC_IOREQ_RECV_STATUS:
 
-         if (call->stream_closed) {
 
-           finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
 
-         }
 
-         break;
 
-       case GRPC_IOREQ_SEND_MESSAGE:
 
-         if (call->stream_closed) {
 
-           finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, GRPC_OP_ERROR);
 
-         }
 
-         break;
 
-       case GRPC_IOREQ_SEND_CLOSE:
 
-         if (requests[GRPC_IOREQ_SEND_MESSAGE].set == REQSET_EMPTY) {
 
-           requests[GRPC_IOREQ_SEND_MESSAGE].set = REQSET_DONE;
 
-         }
 
-         break;
 
-       case GRPC_IOREQ_SEND_INITIAL_METADATA:
 
-         if (call->stream_closed) {
 
-           finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA,
 
-                           GRPC_OP_ERROR);
 
-         }
 
-         break;
 
-       case GRPC_IOREQ_RECV_INITIAL_METADATA:
 
-         data.recv_metadata->count = 0;
 
-         if (call->buffered_initial_metadata.count > 0) {
 
-           SWAP(grpc_metadata_array, *data.recv_metadata,
 
-                call->buffered_initial_metadata);
 
-         }
 
-         if (call->got_initial_metadata) {
 
-           finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
 
-         } else if (call->stream_closed) {
 
-           finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA,
 
-                           GRPC_OP_ERROR);
 
-         }
 
-         break;
 
-       case GRPC_IOREQ_RECV_TRAILING_METADATA:
 
-         data.recv_metadata->count = 0;
 
-         if (call->buffered_trailing_metadata.count > 0) {
 
-           SWAP(grpc_metadata_array, *data.recv_metadata,
 
-                call->buffered_trailing_metadata);
 
-         }
 
-         if (call->read_closed) {
 
-           finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
 
-         }
 
-         break;
 
-     }
 
-   }
 
-   return GRPC_CALL_OK;
 
- }
 
- static void call_start_ioreq_done(grpc_call *call, grpc_op_error status,
 
-                                   void *user_data) {
 
-   grpc_cq_end_ioreq(call->cq, user_data, call, do_nothing, NULL, status);
 
- }
 
- grpc_call_error grpc_call_start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
 
-                                       size_t nreqs, void *tag) {
 
-   grpc_call_error err;
 
-   lock(call);
 
-   err = start_ioreq(call, reqs, nreqs, call_start_ioreq_done, tag);
 
-   unlock(call);
 
-   return err;
 
- }
 
- grpc_call_error grpc_call_start_ioreq_and_call_back(
 
-     grpc_call *call, const grpc_ioreq *reqs, size_t nreqs,
 
-     grpc_ioreq_completion_func on_complete, void *user_data) {
 
-   grpc_call_error err;
 
-   lock(call);
 
-   err = start_ioreq(call, reqs, nreqs, on_complete, user_data);
 
-   unlock(call);
 
-   return err;
 
- }
 
- void grpc_call_destroy(grpc_call *c) {
 
-   int cancel;
 
-   lock(c);
 
-   if (c->have_alarm) {
 
-     grpc_alarm_cancel(&c->alarm);
 
-     c->have_alarm = 0;
 
-   }
 
-   cancel = !c->stream_closed;
 
-   unlock(c);
 
-   if (cancel) grpc_call_cancel(c);
 
-   grpc_call_internal_unref(c, 1);
 
- }
 
- grpc_call_error grpc_call_cancel(grpc_call *c) {
 
-   grpc_call_element *elem;
 
-   grpc_call_op op;
 
-   op.type = GRPC_CANCEL_OP;
 
-   op.dir = GRPC_CALL_DOWN;
 
-   op.flags = 0;
 
-   op.done_cb = do_nothing;
 
-   op.user_data = NULL;
 
-   elem = CALL_ELEM_FROM_CALL(c, 0);
 
-   elem->filter->call_op(elem, NULL, &op);
 
-   return GRPC_CALL_OK;
 
- }
 
- grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
 
-                                              grpc_status_code status,
 
-                                              const char *description) {
 
-   grpc_mdstr *details =
 
-       description ? grpc_mdstr_from_string(c->metadata_context, description)
 
-                   : NULL;
 
-   lock(c);
 
-   set_status_code(c, STATUS_FROM_API_OVERRIDE, status);
 
-   set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
 
-   unlock(c);
 
-   return grpc_call_cancel(c);
 
- }
 
- void grpc_call_execute_op(grpc_call *call, grpc_call_op *op) {
 
-   grpc_call_element *elem;
 
-   GPR_ASSERT(op->dir == GRPC_CALL_DOWN);
 
-   elem = CALL_ELEM_FROM_CALL(call, 0);
 
-   elem->filter->call_op(elem, NULL, op);
 
- }
 
- grpc_call_error grpc_call_add_metadata(grpc_call *call, grpc_metadata *metadata,
 
-                                        gpr_uint32 flags) {
 
-   legacy_state *ls;
 
-   grpc_metadata *mdout;
 
-   lock(call);
 
-   ls = get_legacy_state(call);
 
-   if (ls->md_out_count[ls->md_out_buffer] == ls->md_out_capacity[ls->md_out_buffer]) {
 
-     ls->md_out_capacity[ls->md_out_buffer] =
 
-         GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2, ls->md_out_capacity[ls->md_out_buffer] + 8);
 
-     ls->md_out[ls->md_out_buffer] =
 
-         gpr_realloc(ls->md_out[ls->md_out_buffer], sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]);
 
-   }
 
-   mdout = &ls->md_out[ls->md_out_buffer][ls->md_out_count[ls->md_out_buffer]++];
 
-   mdout->key = gpr_strdup(metadata->key);
 
-   mdout->value = gpr_malloc(metadata->value_length);
 
-   mdout->value_length = metadata->value_length;
 
-   memcpy(mdout->value, metadata->value, metadata->value_length);
 
-   unlock(call);
 
-   return GRPC_CALL_OK;
 
- }
 
 /* Legacy API: posts the FINISHED event via grpc_cq_end_finished() once the
    status has arrived (got_status) and the message-read condition below
    holds. Both callers visible in this file hold the call mutex. */
- static void maybe_finish_legacy(grpc_call *call) {
 
-   legacy_state *ls = get_legacy_state(call);
 
 /* NOTE(review): legacy_state declares msg_in as `grpc_byte_buffer *`, so
    `ls->msg_in.count` cannot compile as written. Presumably msg_in was meant
    to be an array type with a `count` field -- TODO confirm the intended
    type before changing this condition. */
-   if (ls->got_status && ls->msg_in_read_idx == ls->msg_in.count) {
 
-     grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL,
 
-                          ls->status_in.status, ls->status_in.details,
 
-                          ls->trailing_md_in.metadata, ls->trailing_md_in.count);
 
-   }
 
- }
 
- static void finish_status(grpc_call *call, grpc_op_error status,
 
-                           void *ignored) {
 
-   legacy_state *ls;
 
-   lock(call);
 
-   ls = get_legacy_state(call);
 
-   ls->got_status = 1;
 
-   maybe_finish_legacy(call);
 
-   unlock(call);
 
- }
 
- static void finish_recv_metadata(grpc_call *call, grpc_op_error status,
 
-                                  void *tag) {
 
-   legacy_state *ls;
 
-   lock(call);
 
-   ls = get_legacy_state(call);
 
-   if (status == GRPC_OP_OK) {
 
-     grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL,
 
-                                      ls->initial_md_in.count, ls->initial_md_in.metadata);
 
-   } else {
 
-     grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0,
 
-                                      NULL);
 
-   }
 
-   unlock(call);
 
- }
 
- static void finish_send_metadata(grpc_call *call, grpc_op_error status,
 
-                                  void *tag) {}
 
- /* Legacy client entry point: binds the call to `cq` and starts, in order,
-    (1) the send of the currently accumulated outgoing metadata,
-    (2) the receive of initial metadata (completion posted with
-        metadata_read_tag), and
-    (3) the receive of trailing metadata plus status (completion posted with
-        finished_tag through finish_status).
-    Returns the first non-OK grpc_call_error encountered, or GRPC_CALL_OK.
-    `flags` is not used by this function. */
- grpc_call_error grpc_call_invoke(grpc_call *call, grpc_completion_queue *cq,
-                                  void *metadata_read_tag, void *finished_tag,
-                                  gpr_uint32 flags) {
-   grpc_ioreq send_initial;
-   grpc_ioreq recv_initial;
-   grpc_ioreq recv_final[2];
-   legacy_state *state;
-   grpc_call_error result;
-   /* both completion events are announced before the call lock is taken */
-   grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
-   grpc_cq_begin_op(cq, call, GRPC_FINISHED);
-   lock(call);
-   state = get_legacy_state(call);
-   result = bind_cq(call, cq);
-   if (result != GRPC_CALL_OK) goto done;
-   state->finished_tag = finished_tag;
-   /* (1) send whatever metadata was queued in the current output buffer, then
-      advance to the next buffer index */
-   send_initial.op = GRPC_IOREQ_SEND_INITIAL_METADATA;
-   send_initial.data.send_metadata.count =
-       state->md_out_count[state->md_out_buffer];
-   send_initial.data.send_metadata.metadata =
-       state->md_out[state->md_out_buffer];
-   state->md_out_buffer++;
-   result = start_ioreq(call, &send_initial, 1, finish_send_metadata, NULL);
-   if (result != GRPC_CALL_OK) goto done;
-   /* (2) receive initial metadata */
-   recv_initial.op = GRPC_IOREQ_RECV_INITIAL_METADATA;
-   recv_initial.data.recv_metadata = &state->initial_md_in;
-   result = start_ioreq(call, &recv_initial, 1, finish_recv_metadata,
-                        metadata_read_tag);
-   if (result != GRPC_CALL_OK) goto done;
-   /* (3) receive trailing metadata and status as a single batch */
-   recv_final[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA;
-   recv_final[0].data.recv_metadata = &state->trailing_md_in;
-   recv_final[1].op = GRPC_IOREQ_RECV_STATUS;
-   recv_final[1].data.recv_status = &state->status_in;
-   result = start_ioreq(call, recv_final, 2, finish_status, NULL);
- done:
-   unlock(call);
-   return result;
- }
 
- /* Legacy server entry point: binds the call to `cq` and starts the status
-    receive whose completion (via finish_status) leads to the GRPC_FINISHED
-    event tagged with `finished_tag`.
-    Returns the error from bind_cq or start_ioreq, or GRPC_CALL_OK.
-    Fix: the original returned straight out of the function when bind_cq
-    failed, leaving the call locked; every exit now goes through unlock(). */
- grpc_call_error grpc_call_server_accept(grpc_call *call,
-                                         grpc_completion_queue *cq,
-                                         void *finished_tag) {
-   grpc_ioreq req;
-   grpc_call_error err;
-   legacy_state *ls;
-   /* inform the completion queue of an incoming operation (corresponding to
-      finished_tag) */
-   grpc_cq_begin_op(cq, call, GRPC_FINISHED);
-   lock(call);
-   err = bind_cq(call, cq);
-   if (err == GRPC_CALL_OK) {
-     ls = get_legacy_state(call);
-     ls->finished_tag = finished_tag;
-     req.op = GRPC_IOREQ_RECV_STATUS;
-     req.data.recv_status = &ls->status_in;
-     err = start_ioreq(call, &req, 1, finish_status, NULL);
-   }
-   unlock(call);
-   return err;
- }
 
- /* ioreq completion callback for the server-side initial-metadata send;
-    intentionally a no-op. */
- static void finish_send_initial_metadata(grpc_call *call, grpc_op_error status,
-                                          void *tag) {
-   /* nothing to report */
- }
 
- /* Legacy server API: starts sending the metadata accumulated in the current
-    output buffer as the call's initial metadata. Returns the result of
-    start_ioreq. `flags` is not used by this function. */
- grpc_call_error grpc_call_server_end_initial_metadata(grpc_call *call,
-                                                       gpr_uint32 flags) {
-   grpc_ioreq send_md;
-   grpc_call_error result;
-   legacy_state *state;
-   lock(call);
-   state = get_legacy_state(call);
-   send_md.op = GRPC_IOREQ_SEND_INITIAL_METADATA;
-   send_md.data.send_metadata.metadata = state->md_out[state->md_out_buffer];
-   send_md.data.send_metadata.count =
-       state->md_out_count[state->md_out_buffer];
-   result = start_ioreq(call, &send_md, 1, finish_send_initial_metadata, NULL);
-   unlock(call);
-   return result;
- }
 
- /* Notification that all initial metadata has been received for the call
-    owning `surface_element`: sets got_initial_metadata and completes the
-    GRPC_IOREQ_RECV_INITIAL_METADATA op with GRPC_OP_OK. */
- void grpc_call_initial_metadata_complete(grpc_call_element *surface_element) {
-   grpc_call *call;
-   call = grpc_call_from_top_element(surface_element);
-   lock(call);
-   call->got_initial_metadata = 1;
-   finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
-   unlock(call);
- }
 
- /* Completion-queue callback for read events: destroys the byte buffer passed
-    as user data `p`. The error code is not consulted. */
- static void finish_read_event(void *p, grpc_op_error error) {
-   grpc_byte_buffer_destroy(p);
- }
 
- /* ioreq completion callback for a GRPC_IOREQ_RECV_MESSAGES request started by
-    grpc_call_start_read. Posts the next message from msg_in as a read event,
-    or a NULL read when no message was received. */
- static void finish_read(grpc_call *call, grpc_op_error error, void *tag) {
-   legacy_state *state;
-   grpc_byte_buffer *next;
-   lock(call);
-   state = get_legacy_state(call);
-   if (state->msg_in.count != 0) {
-     next = state->msg_in.buffers[state->msg_in_read_idx++];
-     /* finish_read_event destroys the buffer when the event is finished */
-     grpc_cq_end_read(call->cq, tag, call, finish_read_event, next, next);
-     maybe_finish_legacy(call);
-   } else {
-     grpc_cq_end_read(call->cq, tag, call, do_nothing, NULL, NULL);
-   }
-   unlock(call);
- }
 
- /* Legacy read API. If messages from a previous batch are still unread, the
-    next one is posted immediately as a read event and GRPC_CALL_OK is
-    returned. Otherwise the read index is reset and a new
-    GRPC_IOREQ_RECV_MESSAGES request is started, whose result is returned. */
- grpc_call_error grpc_call_start_read(grpc_call *call, void *tag) {
-   legacy_state *state;
-   grpc_ioreq recv;
-   grpc_call_error result;
-   grpc_byte_buffer *next;
-   grpc_cq_begin_op(call->cq, call, GRPC_READ);
-   lock(call);
-   state = get_legacy_state(call);
-   if (state->msg_in_read_idx != state->msg_in.count) {
-     /* an already-received message is waiting: deliver it right away */
-     result = GRPC_CALL_OK;
-     next = state->msg_in.buffers[state->msg_in_read_idx++];
-     grpc_cq_end_read(call->cq, tag, call, finish_read_event, next, next);
-     maybe_finish_legacy(call);
-   } else {
-     /* everything has been consumed: ask for a new batch */
-     state->msg_in_read_idx = 0;
-     recv.op = GRPC_IOREQ_RECV_MESSAGES;
-     recv.data.recv_messages = &state->msg_in;
-     result = start_ioreq(call, &recv, 1, finish_read, tag);
-   }
-   unlock(call);
-   return result;
- }
 
- /* ioreq completion callback for a message send: destroys the copied outgoing
-    buffer while holding the call lock, then posts the write-accepted event
-    with the op status after the lock has been released. */
- static void finish_write(grpc_call *call, grpc_op_error status, void *tag) {
-   legacy_state *state;
-   lock(call);
-   state = get_legacy_state(call);
-   grpc_byte_buffer_destroy(state->msg_out);
-   unlock(call);
-   grpc_cq_end_write_accepted(call->cq, tag, call, do_nothing, NULL, status);
- }
 
- /* Legacy write API: copies `byte_buffer` into the legacy state and starts a
-    single-message GRPC_IOREQ_SEND_MESSAGES request; finish_write posts the
-    write-accepted event with `tag`. Returns the result of start_ioreq.
-    `flags` is not used by this function.
-    NOTE(review): if start_ioreq fails, the copy stored in msg_out does not
-    appear to be released here - confirm whether finish_write still runs. */
- grpc_call_error grpc_call_start_write(grpc_call *call,
-                                       grpc_byte_buffer *byte_buffer, void *tag,
-                                       gpr_uint32 flags) {
-   grpc_ioreq send;
-   legacy_state *state;
-   grpc_call_error result;
-   grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
-   lock(call);
-   state = get_legacy_state(call);
-   state->msg_out = grpc_byte_buffer_copy(byte_buffer);
-   send.op = GRPC_IOREQ_SEND_MESSAGES;
-   send.data.send_messages.messages = &state->msg_out;
-   send.data.send_messages.count = 1;
-   result = start_ioreq(call, &send, 1, finish_write, tag);
-   unlock(call);
-   return result;
- }
 
- /* ioreq completion callback shared by grpc_call_writes_done and
-    grpc_call_start_write_status: posts the finish-accepted event carrying the
-    op status. */
- static void finish_finish(grpc_call *call, grpc_op_error status, void *tag) {
-   grpc_cq_end_finish_accepted(call->cq, tag, call, do_nothing, NULL, status);
- }
 
- /* Legacy API: starts a GRPC_IOREQ_SEND_CLOSE request; finish_finish posts the
-    finish-accepted event with `tag`. Returns the result of start_ioreq. */
- grpc_call_error grpc_call_writes_done(grpc_call *call, void *tag) {
-   grpc_ioreq close_req;
-   grpc_call_error result;
-   grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
-   lock(call);
-   close_req.op = GRPC_IOREQ_SEND_CLOSE;
-   result = start_ioreq(call, &close_req, 1, finish_finish, tag);
-   unlock(call);
-   return result;
- }
 
- /* Legacy server API: sends the metadata accumulated in the current output
-    buffer as trailing metadata together with a close carrying `status` and
-    `details`, as one two-op batch. finish_finish posts the finish-accepted
-    event with `tag`. Returns the result of start_ioreq. */
- grpc_call_error grpc_call_start_write_status(grpc_call *call,
-                                              grpc_status_code status,
-                                              const char *details, void *tag) {
-   grpc_ioreq batch[2];
-   grpc_call_error result;
-   legacy_state *state;
-   grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
-   lock(call);
-   state = get_legacy_state(call);
-   /* op 0: trailing metadata */
-   batch[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA;
-   batch[0].data.send_metadata.metadata = state->md_out[state->md_out_buffer];
-   batch[0].data.send_metadata.count =
-       state->md_out_count[state->md_out_buffer];
-   /* op 1: close with the final status */
-   batch[1].op = GRPC_IOREQ_SEND_CLOSE;
-   batch[1].data.send_close.details = details;
-   batch[1].data.send_close.status = status;
-   result = start_ioreq(call, batch, 2, finish_finish, tag);
-   unlock(call);
-   return result;
- }
 
- /* Function form of the CALL_FROM_TOP_ELEM macro: returns the grpc_call that
-    corresponds to the given top-of-stack call element. */
- grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
-   grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-   return call;
- }
 
- /* Alarm callback registered by grpc_call_set_deadline; `arg` is the call.
-    When `success` is non-zero the call is cancelled: clients cancel with
-    GRPC_STATUS_DEADLINE_EXCEEDED, other calls with a plain cancel. In every
-    case one internal reference is dropped afterwards (presumably the one
-    taken when the alarm was set up). */
- static void call_alarm(void *arg, int success) {
-   grpc_call *call = arg;
-   if (success && call->is_client) {
-     grpc_call_cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
-                                  "Deadline Exceeded");
-   } else if (success) {
-     grpc_call_cancel(call);
-   }
-   grpc_call_internal_unref(call, 1);
- }
 
- /* Arms the deadline alarm for the call owning `elem`. A second attempt is
-    logged as an error but is not rejected: the function still takes an
-    internal reference, marks have_alarm and initializes the alarm with
-    call_alarm as its callback. */
- void grpc_call_set_deadline(grpc_call_element *elem, gpr_timespec deadline) {
-   grpc_call *target = CALL_FROM_TOP_ELEM(elem);
-   if (target->have_alarm) {
-     gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
-   }
-   /* reference handed to the alarm; call_alarm releases one reference */
-   grpc_call_internal_ref(target);
-   target->have_alarm = 1;
-   grpc_alarm_init(&target->alarm, deadline, call_alarm, target, gpr_now());
- }
 
- /* Notification that the read side of the call owning `elem` has closed.
-    Asserts this is the first such notification, sets read_closed and completes
-    the message, initial-metadata and trailing-metadata receive ops with
-    GRPC_OP_OK. */
- void grpc_call_read_closed(grpc_call_element *elem) {
-   grpc_call *target = CALL_FROM_TOP_ELEM(elem);
-   lock(target);
-   GPR_ASSERT(!target->read_closed);
-   target->read_closed = 1;
-   finish_ioreq_op(target, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
-   finish_ioreq_op(target, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
-   finish_ioreq_op(target, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
-   unlock(target);
- }
 
- /* Notification that the whole stream of the call owning `elem` has closed.
-    Asserts this is the first such notification. If the read side was not yet
-    closed it is closed here with the same three op completions as
-    grpc_call_read_closed. The status receive op is completed only when no
-    buffered messages remain. Finally one internal reference is dropped,
-    outside the lock. */
- void grpc_call_stream_closed(grpc_call_element *elem) {
-   grpc_call *target = CALL_FROM_TOP_ELEM(elem);
-   lock(target);
-   GPR_ASSERT(!target->stream_closed);
-   if (!target->read_closed) {
-     target->read_closed = 1;
-     finish_ioreq_op(target, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
-     finish_ioreq_op(target, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
-     finish_ioreq_op(target, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
-   }
-   target->stream_closed = 1;
-   /* status is held back while buffered messages are still pending */
-   if (target->buffered_messages.count == 0) {
-     finish_ioreq_op(target, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
-   }
-   unlock(target);
-   grpc_call_internal_unref(target, 0);
- }
 
- /* We offset the status by a small amount when storing it into transport
-    metadata, because metadata user data cannot hold a 0 value (and 0 is the
-    value used for OK in grpc_status_code). */
 
- /* Amount added to a status code before it is stored as mdelem user data and
-    subtracted again when it is read back (see decode_status). */
- #define STATUS_OFFSET 1
- /* User-data destructor for cached status codes: the value is an integer
-    packed into the pointer, so there is nothing to free. */
- static void destroy_status(void *ignored) {
-   /* nothing to release */
- }
 
- /* Returns the numeric status carried by status metadata element `md`.
-    The parsed value is cached on the element as user data (shifted by
-    STATUS_OFFSET so that a cached 0 is distinguishable from "no user data").
-    A value that cannot be parsed as an unsigned 32-bit integer yields
-    GRPC_STATUS_UNKNOWN, which is cached as well. */
- static gpr_uint32 decode_status(grpc_mdelem *md) {
-   gpr_uint32 code;
-   void *cached = grpc_mdelem_get_user_data(md, destroy_status);
-   if (cached) {
-     /* fast path: undo the offset applied when the value was stored */
-     return ((gpr_uint32)(gpr_intptr)cached) - STATUS_OFFSET;
-   }
-   if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
-                                  GPR_SLICE_LENGTH(md->value->slice), &code)) {
-     code = GRPC_STATUS_UNKNOWN; /* could not parse status code */
-   }
-   grpc_mdelem_set_user_data(md, destroy_status,
-                             (void *)(gpr_intptr)(code + STATUS_OFFSET));
-   return code;
- }
 
- /* Called when a message arrives for the call owning `elem`.
-    If a GRPC_IOREQ_RECV_MESSAGE request is outstanding, the buffer is written
-    straight into it and that op is completed; if the request is not in
-    REQ_READY its status is set to GRPC_OP_ERROR instead. With no outstanding
-    request the buffer is appended to call->buffered_messages, growing the
-    array by roughly 1.5x when it is full. In all cases the
-    GRPC_IOREQ_RECV_MESSAGES op is then completed with GRPC_OP_OK, as before.
-    Fix: the original ran the append code unconditionally, so whenever a
-    request was outstanding it dereferenced and wrote through `dest` while
-    `dest` was still uninitialized (undefined behavior) and would have stored
-    a buffer that had already been handed to the request. Buffering is now
-    confined to the branch that sets `dest`. */
- void grpc_call_recv_message(grpc_call_element *elem,
-                             grpc_byte_buffer *byte_buffer) {
-   grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-   grpc_byte_buffer_array *dest;
-   lock(call);
-   if (call->requests[GRPC_IOREQ_RECV_MESSAGE].master != NULL) {
-     if (call->requests[GRPC_IOREQ_RECV_MESSAGE].state != REQ_READY) {
-       /* NOTE(review): byte_buffer is neither stored nor destroyed on this
-          path - confirm who owns it. */
-       call->requests[GRPC_IOREQ_RECV_MESSAGE].status = GRPC_OP_ERROR;
-     } else {
-       *call->requests[GRPC_IOREQ_RECV_MESSAGE].data.recv_message = byte_buffer;
-       finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
-     }
-   } else {
-     dest = &call->buffered_messages;
-     if (dest->count == dest->capacity) {
-       dest->capacity = GPR_MAX(dest->capacity + 1, dest->capacity * 3 / 2);
-       dest->buffers = gpr_realloc(dest->buffers,
-                                   sizeof(grpc_byte_buffer *) * dest->capacity);
-     }
-     dest->buffers[dest->count++] = byte_buffer;
-   }
-   /* NOTE(review): kept unconditional to match the original control flow;
-      confirm it is wanted when the message was delivered directly above. */
-   finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGES, GRPC_OP_OK);
-   unlock(call);
- }
 
- /* Called for every metadata element received on the call owning `elem`.
-    - The channel's status key: the decoded status code is recorded as
-      STATUS_FROM_WIRE and the element is unreffed.
-    - The channel's message key: a new reference to the value string is
-      recorded as the STATUS_FROM_WIRE details and the element is unreffed.
-    - Anything else: the key/value are exposed (as pointers into the element's
-      strings, not copies) in the initial or trailing metadata array, depending
-      on got_initial_metadata. The array is the pending request's array when
-      that request is in REQ_READY, else the call's buffered array. The element
-      itself is kept in call->owned_metadata. */
- void grpc_call_recv_metadata(grpc_call_element *elem, grpc_mdelem *md) {
-   grpc_call *call = CALL_FROM_TOP_ELEM(elem);
-   grpc_mdstr *key = md->key;
-   grpc_metadata_array *dest;
-   grpc_metadata *mdusr;
-   lock(call);
-   if (key == grpc_channel_get_status_string(call->channel)) {
-     set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
-     grpc_mdelem_unref(md);
-   } else if (key == grpc_channel_get_message_string(call->channel)) {
-     set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
-     grpc_mdelem_unref(md);
-   } else {
-     /* choose the destination array */
-     if (call->got_initial_metadata) {
-       if (call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA].state ==
-           REQ_READY) {
-         dest = call->requests[GRPC_IOREQ_RECV_TRAILING_METADATA]
-                    .data.recv_metadata;
-       } else {
-         dest = &call->buffered_trailing_metadata;
-       }
-     } else {
-       if (call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA].state ==
-           REQ_READY) {
-         dest = call->requests[GRPC_IOREQ_RECV_INITIAL_METADATA]
-                    .data.recv_metadata;
-       } else {
-         dest = &call->buffered_initial_metadata;
-       }
-     }
-     /* grow the destination array when full: +8 or double, whichever is more */
-     if (dest->count == dest->capacity) {
-       dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
-       dest->metadata =
-           gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
-     }
-     mdusr = &dest->metadata[dest->count++];
-     mdusr->key = (char *)grpc_mdstr_as_c_string(md->key);
-     mdusr->value = (char *)grpc_mdstr_as_c_string(md->value);
-     mdusr->value_length = GPR_SLICE_LENGTH(md->value->slice);
-     /* retain the element in the call's owned list, growing it the same way */
-     if (call->owned_metadata_count == call->owned_metadata_capacity) {
-       call->owned_metadata_capacity = GPR_MAX(
-           call->owned_metadata_capacity + 8, call->owned_metadata_capacity * 2);
-       call->owned_metadata =
-           gpr_realloc(call->owned_metadata,
-                       sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
-     }
-     call->owned_metadata[call->owned_metadata_count++] = md;
-   }
-   unlock(call);
- }
 
- /* Function form of the CALL_STACK_FROM_CALL macro: returns the call stack
-    associated with `call`. */
- grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
-   grpc_call_stack *stack = CALL_STACK_FROM_CALL(call);
-   return stack;
- }
 
 
  |