| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587 | /* * * Copyright 2015-2016 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * *     http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
* */#include <grpc/support/port_platform.h>#include "src/core/lib/surface/server.h"#include <limits.h>#include <stdlib.h>#include <string.h>#include <grpc/support/alloc.h>#include <grpc/support/log.h>#include <grpc/support/string_util.h>#include "src/core/lib/channel/channel_args.h"#include "src/core/lib/channel/connected_channel.h"#include "src/core/lib/debug/stats.h"#include "src/core/lib/gpr/mpscq.h"#include "src/core/lib/gpr/spinlock.h"#include "src/core/lib/gpr/string.h"#include "src/core/lib/iomgr/executor.h"#include "src/core/lib/iomgr/iomgr.h"#include "src/core/lib/slice/slice_internal.h"#include "src/core/lib/surface/api_trace.h"#include "src/core/lib/surface/call.h"#include "src/core/lib/surface/channel.h"#include "src/core/lib/surface/completion_queue.h"#include "src/core/lib/surface/init.h"#include "src/core/lib/transport/metadata.h"#include "src/core/lib/transport/static_metadata.h"grpc_core::TraceFlag grpc_server_channel_trace(false, "server_channel");namespace {struct listener {  void* arg;  void (*start)(grpc_server* server, void* arg, grpc_pollset** pollsets,                size_t pollset_count);  void (*destroy)(grpc_server* server, void* arg, grpc_closure* closure);  struct listener* next;  intptr_t socket_uuid;  grpc_closure destroy_done;};enum requested_call_type { BATCH_CALL, REGISTERED_CALL };struct registered_method;struct requested_call {  gpr_mpscq_node request_link; /* must be first */  requested_call_type type;  size_t cq_idx;  void* tag;  grpc_server* server;  grpc_completion_queue* cq_bound_to_call;  grpc_call** call;  grpc_cq_completion completion;  grpc_metadata_array* initial_metadata;  union {    struct {      grpc_call_details* details;    } batch;    struct {      registered_method* method;      gpr_timespec* deadline;      grpc_byte_buffer** optional_payload;    } registered;  } data;};struct channel_registered_method {  registered_method* server_registered_method;  uint32_t flags;  bool has_host;  grpc_slice method;  grpc_slice 
host;};struct channel_data {  grpc_server* server;  grpc_connectivity_state connectivity_state;  grpc_channel* channel;  size_t cq_idx;  /* linked list of all channels on a server */  channel_data* next;  channel_data* prev;  channel_registered_method* registered_methods;  uint32_t registered_method_slots;  uint32_t registered_method_max_probes;  grpc_closure finish_destroy_channel_closure;  grpc_closure channel_connectivity_changed;  intptr_t socket_uuid;};typedef struct shutdown_tag {  void* tag;  grpc_completion_queue* cq;  grpc_cq_completion completion;} shutdown_tag;typedef enum {  /* waiting for metadata */  NOT_STARTED,  /* inital metadata read, not flow controlled in yet */  PENDING,  /* flow controlled in, on completion queue */  ACTIVATED,  /* cancelled before being queued */  ZOMBIED} call_state;typedef struct request_matcher request_matcher;struct call_data {  grpc_call* call;  gpr_atm state;  bool path_set;  bool host_set;  grpc_slice path;  grpc_slice host;  grpc_millis deadline;  grpc_completion_queue* cq_new;  grpc_metadata_batch* recv_initial_metadata;  uint32_t recv_initial_metadata_flags;  grpc_metadata_array initial_metadata;  request_matcher* matcher;  grpc_byte_buffer* payload;  grpc_closure got_initial_metadata;  grpc_closure server_on_recv_initial_metadata;  grpc_closure kill_zombie_closure;  grpc_closure* on_done_recv_initial_metadata;  grpc_closure recv_trailing_metadata_ready;  grpc_error* recv_initial_metadata_error;  grpc_closure* original_recv_trailing_metadata_ready;  grpc_error* recv_trailing_metadata_error;  bool seen_recv_trailing_metadata_ready;  grpc_closure publish;  call_data* pending_next;  grpc_call_combiner* call_combiner;};struct request_matcher {  grpc_server* server;  call_data* pending_head;  call_data* pending_tail;  gpr_locked_mpscq* requests_per_cq;};struct registered_method {  char* method;  char* host;  grpc_server_register_method_payload_handling payload_handling;  uint32_t flags;  /* one request matcher per method 
*/  request_matcher matcher;  registered_method* next;};typedef struct {  grpc_channel** channels;  size_t num_channels;} channel_broadcaster;}  // namespacestruct grpc_server {  grpc_channel_args* channel_args;  grpc_resource_user* default_resource_user;  grpc_completion_queue** cqs;  grpc_pollset** pollsets;  size_t cq_count;  size_t pollset_count;  bool started;  /* The two following mutexes control access to server-state     mu_global controls access to non-call-related state (e.g., channel state)     mu_call controls access to call-related state (e.g., the call lists)     If they are ever required to be nested, you must lock mu_global     before mu_call. This is currently used in shutdown processing     (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */  gpr_mu mu_global; /* mutex for server and channel state */  gpr_mu mu_call;   /* mutex for call-specific state */  /* startup synchronization: flag is protected by mu_global, signals whether     we are doing the listener start routine or not */  bool starting;  gpr_cv starting_cv;  registered_method* registered_methods;  /** one request matcher for unregistered methods */  request_matcher unregistered_request_matcher;  gpr_atm shutdown_flag;  uint8_t shutdown_published;  size_t num_shutdown_tags;  shutdown_tag* shutdown_tags;  channel_data root_channel_data;  listener* listeners;  int listeners_destroyed;  gpr_refcount internal_refcount;  /** when did we print the last shutdown progress message */  gpr_timespec last_shutdown_message_time;  grpc_core::RefCountedPtr<grpc_core::channelz::ServerNode> channelz_server;};#define SERVER_FROM_CALL_ELEM(elem) \  (((channel_data*)(elem)->channel_data)->server)static void publish_new_rpc(void* calld, grpc_error* error);static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,                      grpc_error* error);/* Before calling maybe_finish_shutdown, we must hold mu_global and not   hold mu_call */static void 
maybe_finish_shutdown(grpc_server* server);/* * channel broadcaster *//* assumes server locked */static void channel_broadcaster_init(grpc_server* s, channel_broadcaster* cb) {  channel_data* c;  size_t count = 0;  for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {    count++;  }  cb->num_channels = count;  cb->channels = static_cast<grpc_channel**>(      gpr_malloc(sizeof(*cb->channels) * cb->num_channels));  count = 0;  for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {    cb->channels[count++] = c->channel;    GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");  }}struct shutdown_cleanup_args {  grpc_closure closure;  grpc_slice slice;};static void shutdown_cleanup(void* arg, grpc_error* error) {  struct shutdown_cleanup_args* a =      static_cast<struct shutdown_cleanup_args*>(arg);  grpc_slice_unref_internal(a->slice);  gpr_free(a);}static void send_shutdown(grpc_channel* channel, bool send_goaway,                          grpc_error* send_disconnect) {  struct shutdown_cleanup_args* sc =      static_cast<struct shutdown_cleanup_args*>(gpr_malloc(sizeof(*sc)));  GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc,                    grpc_schedule_on_exec_ctx);  grpc_transport_op* op = grpc_make_transport_op(&sc->closure);  grpc_channel_element* elem;  op->goaway_error =      send_goaway ? 
grpc_error_set_int(                        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown"),                        GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK)                  : GRPC_ERROR_NONE;  op->set_accept_stream = true;  sc->slice = grpc_slice_from_copied_string("Server shutdown");  op->disconnect_with_error = send_disconnect;  elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);  elem->filter->start_transport_op(elem, op);}static void channel_broadcaster_shutdown(channel_broadcaster* cb,                                         bool send_goaway,                                         grpc_error* force_disconnect) {  size_t i;  for (i = 0; i < cb->num_channels; i++) {    send_shutdown(cb->channels[i], send_goaway,                  GRPC_ERROR_REF(force_disconnect));    GRPC_CHANNEL_INTERNAL_UNREF(cb->channels[i], "broadcast");  }  gpr_free(cb->channels);  GRPC_ERROR_UNREF(force_disconnect);}/* * request_matcher */static void request_matcher_init(request_matcher* rm, grpc_server* server) {  memset(rm, 0, sizeof(*rm));  rm->server = server;  rm->requests_per_cq = static_cast<gpr_locked_mpscq*>(      gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count));  for (size_t i = 0; i < server->cq_count; i++) {    gpr_locked_mpscq_init(&rm->requests_per_cq[i]);  }}static void request_matcher_destroy(request_matcher* rm) {  for (size_t i = 0; i < rm->server->cq_count; i++) {    GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == nullptr);    gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]);  }  gpr_free(rm->requests_per_cq);}static void kill_zombie(void* elem, grpc_error* error) {  grpc_call_unref(      grpc_call_from_top_element(static_cast<grpc_call_element*>(elem)));}static void request_matcher_zombify_all_pending_calls(request_matcher* rm) {  while (rm->pending_head) {    call_data* calld = rm->pending_head;    rm->pending_head = calld->pending_next;    gpr_atm_no_barrier_store(&calld->state, ZOMBIED);    
GRPC_CLOSURE_INIT(        &calld->kill_zombie_closure, kill_zombie,        grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),        grpc_schedule_on_exec_ctx);    GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);  }}static void request_matcher_kill_requests(grpc_server* server,                                          request_matcher* rm,                                          grpc_error* error) {  requested_call* rc;  for (size_t i = 0; i < server->cq_count; i++) {    while ((rc = reinterpret_cast<requested_call*>(                gpr_locked_mpscq_pop(&rm->requests_per_cq[i]))) != nullptr) {      fail_call(server, i, rc, GRPC_ERROR_REF(error));    }  }  GRPC_ERROR_UNREF(error);}/* * server proper */static void server_ref(grpc_server* server) {  gpr_ref(&server->internal_refcount);}static void server_delete(grpc_server* server) {  registered_method* rm;  size_t i;  server->channelz_server.reset();  grpc_channel_args_destroy(server->channel_args);  gpr_mu_destroy(&server->mu_global);  gpr_mu_destroy(&server->mu_call);  gpr_cv_destroy(&server->starting_cv);  while ((rm = server->registered_methods) != nullptr) {    server->registered_methods = rm->next;    if (server->started) {      request_matcher_destroy(&rm->matcher);    }    gpr_free(rm->method);    gpr_free(rm->host);    gpr_free(rm);  }  if (server->started) {    request_matcher_destroy(&server->unregistered_request_matcher);  }  for (i = 0; i < server->cq_count; i++) {    GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");  }  gpr_free(server->cqs);  gpr_free(server->pollsets);  gpr_free(server->shutdown_tags);  gpr_free(server);}static void server_unref(grpc_server* server) {  if (gpr_unref(&server->internal_refcount)) {    server_delete(server);  }}static int is_channel_orphaned(channel_data* chand) {  return chand->next == chand;}static void orphan_channel(channel_data* chand) {  chand->next->prev = chand->prev;  chand->prev->next = chand->next;  chand->next = chand->prev = 
chand;}static void finish_destroy_channel(void* cd, grpc_error* error) {  channel_data* chand = static_cast<channel_data*>(cd);  grpc_server* server = chand->server;  GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "server");  server_unref(server);}static void destroy_channel(channel_data* chand, grpc_error* error) {  if (is_channel_orphaned(chand)) return;  GPR_ASSERT(chand->server != nullptr);  orphan_channel(chand);  server_ref(chand->server);  maybe_finish_shutdown(chand->server);  GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure,                    finish_destroy_channel, chand, grpc_schedule_on_exec_ctx);  if (grpc_server_channel_trace.enabled() && error != GRPC_ERROR_NONE) {    const char* msg = grpc_error_string(error);    gpr_log(GPR_INFO, "Disconnected client: %s", msg);  }  GRPC_ERROR_UNREF(error);  grpc_transport_op* op =      grpc_make_transport_op(&chand->finish_destroy_channel_closure);  op->set_accept_stream = true;  grpc_channel_next_op(grpc_channel_stack_element(                           grpc_channel_get_channel_stack(chand->channel), 0),                       op);}static void done_request_event(void* req, grpc_cq_completion* c) {  gpr_free(req);}static void publish_call(grpc_server* server, call_data* calld, size_t cq_idx,                         requested_call* rc) {  grpc_call_set_completion_queue(calld->call, rc->cq_bound_to_call);  grpc_call* call = calld->call;  *rc->call = call;  calld->cq_new = server->cqs[cq_idx];  GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);  switch (rc->type) {    case BATCH_CALL:      GPR_ASSERT(calld->host_set);      GPR_ASSERT(calld->path_set);      rc->data.batch.details->host = grpc_slice_ref_internal(calld->host);      rc->data.batch.details->method = grpc_slice_ref_internal(calld->path);      rc->data.batch.details->deadline =          grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);      rc->data.batch.details->flags = calld->recv_initial_metadata_flags;      
break;    case REGISTERED_CALL:      *rc->data.registered.deadline =          grpc_millis_to_timespec(calld->deadline, GPR_CLOCK_MONOTONIC);      if (rc->data.registered.optional_payload) {        *rc->data.registered.optional_payload = calld->payload;        calld->payload = nullptr;      }      break;    default:      GPR_UNREACHABLE_CODE(return );  }  grpc_cq_end_op(calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event,                 rc, &rc->completion);}static void publish_new_rpc(void* arg, grpc_error* error) {  grpc_call_element* call_elem = static_cast<grpc_call_element*>(arg);  call_data* calld = static_cast<call_data*>(call_elem->call_data);  channel_data* chand = static_cast<channel_data*>(call_elem->channel_data);  request_matcher* rm = calld->matcher;  grpc_server* server = rm->server;  if (error != GRPC_ERROR_NONE || gpr_atm_acq_load(&server->shutdown_flag)) {    gpr_atm_no_barrier_store(&calld->state, ZOMBIED);    GRPC_CLOSURE_INIT(        &calld->kill_zombie_closure, kill_zombie,        grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),        grpc_schedule_on_exec_ctx);    GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_REF(error));    return;  }  for (size_t i = 0; i < server->cq_count; i++) {    size_t cq_idx = (chand->cq_idx + i) % server->cq_count;    requested_call* rc = reinterpret_cast<requested_call*>(        gpr_locked_mpscq_try_pop(&rm->requests_per_cq[cq_idx]));    if (rc == nullptr) {      continue;    } else {      GRPC_STATS_INC_SERVER_CQS_CHECKED(i);      gpr_atm_no_barrier_store(&calld->state, ACTIVATED);      publish_call(server, calld, cq_idx, rc);      return; /* early out */    }  }  /* no cq to take the request found: queue it on the slow list */  GRPC_STATS_INC_SERVER_SLOWPATH_REQUESTS_QUEUED();  gpr_mu_lock(&server->mu_call);  // We need to ensure that all the queues are empty.  
We do this under  // the server mu_call lock to ensure that if something is added to  // an empty request queue, it will block until the call is actually  // added to the pending list.  for (size_t i = 0; i < server->cq_count; i++) {    size_t cq_idx = (chand->cq_idx + i) % server->cq_count;    requested_call* rc = reinterpret_cast<requested_call*>(        gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));    if (rc == nullptr) {      continue;    } else {      gpr_mu_unlock(&server->mu_call);      GRPC_STATS_INC_SERVER_CQS_CHECKED(i + server->cq_count);      gpr_atm_no_barrier_store(&calld->state, ACTIVATED);      publish_call(server, calld, cq_idx, rc);      return; /* early out */    }  }  gpr_atm_no_barrier_store(&calld->state, PENDING);  if (rm->pending_head == nullptr) {    rm->pending_tail = rm->pending_head = calld;  } else {    rm->pending_tail->pending_next = calld;    rm->pending_tail = calld;  }  calld->pending_next = nullptr;  gpr_mu_unlock(&server->mu_call);}static void finish_start_new_rpc(    grpc_server* server, grpc_call_element* elem, request_matcher* rm,    grpc_server_register_method_payload_handling payload_handling) {  call_data* calld = static_cast<call_data*>(elem->call_data);  if (gpr_atm_acq_load(&server->shutdown_flag)) {    gpr_atm_no_barrier_store(&calld->state, ZOMBIED);    GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,                      grpc_schedule_on_exec_ctx);    GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);    return;  }  calld->matcher = rm;  switch (payload_handling) {    case GRPC_SRM_PAYLOAD_NONE:      publish_new_rpc(elem, GRPC_ERROR_NONE);      break;    case GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER: {      grpc_op op;      memset(&op, 0, sizeof(op));      op.op = GRPC_OP_RECV_MESSAGE;      op.data.recv_message.recv_message = &calld->payload;      GRPC_CLOSURE_INIT(&calld->publish, publish_new_rpc, elem,                        grpc_schedule_on_exec_ctx);      
grpc_call_start_batch_and_execute(calld->call, &op, 1, &calld->publish);      break;    }  }}static void start_new_rpc(grpc_call_element* elem) {  channel_data* chand = static_cast<channel_data*>(elem->channel_data);  call_data* calld = static_cast<call_data*>(elem->call_data);  grpc_server* server = chand->server;  uint32_t i;  uint32_t hash;  channel_registered_method* rm;  if (chand->registered_methods && calld->path_set && calld->host_set) {    /* TODO(ctiller): unify these two searches */    /* check for an exact match with host */    hash = GRPC_MDSTR_KV_HASH(grpc_slice_hash(calld->host),                              grpc_slice_hash(calld->path));    for (i = 0; i <= chand->registered_method_max_probes; i++) {      rm = &chand->registered_methods[(hash + i) %                                      chand->registered_method_slots];      if (!rm) break;      if (!rm->has_host) continue;      if (!grpc_slice_eq(rm->host, calld->host)) continue;      if (!grpc_slice_eq(rm->method, calld->path)) continue;      if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&          0 == (calld->recv_initial_metadata_flags &                GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {        continue;      }      finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,                           rm->server_registered_method->payload_handling);      return;    }    /* check for a wildcard method definition (no host set) */    hash = GRPC_MDSTR_KV_HASH(0, grpc_slice_hash(calld->path));    for (i = 0; i <= chand->registered_method_max_probes; i++) {      rm = &chand->registered_methods[(hash + i) %                                      chand->registered_method_slots];      if (!rm) break;      if (rm->has_host) continue;      if (!grpc_slice_eq(rm->method, calld->path)) continue;      if ((rm->flags & GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) &&          0 == (calld->recv_initial_metadata_flags &                GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST)) {    
    continue;      }      finish_start_new_rpc(server, elem, &rm->server_registered_method->matcher,                           rm->server_registered_method->payload_handling);      return;    }  }  finish_start_new_rpc(server, elem, &server->unregistered_request_matcher,                       GRPC_SRM_PAYLOAD_NONE);}static int num_listeners(grpc_server* server) {  listener* l;  int n = 0;  for (l = server->listeners; l; l = l->next) {    n++;  }  return n;}static void done_shutdown_event(void* server, grpc_cq_completion* completion) {  server_unref(static_cast<grpc_server*>(server));}static int num_channels(grpc_server* server) {  channel_data* chand;  int n = 0;  for (chand = server->root_channel_data.next;       chand != &server->root_channel_data; chand = chand->next) {    n++;  }  return n;}static void kill_pending_work_locked(grpc_server* server, grpc_error* error) {  if (server->started) {    request_matcher_kill_requests(server, &server->unregistered_request_matcher,                                  GRPC_ERROR_REF(error));    request_matcher_zombify_all_pending_calls(        &server->unregistered_request_matcher);    for (registered_method* rm = server->registered_methods; rm;         rm = rm->next) {      request_matcher_kill_requests(server, &rm->matcher,                                    GRPC_ERROR_REF(error));      request_matcher_zombify_all_pending_calls(&rm->matcher);    }  }  GRPC_ERROR_UNREF(error);}static void maybe_finish_shutdown(grpc_server* server) {  size_t i;  if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {    return;  }  kill_pending_work_locked(      server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));  if (server->root_channel_data.next != &server->root_channel_data ||      server->listeners_destroyed < num_listeners(server)) {    if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),                                  server->last_shutdown_message_time),                     
gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {      server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);      gpr_log(GPR_DEBUG,              "Waiting for %d channels and %d/%d listeners to be destroyed"              " before shutting down server",              num_channels(server),              num_listeners(server) - server->listeners_destroyed,              num_listeners(server));    }    return;  }  server->shutdown_published = 1;  for (i = 0; i < server->num_shutdown_tags; i++) {    server_ref(server);    grpc_cq_end_op(server->shutdown_tags[i].cq, server->shutdown_tags[i].tag,                   GRPC_ERROR_NONE, done_shutdown_event, server,                   &server->shutdown_tags[i].completion);  }}static void server_on_recv_initial_metadata(void* ptr, grpc_error* error) {  grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);  call_data* calld = static_cast<call_data*>(elem->call_data);  grpc_millis op_deadline;  if (error == GRPC_ERROR_NONE) {    GPR_ASSERT(calld->recv_initial_metadata->idx.named.path != nullptr);    GPR_ASSERT(calld->recv_initial_metadata->idx.named.authority != nullptr);    calld->path = grpc_slice_ref_internal(        GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md));    calld->host = grpc_slice_ref_internal(        GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.authority->md));    calld->path_set = true;    calld->host_set = true;    grpc_metadata_batch_remove(calld->recv_initial_metadata,                               calld->recv_initial_metadata->idx.named.path);    grpc_metadata_batch_remove(        calld->recv_initial_metadata,        calld->recv_initial_metadata->idx.named.authority);  } else {    GRPC_ERROR_REF(error);  }  op_deadline = calld->recv_initial_metadata->deadline;  if (op_deadline != GRPC_MILLIS_INF_FUTURE) {    calld->deadline = op_deadline;  }  if (calld->host_set && calld->path_set) {    /* do nothing */  } else {    /* Pass the error reference to 
calld->recv_initial_metadata_error */    grpc_error* src_error = error;    error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(        "Missing :authority or :path", &src_error, 1);    GRPC_ERROR_UNREF(src_error);    calld->recv_initial_metadata_error = GRPC_ERROR_REF(error);  }  grpc_closure* closure = calld->on_done_recv_initial_metadata;  calld->on_done_recv_initial_metadata = nullptr;  if (calld->seen_recv_trailing_metadata_ready) {    GRPC_CALL_COMBINER_START(calld->call_combiner,                             &calld->recv_trailing_metadata_ready,                             calld->recv_trailing_metadata_error,                             "continue server_recv_trailing_metadata_ready");  }  GRPC_CLOSURE_RUN(closure, error);}static void server_recv_trailing_metadata_ready(void* user_data,                                                grpc_error* error) {  grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);  call_data* calld = static_cast<call_data*>(elem->call_data);  if (calld->on_done_recv_initial_metadata != nullptr) {    calld->recv_trailing_metadata_error = GRPC_ERROR_REF(error);    calld->seen_recv_trailing_metadata_ready = true;    GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,                      server_recv_trailing_metadata_ready, elem,                      grpc_schedule_on_exec_ctx);    GRPC_CALL_COMBINER_STOP(calld->call_combiner,                            "deferring server_recv_trailing_metadata_ready "                            "until after server_on_recv_initial_metadata");    return;  }  error =      grpc_error_add_child(GRPC_ERROR_REF(error),                           GRPC_ERROR_REF(calld->recv_initial_metadata_error));  GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready, error);}static void server_mutate_op(grpc_call_element* elem,                             grpc_transport_stream_op_batch* op) {  call_data* calld = static_cast<call_data*>(elem->call_data);  if (op->recv_initial_metadata) {    
GPR_ASSERT(op->payload->recv_initial_metadata.recv_flags == nullptr);    /* remember the metadata destination and the original closure, then redirect the batch to our closure and our flags storage */calld->recv_initial_metadata =        op->payload->recv_initial_metadata.recv_initial_metadata;    calld->on_done_recv_initial_metadata =        op->payload->recv_initial_metadata.recv_initial_metadata_ready;    op->payload->recv_initial_metadata.recv_initial_metadata_ready =        &calld->server_on_recv_initial_metadata;    op->payload->recv_initial_metadata.recv_flags =        &calld->recv_initial_metadata_flags;  }  /* same interposition for trailing metadata */if (op->recv_trailing_metadata) {    calld->original_recv_trailing_metadata_ready =        op->payload->recv_trailing_metadata.recv_trailing_metadata_ready;    op->payload->recv_trailing_metadata.recv_trailing_metadata_ready =        &calld->recv_trailing_metadata_ready;  }}/* First entry of grpc_server_top_filter: mutates the batch with server_mutate_op, then forwards it with grpc_call_next_op. */static void server_start_transport_stream_op_batch(    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {  server_mutate_op(elem, op);  grpc_call_next_op(elem, op);}/* Completion callback for the GRPC_OP_RECV_INITIAL_METADATA batch started by accept_stream (also called directly by it on failure); ptr is the grpc_call_element. Without error the RPC is started via start_new_rpc. On error the call state is CAS-ed to ZOMBIED: from NOT_STARTED a kill_zombie closure is scheduled, from PENDING only the state changes. */static void got_initial_metadata(void* ptr, grpc_error* error) {  grpc_call_element* elem = static_cast<grpc_call_element*>(ptr);  call_data* calld = static_cast<call_data*>(elem->call_data);  if (error == GRPC_ERROR_NONE) {    start_new_rpc(elem);  } else {    if (gpr_atm_full_cas(&calld->state, NOT_STARTED, ZOMBIED)) {      GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem,                        grpc_schedule_on_exec_ctx);      GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);    } else if (gpr_atm_full_cas(&calld->state, PENDING, ZOMBIED)) {      /* zombied call will be destroyed when it's removed from the pending         queue... 
later */    }  }}/* Accept-stream callback, installed as set_accept_stream_fn by grpc_server_setup_transport; cd is the channel_data. Creates a grpc_call on chand->channel and starts a one-op GRPC_OP_RECV_INITIAL_METADATA batch whose completion runs got_initial_metadata. If grpc_call_create fails, got_initial_metadata is called directly with the error, which is then unreffed. The transport parameter is not used. */static void accept_stream(void* cd, grpc_transport* transport,                          const void* transport_server_data) {  channel_data* chand = static_cast<channel_data*>(cd);  /* create a call */  grpc_call_create_args args;  memset(&args, 0, sizeof(args));  args.channel = chand->channel;  args.server_transport_data = transport_server_data;  args.send_deadline = GRPC_MILLIS_INF_FUTURE;  args.server = chand->server;  grpc_call* call;  grpc_error* error = grpc_call_create(&args, &call);  grpc_call_element* elem =      grpc_call_stack_element(grpc_call_get_call_stack(call), 0);  if (error != GRPC_ERROR_NONE) {    got_initial_metadata(elem, error);    GRPC_ERROR_UNREF(error);    return;  }  call_data* calld = static_cast<call_data*>(elem->call_data);  grpc_op op;  memset(&op, 0, sizeof(op));  op.op = GRPC_OP_RECV_INITIAL_METADATA;  op.data.recv_initial_metadata.recv_initial_metadata =      &calld->initial_metadata;  GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem,                    grpc_schedule_on_exec_ctx);  grpc_call_start_batch_and_execute(call, &op, 1, &calld->got_initial_metadata);}/* Connectivity watch callback; cd is the channel_data. Until connectivity_state is GRPC_CHANNEL_SHUTDOWN it re-arms the watch by sending a new transport op to element 0 of the channel stack. On SHUTDOWN it calls destroy_channel under mu_global and releases the connectivity ref on the channel. */static void channel_connectivity_changed(void* cd, grpc_error* error) {  channel_data* chand = static_cast<channel_data*>(cd);  grpc_server* server = chand->server;  if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) {    grpc_transport_op* op = grpc_make_transport_op(nullptr);    op->on_connectivity_state_change = &chand->channel_connectivity_changed;    op->connectivity_state = &chand->connectivity_state;    grpc_channel_next_op(grpc_channel_stack_element(                             grpc_channel_get_channel_stack(chand->channel), 0),                         op);  } else {    gpr_mu_lock(&server->mu_global);    destroy_channel(chand, GRPC_ERROR_REF(error));    gpr_mu_unlock(&server->mu_global);    GRPC_CHANNEL_INTERNAL_UNREF(chand->channel, "connectivity");  }}/* Call element constructor: zero-fills call_data, sets the deadline to GRPC_MILLIS_INF_FUTURE, records the owning call and the call combiner, initializes the two metadata interception closures and takes a server ref. Always returns GRPC_ERROR_NONE. */static grpc_error* init_call_elem(grpc_call_element* elem,             
                     const grpc_call_element_args* args) {  call_data* calld = static_cast<call_data*>(elem->call_data);  channel_data* chand = static_cast<channel_data*>(elem->channel_data);  memset(calld, 0, sizeof(call_data));  calld->deadline = GRPC_MILLIS_INF_FUTURE;  calld->call = grpc_call_from_top_element(elem);  calld->call_combiner = args->call_combiner;  GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata,                    server_on_recv_initial_metadata, elem,                    grpc_schedule_on_exec_ctx);  GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready,                    server_recv_trailing_metadata_ready, elem,                    grpc_schedule_on_exec_ctx);  /* released by server_unref in destroy_call_elem */server_ref(chand->server);  return GRPC_ERROR_NONE;}/* Call element destructor: asserts the call is not PENDING, then releases recv_initial_metadata_error, the host and path slices (only when set), the initial metadata array, the payload byte buffer and the server ref taken in init_call_elem. final_info and the closure parameter are not used. */static void destroy_call_elem(grpc_call_element* elem,                              const grpc_call_final_info* final_info,                              grpc_closure* ignored) {  channel_data* chand = static_cast<channel_data*>(elem->channel_data);  call_data* calld = static_cast<call_data*>(elem->call_data);  GPR_ASSERT(calld->state != PENDING);  GRPC_ERROR_UNREF(calld->recv_initial_metadata_error);  if (calld->host_set) {    grpc_slice_unref_internal(calld->host);  }  if (calld->path_set) {    grpc_slice_unref_internal(calld->path);  }  grpc_metadata_array_destroy(&calld->initial_metadata);  grpc_byte_buffer_destroy(calld->payload);  server_unref(chand->server);}/* Channel element constructor: requires this filter to be first and not last in the stack. Leaves chand detached (null server and channel, self-linked list node, no registered methods) with connectivity_state GRPC_CHANNEL_IDLE and the connectivity closure initialized. Always returns GRPC_ERROR_NONE. */static grpc_error* init_channel_elem(grpc_channel_element* elem,                                     grpc_channel_element_args* args) {  channel_data* chand = static_cast<channel_data*>(elem->channel_data);  GPR_ASSERT(args->is_first);  GPR_ASSERT(!args->is_last);  chand->server = nullptr;  chand->channel = nullptr;  chand->next = chand->prev = chand;  chand->registered_methods = nullptr;  chand->connectivity_state = GRPC_CHANNEL_IDLE;  GRPC_CLOSURE_INIT(&chand->channel_connectivity_changed,                    channel_connectivity_changed, chand,                    
grpc_schedule_on_exec_ctx);  return GRPC_ERROR_NONE;}/* Channel element destructor: unrefs the method slice (and the host slice when has_host) of every slot of the registered-method table and frees the table; when attached to a server, unlinks chand from the channel list under mu_global, calls maybe_finish_shutdown and drops the server ref. */static void destroy_channel_elem(grpc_channel_element* elem) {  size_t i;  channel_data* chand = static_cast<channel_data*>(elem->channel_data);  if (chand->registered_methods) {    for (i = 0; i < chand->registered_method_slots; i++) {      grpc_slice_unref_internal(chand->registered_methods[i].method);      if (chand->registered_methods[i].has_host) {        grpc_slice_unref_internal(chand->registered_methods[i].host);      }    }    gpr_free(chand->registered_methods);  }  if (chand->server) {    gpr_mu_lock(&chand->server->mu_global);    chand->next->prev = chand->prev;    chand->prev->next = chand->next;    chand->next = chand->prev = chand;    maybe_finish_shutdown(chand->server);    gpr_mu_unlock(&chand->server->mu_global);    server_unref(chand->server);  }}/* Channel filter table wiring the call and channel element callbacks of this file into the filter named server. */const grpc_channel_filter grpc_server_top_filter = {    server_start_transport_stream_op_batch,    grpc_channel_next_op,    sizeof(call_data),    init_call_elem,    grpc_call_stack_ignore_set_pollset_or_pollset_set,    destroy_call_elem,    sizeof(channel_data),    init_channel_elem,    destroy_channel_elem,    grpc_channel_next_get_info,    "server",};/* Adds cq to server->cqs unless it is already present: takes an internal ref on cq and grows the array by one element with gpr_realloc. reserved must be null. */static void register_completion_queue(grpc_server* server,                                      grpc_completion_queue* cq,                                      void* reserved) {  size_t i, n;  GPR_ASSERT(!reserved);  for (i = 0; i < server->cq_count; i++) {    if (server->cqs[i] == cq) return;  }  GRPC_CQ_INTERNAL_REF(cq, "server");  n = server->cq_count++;  server->cqs = static_cast<grpc_completion_queue**>(gpr_realloc(      server->cqs, server->cq_count * sizeof(grpc_completion_queue*)));  server->cqs[n] = cq;}/* Public API: traces the call, logs at INFO when cq is not of type GRPC_CQ_NEXT (the queue is still accepted), then delegates to register_completion_queue. */void grpc_server_register_completion_queue(grpc_server* server,                                           grpc_completion_queue* cq,                                           void* reserved) {  GRPC_API_TRACE(      "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,      
(server, cq, reserved));  /* non-NEXT queues are only reported, not rejected */if (grpc_get_cq_completion_type(cq) != GRPC_CQ_NEXT) {    gpr_log(GPR_INFO,            "Completion queue which is not of type GRPC_CQ_NEXT is being "            "registered as a server-completion-queue");    /* Ideally we should log an error and abort but ruby-wrapped-language API       calls grpc_completion_queue_pluck() on server completion queues */  }  register_completion_queue(server, cq, reserved);}/* Public API: allocates a zeroed grpc_server, initializes its two mutexes, the starting condition variable and the refcount (1), makes the channel list empty and stores a copy of args. When channelz is enabled through args, creates the ServerNode and records a Server created trace event. When args yield a resource quota, creates the default resource user from it. reserved is only traced. */grpc_server* grpc_server_create(const grpc_channel_args* args, void* reserved) {  grpc_core::ExecCtx exec_ctx;  GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved));  grpc_server* server =      static_cast<grpc_server*>(gpr_zalloc(sizeof(grpc_server)));  gpr_mu_init(&server->mu_global);  gpr_mu_init(&server->mu_call);  gpr_cv_init(&server->starting_cv);  /* decremented by grpc_server_destroy */  gpr_ref_init(&server->internal_refcount, 1);  server->root_channel_data.next = server->root_channel_data.prev =      &server->root_channel_data;  server->channel_args = grpc_channel_args_copy(args);  const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ);  if (grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT)) {    arg = grpc_channel_args_find(        args, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE);    size_t channel_tracer_max_memory = grpc_channel_arg_get_integer(        arg,        {GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX});    server->channelz_server =        grpc_core::MakeRefCounted<grpc_core::channelz::ServerNode>(            server, channel_tracer_max_memory);    server->channelz_server->AddTraceEvent(        grpc_core::channelz::ChannelTrace::Severity::Info,        grpc_slice_from_static_string("Server created"));  }  if (args != nullptr) {    grpc_resource_quota* resource_quota =        grpc_resource_quota_from_channel_args(args, false /* create */);    if (resource_quota != nullptr) {      server->default_resource_user =          
grpc_resource_user_create(resource_quota, "default");    }  }  return server;}/* Null-tolerant string equality: returns 1 when both pointers are null or strcmp reports equality, 0 otherwise. */static int streq(const char* a, const char* b) {  if (a == nullptr && b == nullptr) return 1;  if (a == nullptr) return 0;  if (b == nullptr) return 0;  return 0 == strcmp(a, b);}/* Public API: registers a method, optionally bound to a host (host may be null). Returns nullptr after logging an error when method is null, when the same method and host pair is already registered, or when flags has bits outside GRPC_INITIAL_METADATA_USED_MASK. Otherwise returns a new zero-initialized registered_method holding gpr_strdup copies of the strings, pushed on the front of server->registered_methods. */void* grpc_server_register_method(    grpc_server* server, const char* method, const char* host,    grpc_server_register_method_payload_handling payload_handling,    uint32_t flags) {  registered_method* m;  GRPC_API_TRACE(      "grpc_server_register_method(server=%p, method=%s, host=%s, "      "flags=0x%08x)",      4, (server, method, host, flags));  if (!method) {    gpr_log(GPR_ERROR,            "grpc_server_register_method method string cannot be NULL");    return nullptr;  }  for (m = server->registered_methods; m; m = m->next) {    if (streq(m->method, method) && streq(m->host, host)) {      gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,              host ? host : "*");      return nullptr;    }  }  if ((flags & ~GRPC_INITIAL_METADATA_USED_MASK) != 0) {    gpr_log(GPR_ERROR, "grpc_server_register_method invalid flags 0x%08x",            flags);    return nullptr;  }  m = static_cast<registered_method*>(gpr_zalloc(sizeof(registered_method)));  m->method = gpr_strdup(method);  m->host = gpr_strdup(host);  m->next = server->registered_methods;  m->payload_handling = payload_handling;  m->flags = flags;  server->registered_methods = m;  return m;}/* Closure scheduled by grpc_server_start; s is the grpc_server. Starts every listener with the server pollsets, then clears server->starting and signals starting_cv under mu_global, and drops the ref taken by grpc_server_start. error is not used. */static void start_listeners(void* s, grpc_error* error) {  grpc_server* server = static_cast<grpc_server*>(s);  for (listener* l = server->listeners; l; l = l->next) {    l->start(server, l->arg, server->pollsets, server->pollset_count);  }  gpr_mu_lock(&server->mu_global);  server->starting = false;  gpr_cv_signal(&server->starting_cv);  gpr_mu_unlock(&server->mu_global);  server_unref(server);}/* Public API: marks the server started, collects the pollsets of the completion queues that can listen, initializes the request matchers (the unregistered one plus one per registered method), then takes a server ref, sets starting and schedules start_listeners on the GRPC_EXECUTOR_SHORT scheduler. */void grpc_server_start(grpc_server* server) {  size_t i;  grpc_core::ExecCtx exec_ctx;  GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));  
server->started = true;  server->pollset_count = 0;  /* sized for the worst case of every completion queue being able to listen */server->pollsets = static_cast<grpc_pollset**>(      gpr_malloc(sizeof(grpc_pollset*) * server->cq_count));  for (i = 0; i < server->cq_count; i++) {    if (grpc_cq_can_listen(server->cqs[i])) {      server->pollsets[server->pollset_count++] =          grpc_cq_pollset(server->cqs[i]);    }  }  request_matcher_init(&server->unregistered_request_matcher, server);  for (registered_method* rm = server->registered_methods; rm; rm = rm->next) {    request_matcher_init(&rm->matcher, server);  }  /* the ref and the starting flag are released by start_listeners */server_ref(server);  server->starting = true;  GRPC_CLOSURE_SCHED(      GRPC_CLOSURE_CREATE(start_listeners, server,                          grpc_executor_scheduler(GRPC_EXECUTOR_SHORT)),      GRPC_ERROR_NONE);}/* Returns, through the out parameters, the pollset array built by grpc_server_start and its length; the array itself is not copied. */void grpc_server_get_pollsets(grpc_server* server, grpc_pollset*** pollsets,                              size_t* pollset_count) {  *pollset_count = server->pollset_count;  *pollsets = server->pollsets;}/* Attaches a new transport to the server: creates a GRPC_SERVER_CHANNEL channel, binds its channel_data to s (taking a server ref), selects the completion queue whose pollset equals accepting_pollset (or a random one), builds the per-channel registered-method hash table, links the channel into the server list under mu_global, and finally sends a transport op that installs accept_stream and the connectivity watch, plus a disconnect error when shutdown_flag is already set. */void grpc_server_setup_transport(grpc_server* s, grpc_transport* transport,                                 grpc_pollset* accepting_pollset,                                 const grpc_channel_args* args,                                 intptr_t socket_uuid,                                 grpc_resource_user* resource_user) {  size_t num_registered_methods;  size_t alloc;  registered_method* rm;  channel_registered_method* crm;  grpc_channel* channel;  channel_data* chand;  uint32_t hash;  size_t slots;  uint32_t probes;  uint32_t max_probes = 0;  grpc_transport_op* op = nullptr;  channel = grpc_channel_create(nullptr, args, GRPC_SERVER_CHANNEL, transport,                                resource_user);  chand = static_cast<channel_data*>(      grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0)          ->channel_data);  chand->server = s;  server_ref(s);  chand->channel = channel;  chand->socket_uuid = socket_uuid;  size_t cq_idx;  for (cq_idx = 0; cq_idx < s->cq_count; cq_idx++) {    if 
(grpc_cq_pollset(s->cqs[cq_idx]) == accepting_pollset) break;  }  if (cq_idx == s->cq_count) {    /* completion queue not found: pick a random one to publish new calls to */    /* NOTE(review): the modulo assumes s->cq_count > 0 - confirm that callers always register a completion queue first */cq_idx = static_cast<size_t>(rand()) % s->cq_count;  }  chand->cq_idx = cq_idx;  num_registered_methods = 0;  for (rm = s->registered_methods; rm; rm = rm->next) {    num_registered_methods++;  }  /* build a lookup table phrased in terms of mdstr's in this channels context     to quickly find registered methods */  if (num_registered_methods > 0) {    /* the table gets twice as many slots as methods, so the linear probe further down always reaches an empty slot */slots = 2 * num_registered_methods;    alloc = sizeof(channel_registered_method) * slots;    chand->registered_methods =        static_cast<channel_registered_method*>(gpr_zalloc(alloc));    for (rm = s->registered_methods; rm; rm = rm->next) {      grpc_slice host;      bool has_host;      grpc_slice method;      /* host stays unassigned when has_host is false; every later read of host is guarded by has_host */if (rm->host != nullptr) {        host = grpc_slice_intern(grpc_slice_from_static_string(rm->host));        has_host = true;      } else {        has_host = false;      }      method = grpc_slice_intern(grpc_slice_from_static_string(rm->method));      hash = GRPC_MDSTR_KV_HASH(has_host ? 
grpc_slice_hash(host) : 0,                                grpc_slice_hash(method));      /* linear probing: advance until a slot without a server_registered_method is found */for (probes = 0; chand->registered_methods[(hash + probes) % slots]                           .server_registered_method != nullptr;           probes++)        ;      /* track the longest probe sequence; it is stored in registered_method_max_probes */if (probes > max_probes) max_probes = probes;      crm = &chand->registered_methods[(hash + probes) % slots];      crm->server_registered_method = rm;      crm->flags = rm->flags;      crm->has_host = has_host;      if (has_host) {        crm->host = host;      }      crm->method = method;    }    GPR_ASSERT(slots <= UINT32_MAX);    chand->registered_method_slots = static_cast<uint32_t>(slots);    chand->registered_method_max_probes = max_probes;  }  /* link chand just before the root node, i.e. at the tail of the circular channel list */gpr_mu_lock(&s->mu_global);  chand->next = &s->root_channel_data;  chand->prev = chand->next->prev;  chand->next->prev = chand->prev->next = chand;  gpr_mu_unlock(&s->mu_global);  /* ref held for the connectivity watch; released by channel_connectivity_changed on SHUTDOWN */GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");  op = grpc_make_transport_op(nullptr);  op->set_accept_stream = true;  op->set_accept_stream_fn = accept_stream;  op->set_accept_stream_user_data = chand;  op->on_connectivity_state_change = &chand->channel_connectivity_changed;  op->connectivity_state = &chand->connectivity_state;  if (gpr_atm_acq_load(&s->shutdown_flag) != 0) {    op->disconnect_with_error =        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server shutdown");  }  grpc_transport_perform_op(transport, op);}/* Appends to server_sockets the socket_uuid of every channel in the server list whose uuid is at least start_idx, holding mu_global while iterating. */void grpc_server_populate_server_sockets(    grpc_server* s, grpc_core::channelz::ChildRefsList* server_sockets,    intptr_t start_idx) {  gpr_mu_lock(&s->mu_global);  channel_data* c = nullptr;  for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {    intptr_t socket_uuid = c->socket_uuid;    if (socket_uuid >= start_idx) {      server_sockets->push_back(socket_uuid);    }  }  gpr_mu_unlock(&s->mu_global);}/* Appends the socket_uuid of every listener to listen_sockets, holding mu_global while iterating. */void grpc_server_populate_listen_sockets(    grpc_server* server, grpc_core::channelz::ChildRefsList* listen_sockets) {  
gpr_mu_lock(&server->mu_global);  for (listener* l = server->listeners; l != nullptr; l = l->next) {    listen_sockets->push_back(l->socket_uuid);  }  gpr_mu_unlock(&server->mu_global);}/* Completion callback passed to grpc_cq_end_op by grpc_server_shutdown_and_notify: frees the grpc_cq_completion storage; done_arg is ignored. */void done_published_shutdown(void* done_arg, grpc_cq_completion* storage) {  (void)done_arg;  gpr_free(storage);}/* Closure handed to each listener destroy call; s is the grpc_server. Counts the destroyed listener and calls maybe_finish_shutdown, both under mu_global. error is not used. */static void listener_destroy_done(void* s, grpc_error* error) {  grpc_server* server = static_cast<grpc_server*>(s);  gpr_mu_lock(&server->mu_global);  server->listeners_destroyed++;  maybe_finish_shutdown(server);  gpr_mu_unlock(&server->mu_global);}/*  - Kills all pending requests-for-incoming-RPC-calls (i.e the requests made via    grpc_server_request_call and grpc_server_request_registered call will now be    cancelled). See 'kill_pending_work_locked()'  - Shuts down the listeners (i.e the server will no longer listen on the port    for new incoming channels).  - Iterates through all channels on the server and sends shutdown msg (see    'channel_broadcaster_shutdown()' for details) to the clients via the    transport layer. 
The transport layer then guarantees the following:     -- Sends shutdown to the client (for eg: HTTP2 transport sends GOAWAY)     -- If the server has outstanding calls that are in the process, the        connection is NOT closed until the server is done with all those calls     -- Once, there are no more calls in progress, the channel is closed */void grpc_server_shutdown_and_notify(grpc_server* server,                                     grpc_completion_queue* cq, void* tag) {  /* Public API. An op for tag is begun on cq; it is completed right away when shutdown_published is already set, otherwise the tag and cq are appended to shutdown_tags. Only a caller that finds shutdown_flag clear goes on to initialize the channel broadcaster, set shutdown_flag, kill pending work, destroy the listeners and broadcast the shutdown. */listener* l;  shutdown_tag* sdt;  channel_broadcaster broadcaster;  grpc_core::ExecCtx exec_ctx;  GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,                 (server, cq, tag));  /* wait for startup to be finished: locks mu_global */  gpr_mu_lock(&server->mu_global);  while (server->starting) {    gpr_cv_wait(&server->starting_cv, &server->mu_global,                gpr_inf_future(GPR_CLOCK_MONOTONIC));  }  /* stay locked, and gather up some stuff to do */  GPR_ASSERT(grpc_cq_begin_op(cq, tag));  /* shutdown already published: complete this tag immediately with heap storage that done_published_shutdown frees */if (server->shutdown_published) {    grpc_cq_end_op(cq, tag, GRPC_ERROR_NONE, done_published_shutdown, nullptr,                   static_cast<grpc_cq_completion*>(                       gpr_malloc(sizeof(grpc_cq_completion))));    gpr_mu_unlock(&server->mu_global);    return;  }  server->shutdown_tags = static_cast<shutdown_tag*>(      gpr_realloc(server->shutdown_tags,                  sizeof(shutdown_tag) * (server->num_shutdown_tags + 1)));  sdt = &server->shutdown_tags[server->num_shutdown_tags++];  sdt->tag = tag;  sdt->cq = cq;  /* shutdown already in progress: the tag has been recorded, nothing more is done here */if (gpr_atm_acq_load(&server->shutdown_flag)) {    gpr_mu_unlock(&server->mu_global);    return;  }  server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);  channel_broadcaster_init(server, &broadcaster);  gpr_atm_rel_store(&server->shutdown_flag, 1);  /* collect all unregistered then registered calls */  gpr_mu_lock(&server->mu_call);  kill_pending_work_locked(      server, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server 
Shutdown"));  gpr_mu_unlock(&server->mu_call);  maybe_finish_shutdown(server);  gpr_mu_unlock(&server->mu_global);  /* Shutdown listeners */  for (l = server->listeners; l; l = l->next) {    GRPC_CLOSURE_INIT(&l->destroy_done, listener_destroy_done, server,                      grpc_schedule_on_exec_ctx);    l->destroy(server, l->arg, &l->destroy_done);  }  channel_broadcaster_shutdown(&broadcaster, true /* send_goaway */,                               GRPC_ERROR_NONE);  /* NOTE(review): default_resource_user is not reset to null after the unref, so grpc_server_get_default_resource_user keeps returning the same pointer - confirm this is intended */if (server->default_resource_user != nullptr) {    grpc_resource_quota_unref(        grpc_resource_user_quota(server->default_resource_user));    grpc_resource_user_shutdown(server->default_resource_user);    grpc_resource_user_unref(server->default_resource_user);  }}/* Public API: initializes a channel broadcaster under mu_global, then, outside the lock, shuts the channels down through it with a Cancelling all calls error and send_goaway set to false. */void grpc_server_cancel_all_calls(grpc_server* server) {  channel_broadcaster broadcaster;  grpc_core::ExecCtx exec_ctx;  GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));  gpr_mu_lock(&server->mu_global);  channel_broadcaster_init(server, &broadcaster);  gpr_mu_unlock(&server->mu_global);  channel_broadcaster_shutdown(      &broadcaster, false /* send_goaway */,      GRPC_ERROR_CREATE_FROM_STATIC_STRING("Cancelling all calls"));}/* Public API: asserts that shutdown_flag is set or there are no listeners, and that listeners_destroyed equals the listener count; then frees the listener list under mu_global and drops a server ref. */void grpc_server_destroy(grpc_server* server) {  listener* l;  grpc_core::ExecCtx exec_ctx;  GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));  gpr_mu_lock(&server->mu_global);  GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);  GPR_ASSERT(server->listeners_destroyed == num_listeners(server));  while (server->listeners) {    l = server->listeners;    server->listeners = l->next;    gpr_free(l);  }  gpr_mu_unlock(&server->mu_global);  server_unref(server);}/* Registers a listener described by its start and destroy callbacks, the arg passed to both, and a socket uuid. The start callback is invoked by start_listeners and the destroy callback by grpc_server_shutdown_and_notify. */void grpc_server_add_listener(grpc_server* server, void* arg,                              void (*start)(grpc_server* server, void* arg,                                            grpc_pollset** pollsets,                                            size_t pollset_count),                              
void (*destroy)(grpc_server* server, void* arg,                                              grpc_closure* on_done),                              intptr_t socket_uuid) {  /* the new listener is pushed on the front of server->listeners */listener* l = static_cast<listener*>(gpr_malloc(sizeof(listener)));  l->arg = arg;  l->start = start;  l->destroy = destroy;  l->socket_uuid = socket_uuid;  l->next = server->listeners;  server->listeners = l;}/* Queues rc on the request matcher selected by rc->type, or fails it at once with a Server Shutdown error when shutdown_flag is set. When gpr_locked_mpscq_push returns true, drains the matcher pending list under mu_call, pairing each pending call with a popped request: a call whose state can be CAS-ed from PENDING to ACTIVATED is published, any other is scheduled for kill_zombie. mu_call is released around the per-call work. Always returns GRPC_CALL_OK. */static grpc_call_error queue_call_request(grpc_server* server, size_t cq_idx,                                          requested_call* rc) {  call_data* calld = nullptr;  request_matcher* rm = nullptr;  if (gpr_atm_acq_load(&server->shutdown_flag)) {    fail_call(server, cq_idx, rc,              GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown"));    return GRPC_CALL_OK;  }  /* NOTE(review): there is no default case, so rm stays null for any other rc->type value - confirm the enum has only these two members */switch (rc->type) {    case BATCH_CALL:      rm = &server->unregistered_request_matcher;      break;    case REGISTERED_CALL:      rm = &rc->data.registered.method->matcher;      break;  }  if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) {    /* this was the first queued request: we need to lock and start       matching calls */    gpr_mu_lock(&server->mu_call);    while ((calld = rm->pending_head) != nullptr) {      rc = reinterpret_cast<requested_call*>(          gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]));      if (rc == nullptr) break;      rm->pending_head = calld->pending_next;      gpr_mu_unlock(&server->mu_call);      if (!gpr_atm_full_cas(&calld->state, PENDING, ACTIVATED)) {        // Zombied Call        GRPC_CLOSURE_INIT(            &calld->kill_zombie_closure, kill_zombie,            grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),            grpc_schedule_on_exec_ctx);        GRPC_CLOSURE_SCHED(&calld->kill_zombie_closure, GRPC_ERROR_NONE);      } else {        publish_call(server, calld, cq_idx, rc);      }      gpr_mu_lock(&server->mu_call);    }    gpr_mu_unlock(&server->mu_call);  }  return GRPC_CALL_OK;}grpc_call_error grpc_server_request_call(  
  grpc_server* server, grpc_call** call, grpc_call_details* details,    grpc_metadata_array* initial_metadata,    grpc_completion_queue* cq_bound_to_call,    grpc_completion_queue* cq_for_notification, void* tag) {  /* Public API: requests notification of a new call through the unregistered request matcher. Returns GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE when cq_for_notification is not in server->cqs and GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN when grpc_cq_begin_op fails (rc is freed in both cases); otherwise fills rc as a BATCH_CALL and returns the result of queue_call_request. */grpc_call_error error;  grpc_core::ExecCtx exec_ctx;  requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));  GRPC_STATS_INC_SERVER_REQUESTED_CALLS();  GRPC_API_TRACE(      "grpc_server_request_call("      "server=%p, call=%p, details=%p, initial_metadata=%p, "      "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",      7,      (server, call, details, initial_metadata, cq_bound_to_call,       cq_for_notification, tag));  size_t cq_idx;  for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {    if (server->cqs[cq_idx] == cq_for_notification) {      break;    }  }  if (cq_idx == server->cq_count) {    gpr_free(rc);    error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;    goto done;  }  if (grpc_cq_begin_op(cq_for_notification, tag) == false) {    gpr_free(rc);    error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;    goto done;  }  details->reserved = nullptr;  rc->cq_idx = cq_idx;  rc->type = BATCH_CALL;  rc->server = server;  rc->tag = tag;  rc->cq_bound_to_call = cq_bound_to_call;  rc->call = call;  rc->data.batch.details = details;  rc->initial_metadata = initial_metadata;  error = queue_call_request(server, cq_idx, rc);done:  return error;}/* Public API: like grpc_server_request_call but for a method handle returned by grpc_server_register_method (rmp). In addition to the completion queue errors, returns GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH when optional_payload being null does not agree with payload_handling being GRPC_SRM_PAYLOAD_NONE. rc is freed on every error path; on success it is filled as a REGISTERED_CALL and handed to queue_call_request. */grpc_call_error grpc_server_request_registered_call(    grpc_server* server, void* rmp, grpc_call** call, gpr_timespec* deadline,    grpc_metadata_array* initial_metadata, grpc_byte_buffer** optional_payload,    grpc_completion_queue* cq_bound_to_call,    grpc_completion_queue* cq_for_notification, void* tag) {  grpc_call_error error;  grpc_core::ExecCtx exec_ctx;  requested_call* rc = static_cast<requested_call*>(gpr_malloc(sizeof(*rc)));  registered_method* rm = static_cast<registered_method*>(rmp);  GRPC_STATS_INC_SERVER_REQUESTED_CALLS();  GRPC_API_TRACE(      
"grpc_server_request_registered_call("      "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "      "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "      "tag=%p)",      9,      (server, rmp, call, deadline, initial_metadata, optional_payload,       cq_bound_to_call, cq_for_notification, tag));  size_t cq_idx;  for (cq_idx = 0; cq_idx < server->cq_count; cq_idx++) {    if (server->cqs[cq_idx] == cq_for_notification) {      break;    }  }  if (cq_idx == server->cq_count) {    gpr_free(rc);    error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;    goto done;  }  /* a payload pointer must be supplied exactly when the method was registered with payload handling other than NONE */if ((optional_payload == nullptr) !=      (rm->payload_handling == GRPC_SRM_PAYLOAD_NONE)) {    gpr_free(rc);    error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;    goto done;  }  if (grpc_cq_begin_op(cq_for_notification, tag) == false) {    gpr_free(rc);    error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;    goto done;  }  rc->cq_idx = cq_idx;  rc->type = REGISTERED_CALL;  rc->server = server;  rc->tag = tag;  rc->cq_bound_to_call = cq_bound_to_call;  rc->call = call;  rc->data.registered.method = rm;  rc->data.registered.deadline = deadline;  rc->initial_metadata = initial_metadata;  rc->data.registered.optional_payload = optional_payload;  error = queue_call_request(server, cq_idx, rc);done:  return error;}/* Completes rc with error on the completion queue at cq_idx: nulls the output call pointer, sets the metadata count to 0, and ends the cq op with done_request_event as completion callback and rc->completion as storage. error must not be GRPC_ERROR_NONE. */static void fail_call(grpc_server* server, size_t cq_idx, requested_call* rc,                      grpc_error* error) {  *rc->call = nullptr;  rc->initial_metadata->count = 0;  GPR_ASSERT(error != GRPC_ERROR_NONE);  grpc_cq_end_op(server->cqs[cq_idx], rc->tag, error, done_request_event, rc,                 &rc->completion);}/* Returns server->channel_args as stored; no new copy is made. */const grpc_channel_args* grpc_server_get_channel_args(grpc_server* server) {  return server->channel_args;}/* Returns server->default_resource_user as stored; no ref is taken. */grpc_resource_user* grpc_server_get_default_resource_user(grpc_server* server) {  return server->default_resource_user;}/* Returns nonzero when the server channel list is non-empty, checked under mu_global. */int grpc_server_has_open_connections(grpc_server* server) {  int r;  gpr_mu_lock(&server->mu_global);  r = 
server->root_channel_data.next != &server->root_channel_data;  gpr_mu_unlock(&server->mu_global);  return r;}/* Returns the raw pointer obtained from server->channelz_server.get(), or nullptr when server is null. No ref is taken. */grpc_core::channelz::ServerNode* grpc_server_get_channelz_node(    grpc_server* server) {  if (server == nullptr) {    return nullptr;  }  return server->channelz_server.get();}
 |