|  | @@ -47,6 +47,7 @@
 | 
	
		
			
				|  |  |  #include "src/core/ext/client_channel/lb_policy_registry.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/client_channel/proxy_mapper_registry.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/client_channel/resolver_registry.h"
 | 
	
		
			
				|  |  | +#include "src/core/ext/client_channel/retry_throttle.h"
 | 
	
		
			
				|  |  |  #include "src/core/ext/client_channel/subchannel.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/channel/channel_args.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/channel/connected_channel.h"
 | 
	
	
		
			
				|  | @@ -189,6 +190,8 @@ typedef struct client_channel_channel_data {
 | 
	
		
			
				|  |  |    grpc_combiner *combiner;
 | 
	
		
			
				|  |  |    /** currently active load balancer */
 | 
	
		
			
				|  |  |    grpc_lb_policy *lb_policy;
 | 
	
		
			
				|  |  | +  /** retry throttle data */
 | 
	
		
			
				|  |  | +  grpc_server_retry_throttle_data *retry_throttle_data;
 | 
	
		
			
				|  |  |    /** maps method names to method_parameters structs */
 | 
	
		
			
				|  |  |    grpc_slice_hash_table *method_params_table;
 | 
	
		
			
				|  |  |    /** incoming resolver result - set by resolver.next() */
 | 
	
	
		
			
				|  | @@ -284,6 +287,65 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand,
 | 
	
		
			
				|  |  |                                                 &w->on_changed);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +typedef struct {
 | 
	
		
			
				|  |  | +  char *server_name;
 | 
	
		
			
				|  |  | +  grpc_server_retry_throttle_data *retry_throttle_data;
 | 
	
		
			
				|  |  | +} service_config_parsing_state;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +static void parse_retry_throttle_params(const grpc_json *field, void *arg) {
 | 
	
		
			
				|  |  | +  service_config_parsing_state *parsing_state = arg;
 | 
	
		
			
				|  |  | +  if (strcmp(field->key, "retryThrottling") == 0) {
 | 
	
		
			
				|  |  | +    if (parsing_state->retry_throttle_data != NULL) return;  // Duplicate.
 | 
	
		
			
				|  |  | +    if (field->type != GRPC_JSON_OBJECT) return;
 | 
	
		
			
				|  |  | +    int max_milli_tokens = 0;
 | 
	
		
			
				|  |  | +    int milli_token_ratio = 0;
 | 
	
		
			
				|  |  | +    for (grpc_json *sub_field = field->child; sub_field != NULL;
 | 
	
		
			
				|  |  | +         sub_field = sub_field->next) {
 | 
	
		
			
				|  |  | +      if (sub_field->key == NULL) return;
 | 
	
		
			
				|  |  | +      if (strcmp(sub_field->key, "maxTokens") == 0) {
 | 
	
		
			
				|  |  | +        if (max_milli_tokens != 0) return;  // Duplicate.
 | 
	
		
			
				|  |  | +        if (sub_field->type != GRPC_JSON_NUMBER) return;
 | 
	
		
			
				|  |  | +        max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
 | 
	
		
			
				|  |  | +        if (max_milli_tokens == -1) return;
 | 
	
		
			
				|  |  | +        max_milli_tokens *= 1000;
 | 
	
		
			
				|  |  | +      } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
 | 
	
		
			
				|  |  | +        if (milli_token_ratio != 0) return;  // Duplicate.
 | 
	
		
			
				|  |  | +        if (sub_field->type != GRPC_JSON_NUMBER) return;
 | 
	
		
			
				|  |  | +        // We support up to 3 decimal digits.
 | 
	
		
			
				|  |  | +        size_t whole_len = strlen(sub_field->value);
 | 
	
		
			
				|  |  | +        uint32_t multiplier = 1;
 | 
	
		
			
				|  |  | +        uint32_t decimal_value = 0;
 | 
	
		
			
				|  |  | +        const char *decimal_point = strchr(sub_field->value, '.');
 | 
	
		
			
				|  |  | +        if (decimal_point != NULL) {
 | 
	
		
			
				|  |  | +          whole_len = (size_t)(decimal_point - sub_field->value);
 | 
	
		
			
				|  |  | +          multiplier = 1000;
 | 
	
		
			
				|  |  | +          size_t decimal_len = strlen(decimal_point + 1);
 | 
	
		
			
				|  |  | +          if (decimal_len > 3) decimal_len = 3;
 | 
	
		
			
				|  |  | +          if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
 | 
	
		
			
				|  |  | +                                         &decimal_value)) {
 | 
	
		
			
				|  |  | +            return;
 | 
	
		
			
				|  |  | +          }
 | 
	
		
			
				|  |  | +          uint32_t decimal_multiplier = 1;
 | 
	
		
			
				|  |  | +          for (size_t i = 0; i < (3 - decimal_len); ++i) {
 | 
	
		
			
				|  |  | +            decimal_multiplier *= 10;
 | 
	
		
			
				|  |  | +          }
 | 
	
		
			
				|  |  | +          decimal_value *= decimal_multiplier;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        uint32_t whole_value;
 | 
	
		
			
				|  |  | +        if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
 | 
	
		
			
				|  |  | +                                       &whole_value)) {
 | 
	
		
			
				|  |  | +          return;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        milli_token_ratio = (int)((whole_value * multiplier) + decimal_value);
 | 
	
		
			
				|  |  | +        if (milli_token_ratio <= 0) return;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    parsing_state->retry_throttle_data =
 | 
	
		
			
				|  |  | +        grpc_retry_throttle_map_get_data_for_server(
 | 
	
		
			
				|  |  | +            parsing_state->server_name, max_milli_tokens, milli_token_ratio);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |                                                void *arg, grpc_error *error) {
 | 
	
		
			
				|  |  |    channel_data *chand = arg;
 | 
	
	
		
			
				|  | @@ -295,6 +357,8 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |    bool exit_idle = false;
 | 
	
		
			
				|  |  |    grpc_error *state_error = GRPC_ERROR_CREATE("No load balancing policy");
 | 
	
		
			
				|  |  |    char *service_config_json = NULL;
 | 
	
		
			
				|  |  | +  service_config_parsing_state parsing_state;
 | 
	
		
			
				|  |  | +  memset(&parsing_state, 0, sizeof(parsing_state));
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    if (chand->resolver_result != NULL) {
 | 
	
		
			
				|  |  |      // Find LB policy name.
 | 
	
	
		
			
				|  | @@ -355,6 +419,19 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |        grpc_service_config *service_config =
 | 
	
		
			
				|  |  |            grpc_service_config_create(service_config_json);
 | 
	
		
			
				|  |  |        if (service_config != NULL) {
 | 
	
		
			
				|  |  | +        channel_arg =
 | 
	
		
			
				|  |  | +            grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
 | 
	
		
			
				|  |  | +        GPR_ASSERT(channel_arg != NULL);
 | 
	
		
			
				|  |  | +        GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING);
 | 
	
		
			
				|  |  | +        grpc_uri *uri =
 | 
	
		
			
				|  |  | +            grpc_uri_parse(exec_ctx, channel_arg->value.string, true);
 | 
	
		
			
				|  |  | +        GPR_ASSERT(uri->path[0] != '\0');
 | 
	
		
			
				|  |  | +        parsing_state.server_name =
 | 
	
		
			
				|  |  | +            uri->path[0] == '/' ? uri->path + 1 : uri->path;
 | 
	
		
			
				|  |  | +        grpc_service_config_parse_global_params(
 | 
	
		
			
				|  |  | +            service_config, parse_retry_throttle_params, &parsing_state);
 | 
	
		
			
				|  |  | +        parsing_state.server_name = NULL;
 | 
	
		
			
				|  |  | +        grpc_uri_destroy(uri);
 | 
	
		
			
				|  |  |          method_params_table = grpc_service_config_create_method_config_table(
 | 
	
		
			
				|  |  |              exec_ctx, service_config, method_parameters_create_from_json,
 | 
	
		
			
				|  |  |              &method_parameters_vtable);
 | 
	
	
		
			
				|  | @@ -386,6 +463,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |      chand->info_service_config_json = service_config_json;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    gpr_mu_unlock(&chand->info_mu);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  if (chand->retry_throttle_data != NULL) {
 | 
	
		
			
				|  |  | +    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  chand->retry_throttle_data = parsing_state.retry_throttle_data;
 | 
	
		
			
				|  |  |    if (chand->method_params_table != NULL) {
 | 
	
		
			
				|  |  |      grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -613,6 +695,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |    gpr_free(chand->info_lb_policy_name);
 | 
	
		
			
				|  |  |    gpr_free(chand->info_service_config_json);
 | 
	
		
			
				|  |  | +  if (chand->retry_throttle_data != NULL) {
 | 
	
		
			
				|  |  | +    grpc_server_retry_throttle_data_unref(chand->retry_throttle_data);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |    if (chand->method_params_table != NULL) {
 | 
	
		
			
				|  |  |      grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table);
 | 
	
		
			
				|  |  |    }
 | 
	
	
		
			
				|  | @@ -654,6 +739,7 @@ typedef struct client_channel_call_data {
 | 
	
		
			
				|  |  |    grpc_slice path;  // Request path.
 | 
	
		
			
				|  |  |    gpr_timespec call_start_time;
 | 
	
		
			
				|  |  |    gpr_timespec deadline;
 | 
	
		
			
				|  |  | +  grpc_server_retry_throttle_data *retry_throttle_data;
 | 
	
		
			
				|  |  |    method_parameters *method_params;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_error *cancel_error;
 | 
	
	
		
			
				|  | @@ -676,6 +762,9 @@ typedef struct client_channel_call_data {
 | 
	
		
			
				|  |  |    grpc_call_stack *owning_call;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    grpc_linked_mdelem lb_token_mdelem;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  grpc_closure on_complete;
 | 
	
		
			
				|  |  | +  grpc_closure *original_on_complete;
 | 
	
		
			
				|  |  |  } call_data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  grpc_subchannel_call *grpc_client_channel_get_subchannel_call(
 | 
	
	
		
			
				|  | @@ -728,7 +817,7 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
 | 
	
		
			
				|  |  |    gpr_free(ops);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -// Sets calld->method_params.
 | 
	
		
			
				|  |  | +// Sets calld->method_params and calld->retry_throttle_data.
 | 
	
		
			
				|  |  |  // If the method params specify a timeout, populates
 | 
	
		
			
				|  |  |  // *per_method_deadline and returns true.
 | 
	
		
			
				|  |  |  static bool set_call_method_params_from_service_config_locked(
 | 
	
	
		
			
				|  | @@ -736,6 +825,10 @@ static bool set_call_method_params_from_service_config_locked(
 | 
	
		
			
				|  |  |      gpr_timespec *per_method_deadline) {
 | 
	
		
			
				|  |  |    channel_data *chand = elem->channel_data;
 | 
	
		
			
				|  |  |    call_data *calld = elem->call_data;
 | 
	
		
			
				|  |  | +  if (chand->retry_throttle_data != NULL) {
 | 
	
		
			
				|  |  | +    calld->retry_throttle_data =
 | 
	
		
			
				|  |  | +        grpc_server_retry_throttle_data_ref(chand->retry_throttle_data);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |    if (chand->method_params_table != NULL) {
 | 
	
		
			
				|  |  |      calld->method_params = grpc_method_config_table_get(
 | 
	
		
			
				|  |  |          exec_ctx, chand->method_params_table, calld->path);
 | 
	
	
		
			
				|  | @@ -1056,6 +1149,26 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx,
 | 
	
		
			
				|  |  |    add_waiting_locked(calld, op);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
 | 
	
		
			
				|  |  | +  grpc_call_element *elem = arg;
 | 
	
		
			
				|  |  | +  call_data *calld = elem->call_data;
 | 
	
		
			
				|  |  | +  if (calld->retry_throttle_data != NULL) {
 | 
	
		
			
				|  |  | +    if (error == GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  | +      grpc_server_retry_throttle_data_record_success(
 | 
	
		
			
				|  |  | +          calld->retry_throttle_data);
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      // TODO(roth): In a subsequent PR, check the return value here and
 | 
	
		
			
				|  |  | +      // decide whether or not to retry.  Note that we should only
 | 
	
		
			
				|  |  | +      // record failures whose statuses match the configured retryable
 | 
	
		
			
				|  |  | +      // or non-fatal status codes.
 | 
	
		
			
				|  |  | +      grpc_server_retry_throttle_data_record_failure(
 | 
	
		
			
				|  |  | +          calld->retry_throttle_data);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  grpc_closure_run(exec_ctx, calld->original_on_complete,
 | 
	
		
			
				|  |  | +                   GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |                                               grpc_error *error_ignored) {
 | 
	
		
			
				|  |  |    GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0);
 | 
	
	
		
			
				|  | @@ -1064,6 +1177,14 @@ static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
 | 
	
		
			
				|  |  |    grpc_call_element *elem = op->handler_private.args[0];
 | 
	
		
			
				|  |  |    call_data *calld = elem->call_data;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +  if (op->recv_trailing_metadata != NULL) {
 | 
	
		
			
				|  |  | +    GPR_ASSERT(op->on_complete != NULL);
 | 
	
		
			
				|  |  | +    calld->original_on_complete = op->on_complete;
 | 
	
		
			
				|  |  | +    grpc_closure_init(&calld->on_complete, on_complete, elem,
 | 
	
		
			
				|  |  | +                      grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  | +    op->on_complete = &calld->on_complete;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    start_transport_stream_op_locked_inner(exec_ctx, op, elem);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
 |