Ver código fonte

Initial flow control improvements: outgoing path

Craig Tiller 9 anos atrás
pai
commit
1f6759991d

+ 7 - 37
src/core/ext/transport/chttp2/transport/chttp2_transport.c

@@ -480,8 +480,6 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
 
 
   if (server_data) {
   if (server_data) {
     s->id = (uint32_t)(uintptr_t)server_data;
     s->id = (uint32_t)(uintptr_t)server_data;
-    s->outgoing_window = t->settings[GRPC_PEER_SETTINGS]
-                                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
     s->incoming_window = s->max_recv_bytes =
     s->incoming_window = s->max_recv_bytes =
         t->settings[GRPC_SENT_SETTINGS]
         t->settings[GRPC_SENT_SETTINGS]
                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
@@ -513,6 +511,7 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp,
   }
   }
 
 
   grpc_chttp2_list_remove_stalled_by_transport(t, s);
   grpc_chttp2_list_remove_stalled_by_transport(t, s);
+  grpc_chttp2_list_remove_stalled_by_stream(t, s);
 
 
   for (int i = 0; i < STREAM_LIST_COUNT; i++) {
   for (int i = 0; i < STREAM_LIST_COUNT; i++) {
     if (s->included[i]) {
     if (s->included[i]) {
@@ -801,8 +800,6 @@ static void maybe_start_some_streams(grpc_exec_ctx *exec_ctx,
                              "no_more_stream_ids");
                              "no_more_stream_ids");
     }
     }
 
 
-    s->outgoing_window = t->settings[GRPC_PEER_SETTINGS]
-                                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
     s->incoming_window = stream_incoming_window =
     s->incoming_window = stream_incoming_window =
         t->settings[GRPC_SENT_SETTINGS]
         t->settings[GRPC_SENT_SETTINGS]
                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
@@ -1714,36 +1711,6 @@ static void end_all_the_calls(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
   GRPC_ERROR_UNREF(error);
   GRPC_ERROR_UNREF(error);
 }
 }
 
 
-/** update window from a settings change */
-typedef struct {
-  grpc_chttp2_transport *t;
-  grpc_exec_ctx *exec_ctx;
-} update_global_window_args;
-
-static void update_global_window(void *args, uint32_t id, void *stream) {
-  update_global_window_args *a = args;
-  grpc_chttp2_transport *t = a->t;
-  grpc_chttp2_stream *s = stream;
-  int was_zero;
-  int is_zero;
-  int64_t initial_window_update = t->initial_window_update;
-
-  if (initial_window_update > 0) {
-    was_zero = s->outgoing_window <= 0;
-    GRPC_CHTTP2_FLOW_CREDIT_STREAM("settings", t, s, outgoing_window,
-                                   initial_window_update);
-    is_zero = s->outgoing_window <= 0;
-
-    if (was_zero && !is_zero) {
-      grpc_chttp2_become_writable(a->exec_ctx, t, s, true,
-                                  "update_global_window");
-    }
-  } else {
-    GRPC_CHTTP2_FLOW_DEBIT_STREAM("settings", t, s, outgoing_window,
-                                  -initial_window_update);
-  }
-}
-
 /*******************************************************************************
 /*******************************************************************************
  * INPUT PROCESSING - PARSING
  * INPUT PROCESSING - PARSING
  */
  */
