|  | @@ -59,6 +59,7 @@
 | 
	
		
			
				|  |  |  #include "src/core/lib/iomgr/sockaddr_utils.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/iomgr/tcp_client.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/iomgr/tcp_server.h"
 | 
	
		
			
				|  |  | +#include "src/core/lib/iomgr/timer.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/slice/slice_internal.h"
 | 
	
		
			
				|  |  |  #include "test/core/util/port.h"
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -69,7 +70,7 @@ struct grpc_end2end_http_proxy {
 | 
	
		
			
				|  |  |    grpc_channel_args* channel_args;
 | 
	
		
			
				|  |  |    gpr_mu* mu;
 | 
	
		
			
				|  |  |    grpc_pollset* pollset;
 | 
	
		
			
				|  |  | -  gpr_atm shutdown;
 | 
	
		
			
				|  |  | +  gpr_refcount users;
 | 
	
		
			
				|  |  |  };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  //
 | 
	
	
		
			
				|  | @@ -77,6 +78,8 @@ struct grpc_end2end_http_proxy {
 | 
	
		
			
				|  |  |  //
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  typedef struct proxy_connection {
 | 
	
		
			
				|  |  | +  grpc_end2end_http_proxy* proxy;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |    grpc_endpoint* client_endpoint;
 | 
	
		
			
				|  |  |    grpc_endpoint* server_endpoint;
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -103,13 +106,26 @@ typedef struct proxy_connection {
 | 
	
		
			
				|  |  |    grpc_http_request http_request;
 | 
	
		
			
				|  |  |  } proxy_connection;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +static void proxy_connection_ref(proxy_connection* conn, const char* reason) {
 | 
	
		
			
				|  |  | +  gpr_log(GPR_DEBUG, "proxy_connection_ref: %p %s %" PRIdPTR " --> %" PRIdPTR,
 | 
	
		
			
				|  |  | +          conn, reason, gpr_atm_no_barrier_load(&conn->refcount.count),
 | 
	
		
			
				|  |  | +          gpr_atm_no_barrier_load(&conn->refcount.count) - 1);
 | 
	
		
			
				|  |  | +  gpr_ref(&conn->refcount);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  // Helper function to destroy the proxy connection.
 | 
	
		
			
				|  |  |  static void proxy_connection_unref(grpc_exec_ctx* exec_ctx,
 | 
	
		
			
				|  |  | -                                   proxy_connection* conn) {
 | 
	
		
			
				|  |  | +                                   proxy_connection* conn, const char* reason) {
 | 
	
		
			
				|  |  | +  gpr_log(GPR_DEBUG, "proxy_connection_unref: %p %s %" PRIdPTR " --> %" PRIdPTR,
 | 
	
		
			
				|  |  | +          conn, reason, gpr_atm_no_barrier_load(&conn->refcount.count),
 | 
	
		
			
				|  |  | +          gpr_atm_no_barrier_load(&conn->refcount.count) - 1);
 | 
	
		
			
				|  |  |    if (gpr_unref(&conn->refcount)) {
 | 
	
		
			
				|  |  | +    gpr_log(GPR_DEBUG, "endpoints: %p %p", conn->client_endpoint,
 | 
	
		
			
				|  |  | +            conn->server_endpoint);
 | 
	
		
			
				|  |  |      grpc_endpoint_destroy(exec_ctx, conn->client_endpoint);
 | 
	
		
			
				|  |  | -    if (conn->server_endpoint != NULL)
 | 
	
		
			
				|  |  | +    if (conn->server_endpoint != NULL) {
 | 
	
		
			
				|  |  |        grpc_endpoint_destroy(exec_ctx, conn->server_endpoint);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |      grpc_pollset_set_destroy(exec_ctx, conn->pollset_set);
 | 
	
		
			
				|  |  |      grpc_slice_buffer_destroy_internal(exec_ctx, &conn->client_read_buffer);
 | 
	
		
			
				|  |  |      grpc_slice_buffer_destroy_internal(exec_ctx,
 | 
	
	
		
			
				|  | @@ -121,6 +137,7 @@ static void proxy_connection_unref(grpc_exec_ctx* exec_ctx,
 | 
	
		
			
				|  |  |      grpc_slice_buffer_destroy_internal(exec_ctx, &conn->server_write_buffer);
 | 
	
		
			
				|  |  |      grpc_http_parser_destroy(&conn->http_parser);
 | 
	
		
			
				|  |  |      grpc_http_request_destroy(&conn->http_request);
 | 
	
		
			
				|  |  | +    gpr_unref(&conn->proxy->users);
 | 
	
		
			
				|  |  |      gpr_free(conn);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
	
		
			
				|  | @@ -139,7 +156,7 @@ static void proxy_connection_failed(grpc_exec_ctx* exec_ctx,
 | 
	
		
			
				|  |  |      grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint,
 | 
	
		
			
				|  |  |                             GRPC_ERROR_REF(error));
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  proxy_connection_unref(exec_ctx, conn);
 | 
	
		
			
				|  |  | +  proxy_connection_unref(exec_ctx, conn, "conn_failed");
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // Callback for writing proxy data to the client.
 | 
	
	
		
			
				|  | @@ -163,7 +180,7 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg,
 | 
	
		
			
				|  |  |                          &conn->on_client_write_done);
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      // No more writes.  Unref the connection.
 | 
	
		
			
				|  |  | -    proxy_connection_unref(exec_ctx, conn);
 | 
	
		
			
				|  |  | +    proxy_connection_unref(exec_ctx, conn, "write_done");
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -188,7 +205,7 @@ static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg,
 | 
	
		
			
				|  |  |                          &conn->on_server_write_done);
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      // No more writes.  Unref the connection.
 | 
	
		
			
				|  |  | -    proxy_connection_unref(exec_ctx, conn);
 | 
	
		
			
				|  |  | +    proxy_connection_unref(exec_ctx, conn, "server_write");
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -214,7 +231,7 @@ static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg,
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      grpc_slice_buffer_move_into(&conn->client_read_buffer,
 | 
	
		
			
				|  |  |                                  &conn->server_write_buffer);
 | 
	
		
			
				|  |  | -    gpr_ref(&conn->refcount);
 | 
	
		
			
				|  |  | +    proxy_connection_ref(conn, "client_read");
 | 
	
		
			
				|  |  |      grpc_endpoint_write(exec_ctx, conn->server_endpoint,
 | 
	
		
			
				|  |  |                          &conn->server_write_buffer,
 | 
	
		
			
				|  |  |                          &conn->on_server_write_done);
 | 
	
	
		
			
				|  | @@ -246,7 +263,7 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg,
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      grpc_slice_buffer_move_into(&conn->server_read_buffer,
 | 
	
		
			
				|  |  |                                  &conn->client_write_buffer);
 | 
	
		
			
				|  |  | -    gpr_ref(&conn->refcount);
 | 
	
		
			
				|  |  | +    proxy_connection_ref(conn, "server_read");
 | 
	
		
			
				|  |  |      grpc_endpoint_write(exec_ctx, conn->client_endpoint,
 | 
	
		
			
				|  |  |                          &conn->client_write_buffer,
 | 
	
		
			
				|  |  |                          &conn->on_client_write_done);
 | 
	
	
		
			
				|  | @@ -270,7 +287,9 @@ static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg,
 | 
	
		
			
				|  |  |    // Start reading from both client and server.  One of the read
 | 
	
		
			
				|  |  |    // requests inherits our ref to conn, but we need to take a new ref
 | 
	
		
			
				|  |  |    // for the other one.
 | 
	
		
			
				|  |  | -  gpr_ref(&conn->refcount);
 | 
	
		
			
				|  |  | +  proxy_connection_ref(conn, "client_read");
 | 
	
		
			
				|  |  | +  proxy_connection_ref(conn, "server_read");
 | 
	
		
			
				|  |  | +  proxy_connection_unref(exec_ctx, conn, "write_response");
 | 
	
		
			
				|  |  |    grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer,
 | 
	
		
			
				|  |  |                       &conn->on_client_read_done);
 | 
	
		
			
				|  |  |    grpc_endpoint_read(exec_ctx, conn->server_endpoint, &conn->server_read_buffer,
 | 
	
	
		
			
				|  | @@ -312,6 +331,8 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg,
 | 
	
		
			
				|  |  |  static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
 | 
	
		
			
				|  |  |                                   grpc_error* error) {
 | 
	
		
			
				|  |  |    proxy_connection* conn = arg;
 | 
	
		
			
				|  |  | +  gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn,
 | 
	
		
			
				|  |  | +          grpc_error_string(error));
 | 
	
		
			
				|  |  |    if (error != GRPC_ERROR_NONE) {
 | 
	
		
			
				|  |  |      proxy_connection_failed(exec_ctx, conn, true /* is_client */,
 | 
	
		
			
				|  |  |                              "HTTP proxy read request", error);
 | 
	
	
		
			
				|  | @@ -376,12 +397,15 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg,
 | 
	
		
			
				|  |  |    gpr_free(acceptor);
 | 
	
		
			
				|  |  |    grpc_end2end_http_proxy* proxy = arg;
 | 
	
		
			
				|  |  |    // Instantiate proxy_connection.
 | 
	
		
			
				|  |  | -  proxy_connection* conn = gpr_malloc(sizeof(*conn));
 | 
	
		
			
				|  |  | -  memset(conn, 0, sizeof(*conn));
 | 
	
		
			
				|  |  | +  proxy_connection* conn = gpr_zalloc(sizeof(*conn));
 | 
	
		
			
				|  |  | +  gpr_ref(&proxy->users);
 | 
	
		
			
				|  |  |    conn->client_endpoint = endpoint;
 | 
	
		
			
				|  |  | +  conn->proxy = proxy;
 | 
	
		
			
				|  |  |    gpr_ref_init(&conn->refcount, 1);
 | 
	
		
			
				|  |  |    conn->pollset_set = grpc_pollset_set_create();
 | 
	
		
			
				|  |  | +  gpr_log(GPR_DEBUG, "on_accept: %p", conn);
 | 
	
		
			
				|  |  |    grpc_pollset_set_add_pollset(exec_ctx, conn->pollset_set, proxy->pollset);
 | 
	
		
			
				|  |  | +  grpc_endpoint_add_to_pollset_set(exec_ctx, endpoint, conn->pollset_set);
 | 
	
		
			
				|  |  |    grpc_closure_init(&conn->on_read_request_done, on_read_request_done, conn,
 | 
	
		
			
				|  |  |                      grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  |    grpc_closure_init(&conn->on_server_connect_done, on_server_connect_done, conn,
 | 
	
	
		
			
				|  | @@ -416,6 +440,7 @@ static void thread_main(void* arg) {
 | 
	
		
			
				|  |  |    grpc_end2end_http_proxy* proxy = arg;
 | 
	
		
			
				|  |  |    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
 | 
	
		
			
				|  |  |    do {
 | 
	
		
			
				|  |  | +    gpr_ref(&proxy->users);
 | 
	
		
			
				|  |  |      const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
 | 
	
		
			
				|  |  |      const gpr_timespec deadline =
 | 
	
		
			
				|  |  |          gpr_time_add(now, gpr_time_from_seconds(1, GPR_TIMESPAN));
 | 
	
	
		
			
				|  | @@ -426,7 +451,7 @@ static void thread_main(void* arg) {
 | 
	
		
			
				|  |  |          grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, now, deadline));
 | 
	
		
			
				|  |  |      gpr_mu_unlock(proxy->mu);
 | 
	
		
			
				|  |  |      grpc_exec_ctx_flush(&exec_ctx);
 | 
	
		
			
				|  |  | -  } while (!gpr_atm_acq_load(&proxy->shutdown));
 | 
	
		
			
				|  |  | +  } while (!gpr_unref(&proxy->users));
 | 
	
		
			
				|  |  |    grpc_exec_ctx_finish(&exec_ctx);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -434,6 +459,7 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(void) {
 | 
	
		
			
				|  |  |    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
 | 
	
		
			
				|  |  |    grpc_end2end_http_proxy* proxy = gpr_malloc(sizeof(*proxy));
 | 
	
		
			
				|  |  |    memset(proxy, 0, sizeof(*proxy));
 | 
	
		
			
				|  |  | +  gpr_ref_init(&proxy->users, 1);
 | 
	
		
			
				|  |  |    // Construct proxy address.
 | 
	
		
			
				|  |  |    const int proxy_port = grpc_pick_unused_port_or_die();
 | 
	
		
			
				|  |  |    gpr_join_host_port(&proxy->proxy_name, "localhost", proxy_port);
 | 
	
	
		
			
				|  | @@ -474,17 +500,16 @@ static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* arg,
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) {
 | 
	
		
			
				|  |  | -  gpr_atm_rel_store(&proxy->shutdown, 1);  // Signal proxy thread to shutdown.
 | 
	
		
			
				|  |  | +  gpr_unref(&proxy->users);  // Signal proxy thread to shutdown.
 | 
	
		
			
				|  |  |    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
 | 
	
		
			
				|  |  |    gpr_thd_join(proxy->thd);
 | 
	
		
			
				|  |  |    grpc_tcp_server_shutdown_listeners(&exec_ctx, proxy->server);
 | 
	
		
			
				|  |  |    grpc_tcp_server_unref(&exec_ctx, proxy->server);
 | 
	
		
			
				|  |  |    gpr_free(proxy->proxy_name);
 | 
	
		
			
				|  |  |    grpc_channel_args_destroy(&exec_ctx, proxy->channel_args);
 | 
	
		
			
				|  |  | -  grpc_closure destroyed;
 | 
	
		
			
				|  |  | -  grpc_closure_init(&destroyed, destroy_pollset, proxy->pollset,
 | 
	
		
			
				|  |  | -                    grpc_schedule_on_exec_ctx);
 | 
	
		
			
				|  |  | -  grpc_pollset_shutdown(&exec_ctx, proxy->pollset, &destroyed);
 | 
	
		
			
				|  |  | +  grpc_pollset_shutdown(&exec_ctx, proxy->pollset,
 | 
	
		
			
				|  |  | +                        grpc_closure_create(destroy_pollset, proxy->pollset,
 | 
	
		
			
				|  |  | +                                            grpc_schedule_on_exec_ctx));
 | 
	
		
			
				|  |  |    gpr_free(proxy);
 | 
	
		
			
				|  |  |    grpc_exec_ctx_finish(&exec_ctx);
 | 
	
		
			
				|  |  |  }
 |