|  | @@ -18,203 +18,311 @@
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  #include <grpc/support/port_platform.h>
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -#include <string.h>
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | +#include <grpc/grpc_security.h>
 | 
	
		
			
				|  |  |  #include <grpc/load_reporting.h>
 | 
	
		
			
				|  |  | +#include <grpc/slice.h>
 | 
	
		
			
				|  |  |  #include <grpc/support/alloc.h>
 | 
	
		
			
				|  |  |  #include <grpc/support/log.h>
 | 
	
		
			
				|  |  |  #include <grpc/support/string_util.h>
 | 
	
		
			
				|  |  | -#include <grpc/support/sync.h>
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +#include "src/core/ext/filters/client_channel/parse_address.h"
 | 
	
		
			
				|  |  | +#include "src/core/ext/filters/client_channel/uri_parser.h"
 | 
	
		
			
				|  |  | +#include "src/core/ext/filters/load_reporting/registered_opencensus_objects.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/filters/load_reporting/server_load_reporting_filter.h"
 | 
	
		
			
				|  |  | -#include "src/core/ext/filters/load_reporting/server_load_reporting_plugin.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/channel/channel_args.h"
 | 
	
		
			
				|  |  | +#include "src/core/lib/channel/context.h"
 | 
	
		
			
				|  |  | +#include "src/core/lib/gpr/string.h"
 | 
	
		
			
				|  |  | +#include "src/core/lib/iomgr/resolve_address.h"
 | 
	
		
			
				|  |  | +#include "src/core/lib/iomgr/sockaddr_posix.h"
 | 
	
		
			
				|  |  | +#include "src/core/lib/iomgr/socket_utils.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/profiling/timers.h"
 | 
	
		
			
				|  |  | +#include "src/core/lib/security/context/security_context.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/slice/slice_internal.h"
 | 
	
		
			
				|  |  | +#include "src/core/lib/surface/call.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/transport/static_metadata.h"
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -namespace {
 | 
	
		
			
				|  |  | -struct call_data {
 | 
	
		
			
				|  |  | -  intptr_t id; /**< an id unique to the call */
 | 
	
		
			
				|  |  | -  bool have_trailing_md_string;
 | 
	
		
			
				|  |  | -  grpc_slice trailing_md_string;
 | 
	
		
			
				|  |  | -  bool have_initial_md_string;
 | 
	
		
			
				|  |  | -  grpc_slice initial_md_string;
 | 
	
		
			
				|  |  | -  bool have_service_method;
 | 
	
		
			
				|  |  | -  grpc_slice service_method;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* stores the recv_initial_metadata op's ready closure, which we wrap with our
 | 
	
		
			
				|  |  | -   * own (on_initial_md_ready) in order to capture the incoming initial metadata
 | 
	
		
			
				|  |  | -   * */
 | 
	
		
			
				|  |  | -  grpc_closure* ops_recv_initial_metadata_ready;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* to get notified of the availability of the incoming initial metadata. */
 | 
	
		
			
				|  |  | -  grpc_closure on_initial_md_ready;
 | 
	
		
			
				|  |  | -  grpc_metadata_batch* recv_initial_metadata;
 | 
	
		
			
				|  |  | -};
 | 
	
		
			
				|  |  | +namespace grpc {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -struct channel_data {
 | 
	
		
			
				|  |  | -  intptr_t id; /**< an id unique to the channel */
 | 
	
		
			
				|  |  | -};
 | 
	
		
			
				|  |  | -}  // namespace
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void on_initial_md_ready(void* user_data, grpc_error* err) {
 | 
	
		
			
				|  |  | -  grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
 | 
	
		
			
				|  |  | -  call_data* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  if (err == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | -    if (calld->recv_initial_metadata->idx.named.path != nullptr) {
 | 
	
		
			
				|  |  | -      calld->service_method = grpc_slice_ref_internal(
 | 
	
		
			
				|  |  | -          GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.path->md));
 | 
	
		
			
				|  |  | -      calld->have_service_method = true;
 | 
	
		
			
				|  |  | -    } else {
 | 
	
		
			
				|  |  | -      err = grpc_error_add_child(
 | 
	
		
			
				|  |  | -          err, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing :path header"));
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    if (calld->recv_initial_metadata->idx.named.lb_token != nullptr) {
 | 
	
		
			
				|  |  | -      calld->initial_md_string = grpc_slice_ref_internal(
 | 
	
		
			
				|  |  | -          GRPC_MDVALUE(calld->recv_initial_metadata->idx.named.lb_token->md));
 | 
	
		
			
				|  |  | -      calld->have_initial_md_string = true;
 | 
	
		
			
				|  |  | -      grpc_metadata_batch_remove(
 | 
	
		
			
				|  |  | -          calld->recv_initial_metadata,
 | 
	
		
			
				|  |  | -          calld->recv_initial_metadata->idx.named.lb_token);
 | 
	
		
			
				|  |  | +grpc_error* ServerLoadReportingChannelData::Init(
 | 
	
		
			
				|  |  | +    grpc_channel_element* /* elem */, grpc_channel_element_args* args) {
 | 
	
		
			
				|  |  | +  GPR_ASSERT(!args->is_last);
 | 
	
		
			
				|  |  | +  // Find and record the peer_identity.
 | 
	
		
			
				|  |  | +  const grpc_auth_context* auth_context =
 | 
	
		
			
				|  |  | +      grpc_find_auth_context_in_args(args->channel_args);
 | 
	
		
			
				|  |  | +  if (auth_context != nullptr &&
 | 
	
		
			
				|  |  | +      grpc_auth_context_peer_is_authenticated(auth_context)) {
 | 
	
		
			
				|  |  | +    grpc_auth_property_iterator auth_it =
 | 
	
		
			
				|  |  | +        grpc_auth_context_peer_identity(auth_context);
 | 
	
		
			
				|  |  | +    const grpc_auth_property* auth_property =
 | 
	
		
			
				|  |  | +        grpc_auth_property_iterator_next(&auth_it);
 | 
	
		
			
				|  |  | +    if (auth_property != nullptr) {
 | 
	
		
			
				|  |  | +      peer_identity_ = auth_property->value;
 | 
	
		
			
				|  |  | +      peer_identity_len_ = auth_property->value_length;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | -    GRPC_ERROR_REF(err);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  GRPC_CLOSURE_RUN(calld->ops_recv_initial_metadata_ready, err);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -/* Constructor for call_data */
 | 
	
		
			
				|  |  | -static grpc_error* init_call_elem(grpc_call_element* elem,
 | 
	
		
			
				|  |  | -                                  const grpc_call_element_args* args) {
 | 
	
		
			
				|  |  | -  call_data* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  | -  calld->id = (intptr_t)args->call_stack;
 | 
	
		
			
				|  |  | -  GRPC_CLOSURE_INIT(&calld->on_initial_md_ready, on_initial_md_ready, elem,
 | 
	
		
			
				|  |  | -                    grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  /* TODO(dgq): do something with the data
 | 
	
		
			
				|  |  | -  channel_data *chand = elem->channel_data;
 | 
	
		
			
				|  |  | -  grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_CREATION,
 | 
	
		
			
				|  |  | -                                                (intptr_t)chand->id,
 | 
	
		
			
				|  |  | -                                                (intptr_t)calld->id,
 | 
	
		
			
				|  |  | -                                                NULL,
 | 
	
		
			
				|  |  | -                                                NULL,
 | 
	
		
			
				|  |  | -                                                NULL,
 | 
	
		
			
				|  |  | -                                                NULL};
 | 
	
		
			
				|  |  | -  */
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    return GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -/* Destructor for call_data */
 | 
	
		
			
				|  |  | -static void destroy_call_elem(grpc_call_element* elem,
 | 
	
		
			
				|  |  | -                              const grpc_call_final_info* final_info,
 | 
	
		
			
				|  |  | -                              grpc_closure* ignored) {
 | 
	
		
			
				|  |  | -  call_data* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  | +void ServerLoadReportingCallData::Destroy(
 | 
	
		
			
				|  |  | +    grpc_call_element* elem, const grpc_call_final_info* final_info,
 | 
	
		
			
				|  |  | +    grpc_closure* then_call_closure) {
 | 
	
		
			
				|  |  | +  ServerLoadReportingChannelData* chand =
 | 
	
		
			
				|  |  | +      reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
 | 
	
		
			
				|  |  | +  // Only record an end if we've recorded its corresponding start, which is
 | 
	
		
			
				|  |  | +  // indicated by a non-null client_ip_and_lr_token_. Note that it's possible
 | 
	
		
			
				|  |  | +  // that we attempt to record the call end before we have recorded the call
 | 
	
		
			
				|  |  | +  // start, because the data needed for recording the start comes from the
 | 
	
		
			
				|  |  | +  // initial metadata, which may not be ready before the call finishes.
 | 
	
		
			
				|  |  | +  if (client_ip_and_lr_token_ != nullptr) {
 | 
	
		
			
				|  |  | +    opencensus::stats::Record(
 | 
	
		
			
				|  |  | +        {{::grpc::load_reporter::MeasureEndCount(), 1},
 | 
	
		
			
				|  |  | +         {::grpc::load_reporter::MeasureEndBytesSent(),
 | 
	
		
			
				|  |  | +          final_info->stats.transport_stream_stats.outgoing.data_bytes},
 | 
	
		
			
				|  |  | +         {::grpc::load_reporter::MeasureEndBytesReceived(),
 | 
	
		
			
				|  |  | +          final_info->stats.transport_stream_stats.incoming.data_bytes},
 | 
	
		
			
				|  |  | +         {::grpc::load_reporter::MeasureEndLatencyMs(),
 | 
	
		
			
				|  |  | +          gpr_time_to_millis(final_info->stats.latency)}},
 | 
	
		
			
				|  |  | +        {{::grpc::load_reporter::TagKeyToken(),
 | 
	
		
			
				|  |  | +          {client_ip_and_lr_token_, client_ip_and_lr_token_len_}},
 | 
	
		
			
				|  |  | +         {::grpc::load_reporter::TagKeyHost(),
 | 
	
		
			
				|  |  | +          {target_host_, target_host_len_}},
 | 
	
		
			
				|  |  | +         {::grpc::load_reporter::TagKeyUserId(),
 | 
	
		
			
				|  |  | +          {chand->peer_identity(), chand->peer_identity_len()}},
 | 
	
		
			
				|  |  | +         {::grpc::load_reporter::TagKeyStatus(),
 | 
	
		
			
				|  |  | +          GetStatusTagForStatus(final_info->final_status)}});
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  grpc_slice_unref_internal(service_method_);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  /* TODO(dgq): do something with the data
 | 
	
		
			
				|  |  | -  channel_data *chand = elem->channel_data;
 | 
	
		
			
				|  |  | -  grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CALL_DESTRUCTION,
 | 
	
		
			
				|  |  | -                                                (intptr_t)chand->id,
 | 
	
		
			
				|  |  | -                                                (intptr_t)calld->id,
 | 
	
		
			
				|  |  | -                                                final_info,
 | 
	
		
			
				|  |  | -                                                calld->initial_md_string,
 | 
	
		
			
				|  |  | -                                                calld->trailing_md_string,
 | 
	
		
			
				|  |  | -                                                calld->service_method};
 | 
	
		
			
				|  |  | -  */
 | 
	
		
			
				|  |  | +void ServerLoadReportingCallData::StartTransportStreamOpBatch(
 | 
	
		
			
				|  |  | +    grpc_call_element* elem, TransportStreamOpBatch* op) {
 | 
	
		
			
				|  |  | +  GPR_TIMER_SCOPE("lr_start_transport_stream_op", 0);
 | 
	
		
			
				|  |  | +  if (op->recv_initial_metadata() != nullptr) {
 | 
	
		
			
				|  |  | +    // Save some fields to use when initial metadata is ready.
 | 
	
		
			
				|  |  | +    peer_string_ = op->get_peer_string();
 | 
	
		
			
				|  |  | +    recv_initial_metadata_ = op->recv_initial_metadata();
 | 
	
		
			
				|  |  | +    original_recv_initial_metadata_ready_ = op->recv_initial_metadata_ready();
 | 
	
		
			
				|  |  | +    // Substitute the original closure for the wrapper closure.
 | 
	
		
			
				|  |  | +    op->set_recv_initial_metadata_ready(&recv_initial_metadata_ready_);
 | 
	
		
			
				|  |  | +  } else if (op->send_trailing_metadata() != nullptr) {
 | 
	
		
			
				|  |  | +    GRPC_LOG_IF_ERROR(
 | 
	
		
			
				|  |  | +        "server_load_reporting_filter",
 | 
	
		
			
				|  |  | +        grpc_metadata_batch_filter(op->send_trailing_metadata()->batch(),
 | 
	
		
			
				|  |  | +                                   SendTrailingMetadataFilter, elem,
 | 
	
		
			
				|  |  | +                                   "send_trailing_metadata filtering error"));
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  grpc_call_next_op(elem, op->op());
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  if (calld->have_initial_md_string) {
 | 
	
		
			
				|  |  | -    grpc_slice_unref_internal(calld->initial_md_string);
 | 
	
		
			
				|  |  | +void ServerLoadReportingCallData::GetCensusSafeClientIpString(
 | 
	
		
			
				|  |  | +    char** client_ip_string, size_t* size) {
 | 
	
		
			
				|  |  | +  // Find the client URI string.
 | 
	
		
			
				|  |  | +  const char* client_uri_str =
 | 
	
		
			
				|  |  | +      reinterpret_cast<const char*>(gpr_atm_acq_load(peer_string_));
 | 
	
		
			
				|  |  | +  if (client_uri_str == nullptr) {
 | 
	
		
			
				|  |  | +    gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | +            "Unable to extract client URI string (peer string) from gRPC "
 | 
	
		
			
				|  |  | +            "metadata.");
 | 
	
		
			
				|  |  | +    *client_ip_string = nullptr;
 | 
	
		
			
				|  |  | +    *size = 0;
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (calld->have_trailing_md_string) {
 | 
	
		
			
				|  |  | -    grpc_slice_unref_internal(calld->trailing_md_string);
 | 
	
		
			
				|  |  | +  // Parse the client URI string into grpc_uri.
 | 
	
		
			
				|  |  | +  grpc_uri* client_uri = grpc_uri_parse(client_uri_str, true);
 | 
	
		
			
				|  |  | +  if (client_uri == nullptr) {
 | 
	
		
			
				|  |  | +    gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | +            "Unable to parse the client URI string (peer string) to a client "
 | 
	
		
			
				|  |  | +            "URI.");
 | 
	
		
			
				|  |  | +    *client_ip_string = nullptr;
 | 
	
		
			
				|  |  | +    *size = 0;
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  if (calld->have_service_method) {
 | 
	
		
			
				|  |  | -    grpc_slice_unref_internal(calld->service_method);
 | 
	
		
			
				|  |  | +  // Parse the client URI into grpc_resolved_address.
 | 
	
		
			
				|  |  | +  grpc_resolved_address resolved_address;
 | 
	
		
			
				|  |  | +  bool success = grpc_parse_uri(client_uri, &resolved_address);
 | 
	
		
			
				|  |  | +  grpc_uri_destroy(client_uri);
 | 
	
		
			
				|  |  | +  if (!success) {
 | 
	
		
			
				|  |  | +    gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | +            "Unable to parse client URI into a grpc_resolved_address.");
 | 
	
		
			
				|  |  | +    *client_ip_string = nullptr;
 | 
	
		
			
				|  |  | +    *size = 0;
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  // Convert the socket address in the grpc_resolved_address into a hex string
 | 
	
		
			
				|  |  | +  // according to the address family.
 | 
	
		
			
				|  |  | +  grpc_sockaddr* addr = reinterpret_cast<grpc_sockaddr*>(resolved_address.addr);
 | 
	
		
			
				|  |  | +  if (addr->sa_family == GRPC_AF_INET) {
 | 
	
		
			
				|  |  | +    grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(addr);
 | 
	
		
			
				|  |  | +    gpr_asprintf(client_ip_string, "%08x", grpc_ntohl(addr4->sin_addr.s_addr));
 | 
	
		
			
				|  |  | +    *size = 8;
 | 
	
		
			
				|  |  | +  } else if (addr->sa_family == GRPC_AF_INET6) {
 | 
	
		
			
				|  |  | +    grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(addr);
 | 
	
		
			
				|  |  | +    *client_ip_string = static_cast<char*>(gpr_malloc(32));
 | 
	
		
			
				|  |  | +    for (size_t i = 0; i < 16; ++i) {
 | 
	
		
			
				|  |  | +      sprintf(*client_ip_string + i, "%02x",
 | 
	
		
			
				|  |  | +              addr6->sin6_addr.__in6_u.__u6_addr8[i]);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    *size = 32;
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    GPR_UNREACHABLE_CODE();
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -/* Constructor for channel_data */
 | 
	
		
			
				|  |  | -static grpc_error* init_channel_elem(grpc_channel_element* elem,
 | 
	
		
			
				|  |  | -                                     grpc_channel_element_args* args) {
 | 
	
		
			
				|  |  | -  GPR_ASSERT(!args->is_last);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
 | 
	
		
			
				|  |  | -  chand->id = (intptr_t)args->channel_stack;
 | 
	
		
			
				|  |  | +void ServerLoadReportingCallData::StoreClientIpAndLrToken(const char* lr_token,
 | 
	
		
			
				|  |  | +                                                          size_t lr_token_len) {
 | 
	
		
			
				|  |  | +  char* client_ip;
 | 
	
		
			
				|  |  | +  size_t client_ip_len;
 | 
	
		
			
				|  |  | +  GetCensusSafeClientIpString(&client_ip, &client_ip_len);
 | 
	
		
			
				|  |  | +  client_ip_and_lr_token_len_ =
 | 
	
		
			
				|  |  | +      kLengthPrefixSize + client_ip_len + lr_token_len;
 | 
	
		
			
				|  |  | +  client_ip_and_lr_token_ = static_cast<char*>(
 | 
	
		
			
				|  |  | +      gpr_zalloc(client_ip_and_lr_token_len_ * sizeof(char)));
 | 
	
		
			
				|  |  | +  char* cur_pos = client_ip_and_lr_token_;
 | 
	
		
			
				|  |  | +  // Store the IP length prefix.
 | 
	
		
			
				|  |  | +  if (client_ip_len == 0) {
 | 
	
		
			
				|  |  | +    strncpy(cur_pos, kEmptyAddressLengthString, kLengthPrefixSize);
 | 
	
		
			
				|  |  | +  } else if (client_ip_len == 8) {
 | 
	
		
			
				|  |  | +    strncpy(cur_pos, kEncodedIpv4AddressLengthString, kLengthPrefixSize);
 | 
	
		
			
				|  |  | +  } else if (client_ip_len == 32) {
 | 
	
		
			
				|  |  | +    strncpy(cur_pos, kEncodedIpv6AddressLengthString, kLengthPrefixSize);
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    GPR_UNREACHABLE_CODE();
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  cur_pos += kLengthPrefixSize;
 | 
	
		
			
				|  |  | +  // Store the IP.
 | 
	
		
			
				|  |  | +  if (client_ip_len != 0) {
 | 
	
		
			
				|  |  | +    strncpy(cur_pos, client_ip, client_ip_len);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  gpr_free(client_ip);
 | 
	
		
			
				|  |  | +  cur_pos += client_ip_len;
 | 
	
		
			
				|  |  | +  // Store the LR token.
 | 
	
		
			
				|  |  | +  if (lr_token_len != 0) {
 | 
	
		
			
				|  |  | +    strncpy(cur_pos, lr_token, lr_token_len);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  GPR_ASSERT(cur_pos + lr_token_len - client_ip_and_lr_token_ ==
 | 
	
		
			
				|  |  | +             client_ip_and_lr_token_len_);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  /* TODO(dgq): do something with the data
 | 
	
		
			
				|  |  | -  grpc_load_reporting_call_data lr_call_data = {GRPC_LR_POINT_CHANNEL_CREATION,
 | 
	
		
			
				|  |  | -                                                (intptr_t)chand,
 | 
	
		
			
				|  |  | -                                                0,
 | 
	
		
			
				|  |  | -                                                NULL,
 | 
	
		
			
				|  |  | -                                                NULL,
 | 
	
		
			
				|  |  | -                                                NULL,
 | 
	
		
			
				|  |  | -                                                NULL};
 | 
	
		
			
				|  |  | -                                                */
 | 
	
		
			
				|  |  | +grpc_filtered_mdelem ServerLoadReportingCallData::RecvInitialMetadataFilter(
 | 
	
		
			
				|  |  | +    void* user_data, grpc_mdelem md) {
 | 
	
		
			
				|  |  | +  grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
 | 
	
		
			
				|  |  | +  ServerLoadReportingCallData* calld =
 | 
	
		
			
				|  |  | +      reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
 | 
	
		
			
				|  |  | +  if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_PATH)) {
 | 
	
		
			
				|  |  | +    calld->service_method_ = grpc_slice_ref_internal(GRPC_MDVALUE(md));
 | 
	
		
			
				|  |  | +  } else if (calld->target_host_ == nullptr &&
 | 
	
		
			
				|  |  | +             grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_AUTHORITY)) {
 | 
	
		
			
				|  |  | +    grpc_slice target_host_slice = GRPC_MDVALUE(md);
 | 
	
		
			
				|  |  | +    calld->target_host_len_ = GRPC_SLICE_LENGTH(target_host_slice);
 | 
	
		
			
				|  |  | +    calld->target_host_ =
 | 
	
		
			
				|  |  | +        reinterpret_cast<char*>(gpr_zalloc(calld->target_host_len_));
 | 
	
		
			
				|  |  | +    for (size_t i = 0; i < calld->target_host_len_; ++i) {
 | 
	
		
			
				|  |  | +      calld->target_host_[i] = static_cast<char>(
 | 
	
		
			
				|  |  | +          tolower(GRPC_SLICE_START_PTR(target_host_slice)[i]));
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  } else if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_TOKEN)) {
 | 
	
		
			
				|  |  | +    if (calld->client_ip_and_lr_token_ == nullptr) {
 | 
	
		
			
				|  |  | +      calld->StoreClientIpAndLrToken(
 | 
	
		
			
				|  |  | +          reinterpret_cast<const char*> GRPC_SLICE_START_PTR(GRPC_MDVALUE(md)),
 | 
	
		
			
				|  |  | +          GRPC_SLICE_LENGTH(GRPC_MDVALUE(md)));
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    return GRPC_FILTERED_REMOVE();
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  return GRPC_FILTERED_MDELEM(md);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  return GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  | +void ServerLoadReportingCallData::RecvInitialMetadataReady(void* arg,
 | 
	
		
			
				|  |  | +                                                           grpc_error* err) {
 | 
	
		
			
				|  |  | +  grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(arg);
 | 
	
		
			
				|  |  | +  ServerLoadReportingCallData* calld =
 | 
	
		
			
				|  |  | +      reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
 | 
	
		
			
				|  |  | +  ServerLoadReportingChannelData* chand =
 | 
	
		
			
				|  |  | +      reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
 | 
	
		
			
				|  |  | +  if (err == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +    GRPC_LOG_IF_ERROR(
 | 
	
		
			
				|  |  | +        "server_load_reporting_filter",
 | 
	
		
			
				|  |  | +        grpc_metadata_batch_filter(calld->recv_initial_metadata_->batch(),
 | 
	
		
			
				|  |  | +                                   RecvInitialMetadataFilter, elem,
 | 
	
		
			
				|  |  | +                                   "recv_initial_metadata filtering error"));
 | 
	
		
			
				|  |  | +    // If the LB token was not found in the recv_initial_metadata, only the
 | 
	
		
			
				|  |  | +    // client IP part will be recorded (with an empty LB token).
 | 
	
		
			
				|  |  | +    if (calld->client_ip_and_lr_token_ == nullptr) {
 | 
	
		
			
				|  |  | +      calld->StoreClientIpAndLrToken(nullptr, 0);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    opencensus::stats::Record(
 | 
	
		
			
				|  |  | +        {{::grpc::load_reporter::MeasureStartCount(), 1}},
 | 
	
		
			
				|  |  | +        {{::grpc::load_reporter::TagKeyToken(),
 | 
	
		
			
				|  |  | +          {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}},
 | 
	
		
			
				|  |  | +         {::grpc::load_reporter::TagKeyHost(),
 | 
	
		
			
				|  |  | +          {calld->target_host_, calld->target_host_len_}},
 | 
	
		
			
				|  |  | +         {::grpc::load_reporter::TagKeyUserId(),
 | 
	
		
			
				|  |  | +          {chand->peer_identity(), chand->peer_identity_len()}}});
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  GRPC_CLOSURE_RUN(calld->original_recv_initial_metadata_ready_,
 | 
	
		
			
				|  |  | +                   GRPC_ERROR_REF(err));
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -/* Destructor for channel data */
 | 
	
		
			
				|  |  | -static void destroy_channel_elem(grpc_channel_element* elem) {
 | 
	
		
			
				|  |  | -  /* TODO(dgq): do something with the data
 | 
	
		
			
				|  |  | -  channel_data *chand = elem->channel_data;
 | 
	
		
			
				|  |  | -  grpc_load_reporting_call_data lr_call_data = {
 | 
	
		
			
				|  |  | -      GRPC_LR_POINT_CHANNEL_DESTRUCTION,
 | 
	
		
			
				|  |  | -      (intptr_t)chand->id,
 | 
	
		
			
				|  |  | -      0,
 | 
	
		
			
				|  |  | -      NULL,
 | 
	
		
			
				|  |  | -      NULL,
 | 
	
		
			
				|  |  | -      NULL,
 | 
	
		
			
				|  |  | -      NULL};
 | 
	
		
			
				|  |  | -  */
 | 
	
		
			
				|  |  | +grpc_error* ServerLoadReportingCallData::Init(
 | 
	
		
			
				|  |  | +    grpc_call_element* elem, const grpc_call_element_args* args) {
 | 
	
		
			
				|  |  | +  service_method_ = grpc_empty_slice();
 | 
	
		
			
				|  |  | +  GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
 | 
	
		
			
				|  |  | +                    elem, grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  | +  return GRPC_ERROR_NONE;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static grpc_filtered_mdelem lr_trailing_md_filter(void* user_data,
 | 
	
		
			
				|  |  | -                                                  grpc_mdelem md) {
 | 
	
		
			
				|  |  | -  grpc_call_element* elem = static_cast<grpc_call_element*>(user_data);
 | 
	
		
			
				|  |  | -  call_data* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  | +grpc_filtered_mdelem ServerLoadReportingCallData::SendTrailingMetadataFilter(
 | 
	
		
			
				|  |  | +    void* user_data, grpc_mdelem md) {
 | 
	
		
			
				|  |  | +  grpc_call_element* elem = reinterpret_cast<grpc_call_element*>(user_data);
 | 
	
		
			
				|  |  | +  ServerLoadReportingCallData* calld =
 | 
	
		
			
				|  |  | +      reinterpret_cast<ServerLoadReportingCallData*>(elem->call_data);
 | 
	
		
			
				|  |  | +  ServerLoadReportingChannelData* chand =
 | 
	
		
			
				|  |  | +      reinterpret_cast<ServerLoadReportingChannelData*>(elem->channel_data);
 | 
	
		
			
				|  |  | +  // TODO(juanlishen): GRPC_MDSTR_LB_COST_BIN meaning?
 | 
	
		
			
				|  |  |    if (grpc_slice_eq(GRPC_MDKEY(md), GRPC_MDSTR_LB_COST_BIN)) {
 | 
	
		
			
				|  |  | -    calld->trailing_md_string = GRPC_MDVALUE(md);
 | 
	
		
			
				|  |  | +    const grpc_slice value = GRPC_MDVALUE(md);
 | 
	
		
			
				|  |  | +    const size_t cost_entry_size = GRPC_SLICE_LENGTH(value);
 | 
	
		
			
				|  |  | +    if (cost_entry_size < sizeof(double)) {
 | 
	
		
			
				|  |  | +      gpr_log(GPR_ERROR,
 | 
	
		
			
				|  |  | +              "Cost metadata value too small (%zu bytes) to hold valid data. "
 | 
	
		
			
				|  |  | +              "Ignoring.",
 | 
	
		
			
				|  |  | +              cost_entry_size);
 | 
	
		
			
				|  |  | +      return GRPC_FILTERED_REMOVE();
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    const double* cost_entry_ptr =
 | 
	
		
			
				|  |  | +        reinterpret_cast<const double*>(GRPC_SLICE_START_PTR(value));
 | 
	
		
			
				|  |  | +    double cost_value = *cost_entry_ptr++;
 | 
	
		
			
				|  |  | +    const char* cost_name = reinterpret_cast<const char*>(cost_entry_ptr);
 | 
	
		
			
				|  |  | +    const size_t cost_name_len = cost_entry_size - sizeof(double);
 | 
	
		
			
				|  |  | +    opencensus::stats::Record(
 | 
	
		
			
				|  |  | +        {{::grpc::load_reporter::MeasureOtherCallMetric(), cost_value}},
 | 
	
		
			
				|  |  | +        {{::grpc::load_reporter::TagKeyToken(),
 | 
	
		
			
				|  |  | +          {calld->client_ip_and_lr_token_, calld->client_ip_and_lr_token_len_}},
 | 
	
		
			
				|  |  | +         {::grpc::load_reporter::TagKeyHost(),
 | 
	
		
			
				|  |  | +          {calld->target_host_, calld->target_host_len_}},
 | 
	
		
			
				|  |  | +         {::grpc::load_reporter::TagKeyUserId(),
 | 
	
		
			
				|  |  | +          {chand->peer_identity(), chand->peer_identity_len()}},
 | 
	
		
			
				|  |  | +         {::grpc::load_reporter::TagKeyMetricName(),
 | 
	
		
			
				|  |  | +          {cost_name, cost_name_len}}});
 | 
	
		
			
				|  |  |      return GRPC_FILTERED_REMOVE();
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    return GRPC_FILTERED_MDELEM(md);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -static void lr_start_transport_stream_op_batch(
 | 
	
		
			
				|  |  | -    grpc_call_element* elem, grpc_transport_stream_op_batch* op) {
 | 
	
		
			
				|  |  | -  GPR_TIMER_SCOPE("lr_start_transport_stream_op_batch", 0);
 | 
	
		
			
				|  |  | -  call_data* calld = static_cast<call_data*>(elem->call_data);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  if (op->recv_initial_metadata) {
 | 
	
		
			
				|  |  | -    /* substitute our callback for the higher callback */
 | 
	
		
			
				|  |  | -    calld->recv_initial_metadata =
 | 
	
		
			
				|  |  | -        op->payload->recv_initial_metadata.recv_initial_metadata;
 | 
	
		
			
				|  |  | -    calld->ops_recv_initial_metadata_ready =
 | 
	
		
			
				|  |  | -        op->payload->recv_initial_metadata.recv_initial_metadata_ready;
 | 
	
		
			
				|  |  | -    op->payload->recv_initial_metadata.recv_initial_metadata_ready =
 | 
	
		
			
				|  |  | -        &calld->on_initial_md_ready;
 | 
	
		
			
				|  |  | -  } else if (op->send_trailing_metadata) {
 | 
	
		
			
				|  |  | -    GRPC_LOG_IF_ERROR(
 | 
	
		
			
				|  |  | -        "grpc_metadata_batch_filter",
 | 
	
		
			
				|  |  | -        grpc_metadata_batch_filter(
 | 
	
		
			
				|  |  | -            op->payload->send_trailing_metadata.send_trailing_metadata,
 | 
	
		
			
				|  |  | -            lr_trailing_md_filter, elem,
 | 
	
		
			
				|  |  | -            "LR trailing metadata filtering error"));
 | 
	
		
			
				|  |  | +const char* ServerLoadReportingCallData::GetStatusTagForStatus(
 | 
	
		
			
				|  |  | +    grpc_status_code status) {
 | 
	
		
			
				|  |  | +  switch (status) {
 | 
	
		
			
				|  |  | +    case GRPC_STATUS_OK:
 | 
	
		
			
				|  |  | +      return ::grpc::load_reporter::kCallStatusOk;
 | 
	
		
			
				|  |  | +    case GRPC_STATUS_UNKNOWN:
 | 
	
		
			
				|  |  | +    case GRPC_STATUS_DEADLINE_EXCEEDED:
 | 
	
		
			
				|  |  | +    case GRPC_STATUS_UNIMPLEMENTED:
 | 
	
		
			
				|  |  | +    case GRPC_STATUS_INTERNAL:
 | 
	
		
			
				|  |  | +    case GRPC_STATUS_UNAVAILABLE:
 | 
	
		
			
				|  |  | +    case GRPC_STATUS_DATA_LOSS:
 | 
	
		
			
				|  |  | +      return ::grpc::load_reporter::kCallStatusServerError;
 | 
	
		
			
				|  |  | +    default:
 | 
	
		
			
				|  |  | +      return ::grpc::load_reporter::kCallStatusClientError;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  grpc_call_next_op(elem, op);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -const grpc_channel_filter grpc_server_load_reporting_filter = {
 | 
	
		
			
				|  |  | -    lr_start_transport_stream_op_batch,
 | 
	
		
			
				|  |  | -    grpc_channel_next_op,
 | 
	
		
			
				|  |  | -    sizeof(call_data),
 | 
	
		
			
				|  |  | -    init_call_elem,
 | 
	
		
			
				|  |  | -    grpc_call_stack_ignore_set_pollset_or_pollset_set,
 | 
	
		
			
				|  |  | -    destroy_call_elem,
 | 
	
		
			
				|  |  | -    sizeof(channel_data),
 | 
	
		
			
				|  |  | -    init_channel_elem,
 | 
	
		
			
				|  |  | -    destroy_channel_elem,
 | 
	
		
			
				|  |  | -    grpc_channel_next_get_info,
 | 
	
		
			
				|  |  | -    "load_reporting"};
 | 
	
		
			
				|  |  | +}  // namespace grpc
 |