@@ -1827,9 +1794,12 @@ static void read_action_locked(grpc_exec_ctx *exec_ctx, void *tp,
 
 
     GPR_TIMER_BEGIN("post_parse_locked", 0);
     GPR_TIMER_BEGIN("post_parse_locked", 0);
     if (t->initial_window_update != 0) {
     if (t->initial_window_update != 0) {
-      update_global_window_args args = {t, exec_ctx};
-      grpc_chttp2_stream_map_for_each(&t->stream_map, update_global_window,
-                                      &args);
+      if (t->initial_window_update > 0) {
+        grpc_chttp2_stream *s;
+        while (grpc_chttp2_list_pop_stalled_by_stream(t, &s)) {
+          grpc_chttp2_list_add_writable_stream(t, s);
+        }
+      }
       t->initial_window_update = 0;
       t->initial_window_update = 0;
     }
     }
     /* handle higher level things */
     /* handle higher level things */

+ 2 - 4
src/core/ext/transport/chttp2/transport/frame_window_update.c

@@ -110,11 +110,9 @@ grpc_error *grpc_chttp2_window_update_parser_parse(
 
 
     if (t->incoming_stream_id != 0) {
     if (t->incoming_stream_id != 0) {
       if (s != NULL) {
       if (s != NULL) {
-        bool was_zero = s->outgoing_window <= 0;
-        GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", t, s, outgoing_window,
+        GRPC_CHTTP2_FLOW_CREDIT_STREAM("parse", t, s, outgoing_window_delta,
                                        received_update);
                                        received_update);
-        bool is_zero = s->outgoing_window <= 0;
-        if (was_zero && !is_zero) {
+        if (grpc_chttp2_list_remove_stalled_by_stream(t, s)) {
           grpc_chttp2_become_writable(exec_ctx, t, s, false,
           grpc_chttp2_become_writable(exec_ctx, t, s, false,
                                       "stream.read_flow_control");
                                       "stream.read_flow_control");
         }
         }

+ 23 - 13
src/core/ext/transport/chttp2/transport/internal.h

@@ -59,6 +59,7 @@ typedef enum {
   GRPC_CHTTP2_LIST_WRITABLE,
   GRPC_CHTTP2_LIST_WRITABLE,
   GRPC_CHTTP2_LIST_WRITING,
   GRPC_CHTTP2_LIST_WRITING,
   GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
   GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
+  GRPC_CHTTP2_LIST_STALLED_BY_STREAM,
   /** streams that are waiting to start because there are too many concurrent
   /** streams that are waiting to start because there are too many concurrent
       streams on the connection */
       streams on the connection */
   GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
   GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
@@ -360,8 +361,10 @@ struct grpc_chttp2_stream {
   /** HTTP2 stream id for this stream, or zero if one has not been assigned */
   /** HTTP2 stream id for this stream, or zero if one has not been assigned */
   uint32_t id;
   uint32_t id;
 
 
-  /** window available for us to send to peer */
-  int64_t outgoing_window;
+  /** window available for us to send to peer, over or under the initial window
+    * size of the transport... ie:
+    * outgoing_window = outgoing_window_delta + transport.initial_window_size */
+  int64_t outgoing_window_delta;
   /** The number of bytes the upper layers have offered to receive.
   /** The number of bytes the upper layers have offered to receive.
       As the upper layer offers more bytes, this value increases.
       As the upper layer offers more bytes, this value increases.
       As bytes are read, this value decreases. */
       As bytes are read, this value decreases. */
@@ -472,34 +475,41 @@ bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport *t,
                                           grpc_chttp2_stream *s);
                                           grpc_chttp2_stream *s);
 /** Get a writable stream
 /** Get a writable stream
     returns non-zero if there was a stream available */
     returns non-zero if there was a stream available */
-int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
-                                         grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
+                                          grpc_chttp2_stream **s);
 bool grpc_chttp2_list_remove_writable_stream(
 bool grpc_chttp2_list_remove_writable_stream(
     grpc_chttp2_transport *t, grpc_chttp2_stream *s) GRPC_MUST_USE_RESULT;
     grpc_chttp2_transport *t, grpc_chttp2_stream *s) GRPC_MUST_USE_RESULT;
 
 
 bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport *t,
 bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport *t,
                                          grpc_chttp2_stream *s);
                                          grpc_chttp2_stream *s);
-int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t);
-int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
-                                        grpc_chttp2_stream **s);
+bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t);
+bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
+                                         grpc_chttp2_stream **s);
 
 
 void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport *t,
 void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport *t,
                                          grpc_chttp2_stream *s);
                                          grpc_chttp2_stream *s);
-int grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport *t,
-                                        grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport *t,
+                                         grpc_chttp2_stream **s);
 
 
 void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t,
 void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t,
                                                   grpc_chttp2_stream *s);
                                                   grpc_chttp2_stream *s);
-int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
-                                                 grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
+                                                  grpc_chttp2_stream **s);
 
 
 void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
 void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
                                                grpc_chttp2_stream *s);
                                                grpc_chttp2_stream *s);
-int grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
-                                              grpc_chttp2_stream **s);
+bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
+                                               grpc_chttp2_stream **s);
 void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t,
 void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t,
                                                   grpc_chttp2_stream *s);
                                                   grpc_chttp2_stream *s);
 
 
+void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport *t,
+                                            grpc_chttp2_stream *s);
+bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t,
+                                            grpc_chttp2_stream **s);
+bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t,
+                                               grpc_chttp2_stream *s);
+
 grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t,
 grpc_chttp2_stream *grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport *t,
                                                       uint32_t id);
                                                       uint32_t id);
 grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx,
 grpc_chttp2_stream *grpc_chttp2_parsing_accept_stream(grpc_exec_ctx *exec_ctx,

+ 29 - 14
src/core/ext/transport/chttp2/transport/stream_lists.c

@@ -37,14 +37,14 @@
 
 
 /* core list management */
 /* core list management */
 
 
-static int stream_list_empty(grpc_chttp2_transport *t,
-                             grpc_chttp2_stream_list_id id) {
+static bool stream_list_empty(grpc_chttp2_transport *t,
+                              grpc_chttp2_stream_list_id id) {
   return t->lists[id].head == NULL;
   return t->lists[id].head == NULL;
 }
 }
 
 
-static int stream_list_pop(grpc_chttp2_transport *t,
-                           grpc_chttp2_stream **stream,
-                           grpc_chttp2_stream_list_id id) {
+static bool stream_list_pop(grpc_chttp2_transport *t,
+                            grpc_chttp2_stream **stream,
+                            grpc_chttp2_stream_list_id id) {
   grpc_chttp2_stream *s = t->lists[id].head;
   grpc_chttp2_stream *s = t->lists[id].head;
   if (s) {
   if (s) {
     grpc_chttp2_stream *new_head = s->links[id].next;
     grpc_chttp2_stream *new_head = s->links[id].next;
@@ -124,8 +124,8 @@ bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport *t,
   return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITABLE);
   return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITABLE);
 }
 }
 
 
-int grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
-                                         grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport *t,
+                                          grpc_chttp2_stream **s) {
   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITABLE);
   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITABLE);
 }
 }
 
 
