Эх сурвалжийг харах

Request/response path starting to work

Craig Tiller 10 жил өмнө
parent
commit
5945ee1a75

+ 0 - 1
src/core/channel/http_client_filter.c

@@ -179,7 +179,6 @@ static void init_channel_elem(grpc_channel_element *elem,
   /* The first and the last filters tend to be implemented differently to
   /* The first and the last filters tend to be implemented differently to
      handle the case that there's no 'next' filter to call on the up or down
      handle the case that there's no 'next' filter to call on the up or down
      path */
      path */
-  GPR_ASSERT(!is_first);
   GPR_ASSERT(!is_last);
   GPR_ASSERT(!is_last);
 
 
   /* initialize members */
   /* initialize members */

+ 23 - 9
src/core/client_config/subchannel.c

@@ -64,7 +64,7 @@ struct grpc_subchannel {
 
 
   /** non-transport related channel filters */
   /** non-transport related channel filters */
   const grpc_channel_filter **filters;
   const grpc_channel_filter **filters;
-  size_t filter_count;
+  size_t num_filters;
   /** channel arguments */
   /** channel arguments */
   grpc_channel_args *args;
   grpc_channel_args *args;
   /** address to connect to */
   /** address to connect to */
@@ -144,11 +144,10 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
   gpr_ref_init(&c->refs, 1);
   gpr_ref_init(&c->refs, 1);
   c->connector = connector;
   c->connector = connector;
   grpc_connector_ref(c->connector);
   grpc_connector_ref(c->connector);
-  c->filter_count = args->filter_count + 1;
-  c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->filter_count);
+  c->num_filters = args->filter_count;
+  c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
   memcpy(c->filters, args->filters,
   memcpy(c->filters, args->filters,
-         sizeof(grpc_channel_filter *) * args->filter_count);
-  c->filters[c->filter_count - 1] = &grpc_connected_channel_filter;
+         sizeof(grpc_channel_filter *) * c->num_filters);
   c->addr = gpr_malloc(args->addr_len);
   c->addr = gpr_malloc(args->addr_len);
   memcpy(c->addr, args->addr, args->addr_len);
   memcpy(c->addr, args->addr, args->addr_len);
   c->addr_len = args->addr_len;
   c->addr_len = args->addr_len;
@@ -249,13 +248,26 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
 }
 }
 
 
 static void publish_transport(grpc_subchannel *c) {
 static void publish_transport(grpc_subchannel *c) {
-	size_t channel_stack_size = grpc_channel_stack_size(c->filters, c->filter_count);
-	connection *con = gpr_malloc(sizeof(connection) + channel_stack_size);
-	grpc_channel_stack *stk = (grpc_channel_stack *)(con + 1);
+	size_t channel_stack_size;
+	connection *con;
+	grpc_channel_stack *stk;
+	size_t num_filters;
+	const grpc_channel_filter **filters;
 	waiting_for_connect *w4c;
 	waiting_for_connect *w4c;
+
+	num_filters = c->num_filters + c->connecting_result.num_filters + 1;
+	filters = gpr_malloc(sizeof(*filters) * num_filters);
+	memcpy(filters, c->filters, sizeof(*filters) * c->num_filters);
+	memcpy(filters + c->num_filters, c->connecting_result.filters, sizeof(*filters) * c->connecting_result.num_filters);
+	filters[num_filters - 1] = &grpc_connected_channel_filter;
+
+	channel_stack_size = grpc_channel_stack_size(filters, num_filters);
+	con = gpr_malloc(sizeof(connection) + channel_stack_size);
+	stk = (grpc_channel_stack *)(con + 1);
+
 	gpr_ref_init(&con->refs, 1);
 	gpr_ref_init(&con->refs, 1);
 	con->subchannel = c;
 	con->subchannel = c;
-	grpc_channel_stack_init(c->filters, c->filter_count, c->args, c->mdctx, stk);
+	grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk);
 	grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
 	grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
 	memset(&c->connecting_result, 0, sizeof(c->connecting_result));
 	memset(&c->connecting_result, 0, sizeof(c->connecting_result));
 
 
@@ -268,6 +280,8 @@ static void publish_transport(grpc_subchannel *c) {
 		abort(); /* not implemented */
 		abort(); /* not implemented */
 	}
 	}
 	gpr_mu_unlock(&c->mu);
 	gpr_mu_unlock(&c->mu);
+
+	gpr_free(filters);
 } 
 } 
 
 
 static void subchannel_connected(void *arg, int iomgr_success) {
 static void subchannel_connected(void *arg, int iomgr_success) {

+ 5 - 1
src/core/surface/channel_create.c

@@ -40,6 +40,7 @@
 
 
 #include "src/core/channel/channel_args.h"
 #include "src/core/channel/channel_args.h"
 #include "src/core/channel/client_channel.h"
 #include "src/core/channel/client_channel.h"
+#include "src/core/channel/http_client_filter.h"
 #include "src/core/client_config/resolver_registry.h"
 #include "src/core/client_config/resolver_registry.h"
 #include "src/core/iomgr/tcp_client.h"
 #include "src/core/iomgr/tcp_client.h"
 #include "src/core/surface/channel.h"
 #include "src/core/surface/channel.h"
@@ -72,8 +73,11 @@ static void connected(void *arg, grpc_endpoint *tcp) {
   if (tcp != NULL) {
   if (tcp != NULL) {
     c->result->transport =
     c->result->transport =
         grpc_create_chttp2_transport(c->args.channel_args, tcp, NULL, 0, c->args.metadata_context, 1);
         grpc_create_chttp2_transport(c->args.channel_args, tcp, NULL, 0, c->args.metadata_context, 1);
+    c->result->filters = gpr_malloc(sizeof(grpc_channel_filter*));
+    c->result->filters[0] = &grpc_http_client_filter;
+    c->result->num_filters = 1;
   } else {
   } else {
-    c->result->transport = NULL;
+    memset(c->result, 0, sizeof(*c->result));
   }
   }
   notify = c->notify;
   notify = c->notify;
   c->notify = NULL;
   c->notify = NULL;