| 
					
				 | 
			
			
				@@ -59,6 +59,7 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #include "src/core/lib/iomgr/sockaddr_utils.h" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #include "src/core/lib/iomgr/tcp_client.h" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #include "src/core/lib/iomgr/tcp_server.h" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+#include "src/core/lib/iomgr/timer.h" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #include "src/core/lib/slice/slice_internal.h" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 #include "test/core/util/port.h" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -69,7 +70,7 @@ struct grpc_end2end_http_proxy { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_channel_args* channel_args; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   gpr_mu* mu; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_pollset* pollset; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-  gpr_atm shutdown; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  gpr_refcount users; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 }; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 // 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -77,6 +78,8 @@ struct grpc_end2end_http_proxy { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 // 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 typedef struct proxy_connection { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  grpc_end2end_http_proxy* proxy; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_endpoint* client_endpoint; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_endpoint* server_endpoint; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -103,13 +106,20 @@ typedef struct proxy_connection { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_http_request http_request; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } proxy_connection; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+static void proxy_connection_ref(proxy_connection* conn, const char* reason) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  gpr_ref(&conn->refcount); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+} 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 // Helper function to destroy the proxy connection. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 static void proxy_connection_unref(grpc_exec_ctx* exec_ctx, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                                   proxy_connection* conn) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                   proxy_connection* conn, const char* reason) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   if (gpr_unref(&conn->refcount)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    gpr_log(GPR_DEBUG, "endpoints: %p %p", conn->client_endpoint, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            conn->server_endpoint); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     grpc_endpoint_destroy(exec_ctx, conn->client_endpoint); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    if (conn->server_endpoint != NULL) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    if (conn->server_endpoint != NULL) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       grpc_endpoint_destroy(exec_ctx, conn->server_endpoint); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     grpc_pollset_set_destroy(exec_ctx, conn->pollset_set); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     grpc_slice_buffer_destroy_internal(exec_ctx, &conn->client_read_buffer); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     grpc_slice_buffer_destroy_internal(exec_ctx, 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -121,6 +131,7 @@ static void proxy_connection_unref(grpc_exec_ctx* exec_ctx, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     grpc_slice_buffer_destroy_internal(exec_ctx, &conn->server_write_buffer); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     grpc_http_parser_destroy(&conn->http_parser); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     grpc_http_request_destroy(&conn->http_request); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    gpr_unref(&conn->proxy->users); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     gpr_free(conn); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -139,7 +150,7 @@ static void proxy_connection_failed(grpc_exec_ctx* exec_ctx, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                            GRPC_ERROR_REF(error)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-  proxy_connection_unref(exec_ctx, conn); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  proxy_connection_unref(exec_ctx, conn, "conn_failed"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 // Callback for writing proxy data to the client. 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -163,7 +174,7 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                         &conn->on_client_write_done); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     // No more writes.  Unref the connection. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    proxy_connection_unref(exec_ctx, conn); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    proxy_connection_unref(exec_ctx, conn, "write_done"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -188,7 +199,7 @@ static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                         &conn->on_server_write_done); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     // No more writes.  Unref the connection. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    proxy_connection_unref(exec_ctx, conn); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    proxy_connection_unref(exec_ctx, conn, "server_write"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -214,7 +225,7 @@ static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     grpc_slice_buffer_move_into(&conn->client_read_buffer, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                 &conn->server_write_buffer); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    gpr_ref(&conn->refcount); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    proxy_connection_ref(conn, "client_read"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     grpc_endpoint_write(exec_ctx, conn->server_endpoint, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                         &conn->server_write_buffer, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                         &conn->on_server_write_done); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -246,7 +257,7 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     grpc_slice_buffer_move_into(&conn->server_read_buffer, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                 &conn->client_write_buffer); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    gpr_ref(&conn->refcount); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    proxy_connection_ref(conn, "server_read"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     grpc_endpoint_write(exec_ctx, conn->client_endpoint, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                         &conn->client_write_buffer, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                         &conn->on_client_write_done); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -270,7 +281,9 @@ static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   // Start reading from both client and server.  One of the read 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   // requests inherits our ref to conn, but we need to take a new ref 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   // for the other one. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-  gpr_ref(&conn->refcount); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  proxy_connection_ref(conn, "client_read"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  proxy_connection_ref(conn, "server_read"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  proxy_connection_unref(exec_ctx, conn, "write_response"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_endpoint_read(exec_ctx, conn->client_endpoint, &conn->client_read_buffer, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                      &conn->on_client_read_done); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_endpoint_read(exec_ctx, conn->server_endpoint, &conn->server_read_buffer, 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -312,6 +325,8 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                                  grpc_error* error) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   proxy_connection* conn = arg; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+          grpc_error_string(error)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   if (error != GRPC_ERROR_NONE) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     proxy_connection_failed(exec_ctx, conn, true /* is_client */, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                             "HTTP proxy read request", error); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -376,12 +391,14 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   gpr_free(acceptor); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_end2end_http_proxy* proxy = arg; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   // Instantiate proxy_connection. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-  proxy_connection* conn = gpr_malloc(sizeof(*conn)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-  memset(conn, 0, sizeof(*conn)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  proxy_connection* conn = gpr_zalloc(sizeof(*conn)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  gpr_ref(&proxy->users); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   conn->client_endpoint = endpoint; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  conn->proxy = proxy; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   gpr_ref_init(&conn->refcount, 1); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   conn->pollset_set = grpc_pollset_set_create(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_pollset_set_add_pollset(exec_ctx, conn->pollset_set, proxy->pollset); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  grpc_endpoint_add_to_pollset_set(exec_ctx, endpoint, conn->pollset_set); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_closure_init(&conn->on_read_request_done, on_read_request_done, conn, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     grpc_schedule_on_exec_ctx); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_closure_init(&conn->on_server_connect_done, on_server_connect_done, conn, 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -416,6 +433,7 @@ static void thread_main(void* arg) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_end2end_http_proxy* proxy = arg; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   do { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    gpr_ref(&proxy->users); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     const gpr_timespec deadline = 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         gpr_time_add(now, gpr_time_from_seconds(1, GPR_TIMESPAN)); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -426,7 +444,7 @@ static void thread_main(void* arg) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         grpc_pollset_work(&exec_ctx, proxy->pollset, &worker, now, deadline)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     gpr_mu_unlock(proxy->mu); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     grpc_exec_ctx_flush(&exec_ctx); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-  } while (!gpr_atm_acq_load(&proxy->shutdown)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  } while (!gpr_unref(&proxy->users)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_exec_ctx_finish(&exec_ctx); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -434,6 +452,7 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(void) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_end2end_http_proxy* proxy = gpr_malloc(sizeof(*proxy)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   memset(proxy, 0, sizeof(*proxy)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  gpr_ref_init(&proxy->users, 1); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   // Construct proxy address. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   const int proxy_port = grpc_pick_unused_port_or_die(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   gpr_join_host_port(&proxy->proxy_name, "localhost", proxy_port); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -474,17 +493,16 @@ static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* arg, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-  gpr_atm_rel_store(&proxy->shutdown, 1);  // Signal proxy thread to shutdown. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  gpr_unref(&proxy->users);  // Signal proxy thread to shutdown. 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   gpr_thd_join(proxy->thd); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_tcp_server_shutdown_listeners(&exec_ctx, proxy->server); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_tcp_server_unref(&exec_ctx, proxy->server); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   gpr_free(proxy->proxy_name); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_channel_args_destroy(&exec_ctx, proxy->channel_args); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-  grpc_closure destroyed; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-  grpc_closure_init(&destroyed, destroy_pollset, proxy->pollset, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    grpc_schedule_on_exec_ctx); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-  grpc_pollset_shutdown(&exec_ctx, proxy->pollset, &destroyed); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  grpc_pollset_shutdown(&exec_ctx, proxy->pollset, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        grpc_closure_create(destroy_pollset, proxy->pollset, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                            grpc_schedule_on_exec_ctx)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   gpr_free(proxy); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   grpc_exec_ctx_finish(&exec_ctx); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 |