@@ -139,12 +139,12 @@ bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport *t,
   return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITING);
   return stream_list_add(t, s, GRPC_CHTTP2_LIST_WRITING);
 }
 }
 
 
-int grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t) {
+bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport *t) {
   return !stream_list_empty(t, GRPC_CHTTP2_LIST_WRITING);
   return !stream_list_empty(t, GRPC_CHTTP2_LIST_WRITING);
 }
 }
 
 
-int grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
-                                        grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport *t,
+                                         grpc_chttp2_stream **s) {
   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITING);
   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WRITING);
 }
 }
 
 
@@ -153,8 +153,8 @@ void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport *t,
   stream_list_add(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
   stream_list_add(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
 }
 }
 
 
-int grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
-                                                 grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport *t,
+                                                  grpc_chttp2_stream **s) {
   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY);
 }
 }
 
 
@@ -163,8 +163,8 @@ void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport *t,
   stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
   stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
 }
 }
 
 
-int grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
-                                              grpc_chttp2_stream **s) {
+bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport *t,
+                                               grpc_chttp2_stream **s) {
   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
   return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
 }
 }
 
 
@@ -172,3 +172,18 @@ void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport *t,
                                                   grpc_chttp2_stream *s) {
                                                   grpc_chttp2_stream *s) {
   stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
   stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
 }
 }
+
+void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport *t,
+                                            grpc_chttp2_stream *s) {
+  stream_list_add(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+}
+
+bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport *t,
+                                            grpc_chttp2_stream **s) {
+  return stream_list_pop(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+}
+
+bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport *t,
+                                               grpc_chttp2_stream *s) {
+  return stream_list_maybe_remove(t, s, GRPC_CHTTP2_LIST_STALLED_BY_STREAM);
+}

+ 13 - 5
src/core/ext/transport/chttp2/transport/writing.c

@@ -138,10 +138,15 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
     if (sent_initial_metadata) {
     if (sent_initial_metadata) {
       /* send any body bytes, if allowed by flow control */
       /* send any body bytes, if allowed by flow control */
       if (s->flow_controlled_buffer.length > 0) {
       if (s->flow_controlled_buffer.length > 0) {
-        uint32_t max_outgoing =
-            (uint32_t)GPR_MIN(t->settings[GRPC_ACKED_SETTINGS]
-                                         [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
-                              GPR_MIN(s->outgoing_window, t->outgoing_window));
+        uint32_t stream_outgoing_window = (uint32_t)GPR_MAX(
+            0,
+            s->outgoing_window_delta +
+                (int64_t)t->settings[GRPC_ACKED_SETTINGS]
+                                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
+        uint32_t max_outgoing = (uint32_t)GPR_MIN(
+            t->settings[GRPC_ACKED_SETTINGS]
+                       [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
+            GPR_MIN(stream_outgoing_window, t->outgoing_window));
         if (max_outgoing > 0) {
         if (max_outgoing > 0) {
           uint32_t send_bytes =
           uint32_t send_bytes =
               (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length);
               (uint32_t)GPR_MIN(max_outgoing, s->flow_controlled_buffer.length);
@@ -154,7 +159,7 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
           grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes,
           grpc_chttp2_encode_data(s->id, &s->flow_controlled_buffer, send_bytes,
                                   is_last_frame, &s->stats.outgoing,
                                   is_last_frame, &s->stats.outgoing,
                                   &t->outbuf);
                                   &t->outbuf);
-          GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window,
+          GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", t, s, outgoing_window_delta,
                                         send_bytes);
                                         send_bytes);
           GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
           GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", t, outgoing_window,
                                            send_bytes);
                                            send_bytes);
@@ -176,6 +181,9 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx,
         } else if (t->outgoing_window == 0) {
         } else if (t->outgoing_window == 0) {
           grpc_chttp2_list_add_stalled_by_transport(t, s);
           grpc_chttp2_list_add_stalled_by_transport(t, s);
           now_writing = true;
           now_writing = true;
+        } else if (stream_outgoing_window == 0) {
+          grpc_chttp2_list_add_stalled_by_stream(t, s);
+          now_writing = true;
         }
         }
       }
       }
       if (s->send_trailing_metadata != NULL &&
       if (s->send_trailing_metadata != NULL &&