Przeglądaj źródła

Merge pull request #22575 from yashykt/newfilter

Move decompression into gRPC Core
Yash Tibrewal 5 lat temu
rodzic
commit
b7941bedf3

+ 2 - 0
BUILD

@@ -1193,11 +1193,13 @@ grpc_cc_library(
         "src/core/ext/filters/http/client/http_client_filter.cc",
         "src/core/ext/filters/http/http_filters_plugin.cc",
         "src/core/ext/filters/http/message_compress/message_compress_filter.cc",
+        "src/core/ext/filters/http/message_compress/message_decompress_filter.cc",
         "src/core/ext/filters/http/server/http_server_filter.cc",
     ],
     hdrs = [
         "src/core/ext/filters/http/client/http_client_filter.h",
         "src/core/ext/filters/http/message_compress/message_compress_filter.h",
+        "src/core/ext/filters/http/message_compress/message_decompress_filter.h",
         "src/core/ext/filters/http/server/http_server_filter.h",
     ],
     language = "c++",

+ 2 - 0
BUILD.gn

@@ -318,6 +318,8 @@ config("grpc_config") {
         "src/core/ext/filters/http/http_filters_plugin.cc",
         "src/core/ext/filters/http/message_compress/message_compress_filter.cc",
         "src/core/ext/filters/http/message_compress/message_compress_filter.h",
+        "src/core/ext/filters/http/message_compress/message_decompress_filter.cc",
+        "src/core/ext/filters/http/message_compress/message_decompress_filter.h",
         "src/core/ext/filters/http/server/http_server_filter.cc",
         "src/core/ext/filters/http/server/http_server_filter.h",
         "src/core/ext/filters/max_age/max_age_filter.cc",

+ 2 - 0
CMakeLists.txt

@@ -1369,6 +1369,7 @@ add_library(grpc
   src/core/ext/filters/http/client_authority_filter.cc
   src/core/ext/filters/http/http_filters_plugin.cc
   src/core/ext/filters/http/message_compress/message_compress_filter.cc
+  src/core/ext/filters/http/message_compress/message_decompress_filter.cc
   src/core/ext/filters/http/server/http_server_filter.cc
   src/core/ext/filters/max_age/max_age_filter.cc
   src/core/ext/filters/message_size/message_size_filter.cc
@@ -2028,6 +2029,7 @@ add_library(grpc_unsecure
   src/core/ext/filters/http/client_authority_filter.cc
   src/core/ext/filters/http/http_filters_plugin.cc
   src/core/ext/filters/http/message_compress/message_compress_filter.cc
+  src/core/ext/filters/http/message_compress/message_decompress_filter.cc
   src/core/ext/filters/http/server/http_server_filter.cc
   src/core/ext/filters/max_age/max_age_filter.cc
   src/core/ext/filters/message_size/message_size_filter.cc

+ 2 - 0
Makefile

@@ -3694,6 +3694,7 @@ LIBGRPC_SRC = \
     src/core/ext/filters/http/client_authority_filter.cc \
     src/core/ext/filters/http/http_filters_plugin.cc \
     src/core/ext/filters/http/message_compress/message_compress_filter.cc \
+    src/core/ext/filters/http/message_compress/message_decompress_filter.cc \
     src/core/ext/filters/http/server/http_server_filter.cc \
     src/core/ext/filters/max_age/max_age_filter.cc \
     src/core/ext/filters/message_size/message_size_filter.cc \
@@ -4327,6 +4328,7 @@ LIBGRPC_UNSECURE_SRC = \
     src/core/ext/filters/http/client_authority_filter.cc \
     src/core/ext/filters/http/http_filters_plugin.cc \
     src/core/ext/filters/http/message_compress/message_compress_filter.cc \
+    src/core/ext/filters/http/message_compress/message_decompress_filter.cc \
     src/core/ext/filters/http/server/http_server_filter.cc \
     src/core/ext/filters/max_age/max_age_filter.cc \
     src/core/ext/filters/message_size/message_size_filter.cc \

+ 4 - 0
build_autogenerated.yaml

@@ -423,6 +423,7 @@ libs:
   - src/core/ext/filters/http/client/http_client_filter.h
   - src/core/ext/filters/http/client_authority_filter.h
   - src/core/ext/filters/http/message_compress/message_compress_filter.h
+  - src/core/ext/filters/http/message_compress/message_decompress_filter.h
   - src/core/ext/filters/http/server/http_server_filter.h
   - src/core/ext/filters/max_age/max_age_filter.h
   - src/core/ext/filters/message_size/message_size_filter.h
@@ -795,6 +796,7 @@ libs:
   - src/core/ext/filters/http/client_authority_filter.cc
   - src/core/ext/filters/http/http_filters_plugin.cc
   - src/core/ext/filters/http/message_compress/message_compress_filter.cc
+  - src/core/ext/filters/http/message_compress/message_decompress_filter.cc
   - src/core/ext/filters/http/server/http_server_filter.cc
   - src/core/ext/filters/max_age/max_age_filter.cc
   - src/core/ext/filters/message_size/message_size_filter.cc
@@ -1325,6 +1327,7 @@ libs:
   - src/core/ext/filters/http/client/http_client_filter.h
   - src/core/ext/filters/http/client_authority_filter.h
   - src/core/ext/filters/http/message_compress/message_compress_filter.h
+  - src/core/ext/filters/http/message_compress/message_decompress_filter.h
   - src/core/ext/filters/http/server/http_server_filter.h
   - src/core/ext/filters/max_age/max_age_filter.h
   - src/core/ext/filters/message_size/message_size_filter.h
@@ -1632,6 +1635,7 @@ libs:
   - src/core/ext/filters/http/client_authority_filter.cc
   - src/core/ext/filters/http/http_filters_plugin.cc
   - src/core/ext/filters/http/message_compress/message_compress_filter.cc
+  - src/core/ext/filters/http/message_compress/message_decompress_filter.cc
   - src/core/ext/filters/http/server/http_server_filter.cc
   - src/core/ext/filters/max_age/max_age_filter.cc
   - src/core/ext/filters/message_size/message_size_filter.cc

+ 1 - 0
config.m4

@@ -104,6 +104,7 @@ if test "$PHP_GRPC" != "no"; then
     src/core/ext/filters/http/client_authority_filter.cc \
     src/core/ext/filters/http/http_filters_plugin.cc \
     src/core/ext/filters/http/message_compress/message_compress_filter.cc \
+    src/core/ext/filters/http/message_compress/message_decompress_filter.cc \
     src/core/ext/filters/http/server/http_server_filter.cc \
     src/core/ext/filters/max_age/max_age_filter.cc \
     src/core/ext/filters/message_size/message_size_filter.cc \

+ 1 - 0
config.w32

@@ -73,6 +73,7 @@ if (PHP_GRPC != "no") {
     "src\\core\\ext\\filters\\http\\client_authority_filter.cc " +
     "src\\core\\ext\\filters\\http\\http_filters_plugin.cc " +
     "src\\core\\ext\\filters\\http\\message_compress\\message_compress_filter.cc " +
+    "src\\core\\ext\\filters\\http\\message_compress\\message_decompress_filter.cc " +
     "src\\core\\ext\\filters\\http\\server\\http_server_filter.cc " +
     "src\\core\\ext\\filters\\max_age\\max_age_filter.cc " +
     "src\\core\\ext\\filters\\message_size\\message_size_filter.cc " +

+ 2 - 0
gRPC-C++.podspec

@@ -274,6 +274,7 @@ Pod::Spec.new do |s|
                       'src/core/ext/filters/http/client/http_client_filter.h',
                       'src/core/ext/filters/http/client_authority_filter.h',
                       'src/core/ext/filters/http/message_compress/message_compress_filter.h',
+                      'src/core/ext/filters/http/message_compress/message_decompress_filter.h',
                       'src/core/ext/filters/http/server/http_server_filter.h',
                       'src/core/ext/filters/max_age/max_age_filter.h',
                       'src/core/ext/filters/message_size/message_size_filter.h',
@@ -725,6 +726,7 @@ Pod::Spec.new do |s|
                               'src/core/ext/filters/http/client/http_client_filter.h',
                               'src/core/ext/filters/http/client_authority_filter.h',
                               'src/core/ext/filters/http/message_compress/message_compress_filter.h',
+                              'src/core/ext/filters/http/message_compress/message_decompress_filter.h',
                               'src/core/ext/filters/http/server/http_server_filter.h',
                               'src/core/ext/filters/max_age/max_age_filter.h',
                               'src/core/ext/filters/message_size/message_size_filter.h',

+ 3 - 0
gRPC-Core.podspec

@@ -301,6 +301,8 @@ Pod::Spec.new do |s|
                       'src/core/ext/filters/http/http_filters_plugin.cc',
                       'src/core/ext/filters/http/message_compress/message_compress_filter.cc',
                       'src/core/ext/filters/http/message_compress/message_compress_filter.h',
+                      'src/core/ext/filters/http/message_compress/message_decompress_filter.cc',
+                      'src/core/ext/filters/http/message_compress/message_decompress_filter.h',
                       'src/core/ext/filters/http/server/http_server_filter.cc',
                       'src/core/ext/filters/http/server/http_server_filter.h',
                       'src/core/ext/filters/max_age/max_age_filter.cc',
@@ -1078,6 +1080,7 @@ Pod::Spec.new do |s|
                               'src/core/ext/filters/http/client/http_client_filter.h',
                               'src/core/ext/filters/http/client_authority_filter.h',
                               'src/core/ext/filters/http/message_compress/message_compress_filter.h',
+                              'src/core/ext/filters/http/message_compress/message_decompress_filter.h',
                               'src/core/ext/filters/http/server/http_server_filter.h',
                               'src/core/ext/filters/max_age/max_age_filter.h',
                               'src/core/ext/filters/message_size/message_size_filter.h',

+ 2 - 0
grpc.gemspec

@@ -223,6 +223,8 @@ Gem::Specification.new do |s|
   s.files += %w( src/core/ext/filters/http/http_filters_plugin.cc )
   s.files += %w( src/core/ext/filters/http/message_compress/message_compress_filter.cc )
   s.files += %w( src/core/ext/filters/http/message_compress/message_compress_filter.h )
+  s.files += %w( src/core/ext/filters/http/message_compress/message_decompress_filter.cc )
+  s.files += %w( src/core/ext/filters/http/message_compress/message_decompress_filter.h )
   s.files += %w( src/core/ext/filters/http/server/http_server_filter.cc )
   s.files += %w( src/core/ext/filters/http/server/http_server_filter.h )
   s.files += %w( src/core/ext/filters/max_age/max_age_filter.cc )

+ 2 - 0
grpc.gyp

@@ -497,6 +497,7 @@
         'src/core/ext/filters/http/client_authority_filter.cc',
         'src/core/ext/filters/http/http_filters_plugin.cc',
         'src/core/ext/filters/http/message_compress/message_compress_filter.cc',
+        'src/core/ext/filters/http/message_compress/message_decompress_filter.cc',
         'src/core/ext/filters/http/server/http_server_filter.cc',
         'src/core/ext/filters/max_age/max_age_filter.cc',
         'src/core/ext/filters/message_size/message_size_filter.cc',
@@ -992,6 +993,7 @@
         'src/core/ext/filters/http/client_authority_filter.cc',
         'src/core/ext/filters/http/http_filters_plugin.cc',
         'src/core/ext/filters/http/message_compress/message_compress_filter.cc',
+        'src/core/ext/filters/http/message_compress/message_decompress_filter.cc',
         'src/core/ext/filters/http/server/http_server_filter.cc',
         'src/core/ext/filters/max_age/max_age_filter.cc',
         'src/core/ext/filters/message_size/message_size_filter.cc',

+ 5 - 0
include/grpc/impl/codegen/grpc_types.h

@@ -174,6 +174,11 @@ typedef struct {
 /** Enable/disable support for per-message compression. Defaults to 1, unless
     GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0. */
 #define GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION "grpc.per_message_compression"
+/** Experimental Arg. Enable/disable support for per-message decompression.
+   Defaults to 1. If disabled, decompression will not be performed and the
+   application will see the compressed message in the byte buffer. */
+#define GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION \
+  "grpc.per_message_decompression"
 /** Enable/disable support for deadline checking. Defaults to 1, unless
     GRPC_ARG_MINIMAL_STACK is enabled, in which case it defaults to 0 */
 #define GRPC_ARG_ENABLE_DEADLINE_CHECKS "grpc.enable_deadline_checking"

+ 2 - 0
package.xml

@@ -203,6 +203,8 @@
     <file baseinstalldir="/" name="src/core/ext/filters/http/http_filters_plugin.cc" role="src" />
     <file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/message_compress_filter.cc" role="src" />
     <file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/message_compress_filter.h" role="src" />
+    <file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/message_decompress_filter.cc" role="src" />
+    <file baseinstalldir="/" name="src/core/ext/filters/http/message_compress/message_decompress_filter.h" role="src" />
     <file baseinstalldir="/" name="src/core/ext/filters/http/server/http_server_filter.cc" role="src" />
     <file baseinstalldir="/" name="src/core/ext/filters/http/server/http_server_filter.h" role="src" />
     <file baseinstalldir="/" name="src/core/ext/filters/max_age/max_age_filter.cc" role="src" />

+ 25 - 10
src/core/ext/filters/http/http_filters_plugin.cc

@@ -22,6 +22,7 @@
 
 #include "src/core/ext/filters/http/client/http_client_filter.h"
 #include "src/core/ext/filters/http/message_compress/message_compress_filter.h"
+#include "src/core/ext/filters/http/message_compress/message_decompress_filter.h"
 #include "src/core/ext/filters/http/server/http_server_filter.h"
 #include "src/core/lib/channel/channel_stack_builder.h"
 #include "src/core/lib/surface/call.h"
@@ -36,12 +37,16 @@ typedef struct {
 static optional_filter compress_filter = {
     &grpc_message_compress_filter, GRPC_ARG_ENABLE_PER_MESSAGE_COMPRESSION};
 
+static optional_filter decompress_filter = {
+    &grpc_message_decompress_filter, GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION};
+
 static bool is_building_http_like_transport(
     grpc_channel_stack_builder* builder) {
   grpc_transport* t = grpc_channel_stack_builder_get_transport(builder);
   return t != nullptr && strstr(t->vtable->name, "http");
 }
 
+template <bool enable_in_minimal_stack>
 static bool maybe_add_optional_filter(grpc_channel_stack_builder* builder,
                                       void* arg) {
   if (!is_building_http_like_transport(builder)) return true;
@@ -50,7 +55,8 @@ static bool maybe_add_optional_filter(grpc_channel_stack_builder* builder,
       grpc_channel_stack_builder_get_channel_arguments(builder);
   bool enable = grpc_channel_arg_get_bool(
       grpc_channel_args_find(channel_args, filtarg->control_channel_arg),
-      !grpc_channel_args_want_minimal_stack(channel_args));
+      enable_in_minimal_stack ||
+          !grpc_channel_args_want_minimal_stack(channel_args));
   return enable ? grpc_channel_stack_builder_prepend_filter(
                       builder, filtarg->filter, nullptr, nullptr)
                 : true;
@@ -66,15 +72,24 @@ static bool maybe_add_required_filter(grpc_channel_stack_builder* builder,
 }
 
 void grpc_http_filters_init(void) {
-  grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
-                                   GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
-                                   maybe_add_optional_filter, &compress_filter);
-  grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
-                                   GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
-                                   maybe_add_optional_filter, &compress_filter);
-  grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
-                                   GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
-                                   maybe_add_optional_filter, &compress_filter);
+  grpc_channel_init_register_stage(
+      GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+      maybe_add_optional_filter<false>, &compress_filter);
+  grpc_channel_init_register_stage(
+      GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+      maybe_add_optional_filter<false>, &compress_filter);
+  grpc_channel_init_register_stage(
+      GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+      maybe_add_optional_filter<false>, &compress_filter);
+  grpc_channel_init_register_stage(
+      GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+      maybe_add_optional_filter<true>, &decompress_filter);
+  grpc_channel_init_register_stage(
+      GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+      maybe_add_optional_filter<true>, &decompress_filter);
+  grpc_channel_init_register_stage(
+      GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
+      maybe_add_optional_filter<true>, &decompress_filter);
   grpc_channel_init_register_stage(
       GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
       maybe_add_required_filter, (void*)&grpc_http_client_filter);

+ 358 - 0
src/core/ext/filters/http/message_compress/message_decompress_filter.cc

@@ -0,0 +1,358 @@
+//
+//
+// Copyright 2020 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+//
+
+#include <grpc/support/port_platform.h>
+
+#include <assert.h>
+#include <string.h>
+
+#include <grpc/compression.h>
+#include <grpc/slice_buffer.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+
+#include "src/core/ext/filters/http/message_compress/message_decompress_filter.h"
+#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/compression/algorithm_metadata.h"
+#include "src/core/lib/compression/compression_args.h"
+#include "src/core/lib/compression/compression_internal.h"
+#include "src/core/lib/compression/message_compress.h"
+#include "src/core/lib/gpr/string.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
+
+namespace {
+
+class ChannelData {};
+
+class CallData {
+ public:
+  explicit CallData(const grpc_call_element_args& args)
+      : call_combiner_(args.call_combiner) {
+    // Initialize state for recv_initial_metadata_ready callback
+    GRPC_CLOSURE_INIT(&on_recv_initial_metadata_ready_,
+                      OnRecvInitialMetadataReady, this,
+                      grpc_schedule_on_exec_ctx);
+    // Initialize state for recv_message_ready callback
+    grpc_slice_buffer_init(&recv_slices_);
+    GRPC_CLOSURE_INIT(&on_recv_message_next_done_, OnRecvMessageNextDone, this,
+                      grpc_schedule_on_exec_ctx);
+    GRPC_CLOSURE_INIT(&on_recv_message_ready_, OnRecvMessageReady, this,
+                      grpc_schedule_on_exec_ctx);
+    // Initialize state for recv_trailing_metadata_ready callback
+    GRPC_CLOSURE_INIT(&on_recv_trailing_metadata_ready_,
+                      OnRecvTrailingMetadataReady, this,
+                      grpc_schedule_on_exec_ctx);
+  }
+
+  ~CallData() { grpc_slice_buffer_destroy_internal(&recv_slices_); }
+
+  void DecompressStartTransportStreamOpBatch(
+      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
+
+ private:
+  static void OnRecvInitialMetadataReady(void* arg, grpc_error* error);
+
+  // Methods for processing a receive message event
+  void MaybeResumeOnRecvMessageReady();
+  static void OnRecvMessageReady(void* arg, grpc_error* error);
+  static void OnRecvMessageNextDone(void* arg, grpc_error* error);
+  grpc_error* PullSliceFromRecvMessage();
+  void ContinueReadingRecvMessage();
+  void FinishRecvMessage();
+  void ContinueRecvMessageReadyCallback(grpc_error* error);
+
+  // Methods for processing a recv_trailing_metadata event
+  void MaybeResumeOnRecvTrailingMetadataReady();
+  static void OnRecvTrailingMetadataReady(void* arg, grpc_error* error);
+
+  grpc_core::CallCombiner* call_combiner_;
+  // Overall error for the call
+  grpc_error* error_ = GRPC_ERROR_NONE;
+  // Fields for handling recv_initial_metadata_ready callback
+  grpc_closure on_recv_initial_metadata_ready_;
+  grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
+  grpc_metadata_batch* recv_initial_metadata_ = nullptr;
+  // Fields for handling recv_message_ready callback
+  bool seen_recv_message_ready_ = false;
+  grpc_message_compression_algorithm algorithm_ = GRPC_MESSAGE_COMPRESS_NONE;
+  grpc_closure on_recv_message_ready_;
+  grpc_closure* original_recv_message_ready_ = nullptr;
+  grpc_closure on_recv_message_next_done_;
+  grpc_core::OrphanablePtr<grpc_core::ByteStream>* recv_message_ = nullptr;
+  // recv_slices_ holds the slices read from the original recv_message stream.
+  // It is initialized during construction and reset when a new stream is
+  // created using it.
+  grpc_slice_buffer recv_slices_;
+  std::aligned_storage<sizeof(grpc_core::SliceBufferByteStream),
+                       alignof(grpc_core::SliceBufferByteStream)>::type
+      recv_replacement_stream_;
+  // Fields for handling recv_trailing_metadata_ready callback
+  bool seen_recv_trailing_metadata_ready_ = false;
+  grpc_closure on_recv_trailing_metadata_ready_;
+  grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
+  grpc_error* on_recv_trailing_metadata_ready_error_ = GRPC_ERROR_NONE;
+};
+
+grpc_message_compression_algorithm DecodeMessageCompressionAlgorithm(
+    grpc_mdelem md) {
+  grpc_message_compression_algorithm algorithm =
+      grpc_message_compression_algorithm_from_slice(GRPC_MDVALUE(md));
+  if (algorithm == GRPC_MESSAGE_COMPRESS_ALGORITHMS_COUNT) {
+    char* md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+    gpr_log(GPR_ERROR,
+            "Invalid incoming message compression algorithm: '%s'. "
+            "Interpreting incoming data as uncompressed.",
+            md_c_str);
+    gpr_free(md_c_str);
+    return GRPC_MESSAGE_COMPRESS_NONE;
+  }
+  return algorithm;
+}
+
+void CallData::OnRecvInitialMetadataReady(void* arg, grpc_error* error) {
+  CallData* calld = static_cast<CallData*>(arg);
+  if (error == GRPC_ERROR_NONE) {
+    grpc_linked_mdelem* grpc_encoding =
+        calld->recv_initial_metadata_->idx.named.grpc_encoding;
+    if (grpc_encoding != nullptr) {
+      calld->algorithm_ = DecodeMessageCompressionAlgorithm(grpc_encoding->md);
+    }
+  }
+  calld->MaybeResumeOnRecvMessageReady();
+  calld->MaybeResumeOnRecvTrailingMetadataReady();
+  grpc_closure* closure = calld->original_recv_initial_metadata_ready_;
+  calld->original_recv_initial_metadata_ready_ = nullptr;
+  grpc_core::Closure::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(error));
+}
+
+void CallData::MaybeResumeOnRecvMessageReady() {
+  if (seen_recv_message_ready_) {
+    seen_recv_message_ready_ = false;
+    GRPC_CALL_COMBINER_START(call_combiner_, &on_recv_message_ready_,
+                             GRPC_ERROR_NONE,
+                             "continue recv_message_ready callback");
+  }
+}
+
+void CallData::OnRecvMessageReady(void* arg, grpc_error* error) {
+  CallData* calld = static_cast<CallData*>(arg);
+  if (error == GRPC_ERROR_NONE) {
+    if (calld->original_recv_initial_metadata_ready_ != nullptr) {
+      calld->seen_recv_message_ready_ = true;
+      GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
+                              "Deferring OnRecvMessageReady until after "
+                              "OnRecvInitialMetadataReady");
+      return;
+    }
+    if (calld->algorithm_ != GRPC_MESSAGE_COMPRESS_NONE) {
+      // recv_message can be NULL if trailing metadata is received instead of
+      // message, or it's possible that the message was not compressed.
+      if (*calld->recv_message_ == nullptr ||
+          (*calld->recv_message_)->length() == 0 ||
+          ((*calld->recv_message_)->flags() & GRPC_WRITE_INTERNAL_COMPRESS) ==
+              0) {
+        return calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_NONE);
+      }
+      grpc_slice_buffer_destroy_internal(&calld->recv_slices_);
+      grpc_slice_buffer_init(&calld->recv_slices_);
+      return calld->ContinueReadingRecvMessage();
+    }
+  }
+  calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error));
+}
+
+void CallData::ContinueReadingRecvMessage() {
+  while ((*recv_message_)
+             ->Next((*recv_message_)->length() - recv_slices_.length,
+                    &on_recv_message_next_done_)) {
+    grpc_error* error = PullSliceFromRecvMessage();
+    if (error != GRPC_ERROR_NONE) {
+      return ContinueRecvMessageReadyCallback(error);
+    }
+    // We have read the entire message.
+    if (recv_slices_.length == (*recv_message_)->length()) {
+      return FinishRecvMessage();
+    }
+  }
+}
+
+grpc_error* CallData::PullSliceFromRecvMessage() {
+  grpc_slice incoming_slice;
+  grpc_error* error = (*recv_message_)->Pull(&incoming_slice);
+  if (error == GRPC_ERROR_NONE) {
+    grpc_slice_buffer_add(&recv_slices_, incoming_slice);
+  }
+  return error;
+}
+
+void CallData::OnRecvMessageNextDone(void* arg, grpc_error* error) {
+  CallData* calld = static_cast<CallData*>(arg);
+  if (error != GRPC_ERROR_NONE) {
+    return calld->ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error));
+  }
+  error = calld->PullSliceFromRecvMessage();
+  if (error != GRPC_ERROR_NONE) {
+    return calld->ContinueRecvMessageReadyCallback(error);
+  }
+  if (calld->recv_slices_.length == (*calld->recv_message_)->length()) {
+    calld->FinishRecvMessage();
+  } else {
+    calld->ContinueReadingRecvMessage();
+  }
+}
+
+void CallData::FinishRecvMessage() {
+  grpc_slice_buffer decompressed_slices;
+  grpc_slice_buffer_init(&decompressed_slices);
+  if (grpc_msg_decompress(algorithm_, &recv_slices_, &decompressed_slices) ==
+      0) {
+    char* msg;
+    gpr_asprintf(
+        &msg,
+        "Unexpected error decompressing data for algorithm with enum value %d",
+        algorithm_);
+    GPR_DEBUG_ASSERT(error_ == GRPC_ERROR_NONE);
+    error_ = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg);
+    gpr_free(msg);
+    grpc_slice_buffer_destroy_internal(&decompressed_slices);
+  } else {
+    uint32_t recv_flags =
+        ((*recv_message_)->flags() & (~GRPC_WRITE_INTERNAL_COMPRESS)) |
+        GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED;
+    // Swap out the original receive byte stream with our new one and send the
+    // batch down.
+    // Initializing recv_replacement_stream_ with decompressed_slices removes
+    // all the slices from decompressed_slices leaving it empty.
+    new (&recv_replacement_stream_)
+        grpc_core::SliceBufferByteStream(&decompressed_slices, recv_flags);
+    recv_message_->reset(reinterpret_cast<grpc_core::SliceBufferByteStream*>(
+        &recv_replacement_stream_));
+    recv_message_ = nullptr;
+  }
+  ContinueRecvMessageReadyCallback(GRPC_ERROR_REF(error_));
+}
+
+void CallData::ContinueRecvMessageReadyCallback(grpc_error* error) {
+  MaybeResumeOnRecvTrailingMetadataReady();
+  // The surface will clean up the receiving stream if there is an error.
+  grpc_closure* closure = original_recv_message_ready_;
+  original_recv_message_ready_ = nullptr;
+  grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
+}
+
+void CallData::MaybeResumeOnRecvTrailingMetadataReady() {
+  if (seen_recv_trailing_metadata_ready_) {
+    seen_recv_trailing_metadata_ready_ = false;
+    grpc_error* error = on_recv_trailing_metadata_ready_error_;
+    on_recv_trailing_metadata_ready_error_ = GRPC_ERROR_NONE;
+    GRPC_CALL_COMBINER_START(call_combiner_, &on_recv_trailing_metadata_ready_,
+                             error, "Continuing OnRecvTrailingMetadataReady");
+  }
+}
+
+void CallData::OnRecvTrailingMetadataReady(void* arg, grpc_error* error) {
+  CallData* calld = static_cast<CallData*>(arg);
+  if (calld->original_recv_initial_metadata_ready_ != nullptr ||
+      calld->original_recv_message_ready_ != nullptr) {
+    calld->seen_recv_trailing_metadata_ready_ = true;
+    calld->on_recv_trailing_metadata_ready_error_ = GRPC_ERROR_REF(error);
+    GRPC_CALL_COMBINER_STOP(
+        calld->call_combiner_,
+        "Deferring OnRecvTrailingMetadataReady until after "
+        "OnRecvInitialMetadataReady and OnRecvMessageReady");
+    return;
+  }
+  error = grpc_error_add_child(GRPC_ERROR_REF(error), calld->error_);
+  calld->error_ = GRPC_ERROR_NONE;
+  grpc_closure* closure = calld->original_recv_trailing_metadata_ready_;
+  calld->original_recv_trailing_metadata_ready_ = nullptr;
+  grpc_core::Closure::Run(DEBUG_LOCATION, closure, error);
+}
+
+void CallData::DecompressStartTransportStreamOpBatch(
+    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
+  // Handle recv_initial_metadata.
+  if (batch->recv_initial_metadata) {
+    recv_initial_metadata_ =
+        batch->payload->recv_initial_metadata.recv_initial_metadata;
+    original_recv_initial_metadata_ready_ =
+        batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
+    batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
+        &on_recv_initial_metadata_ready_;
+  }
+  // Handle recv_message
+  if (batch->recv_message) {
+    recv_message_ = batch->payload->recv_message.recv_message;
+    original_recv_message_ready_ =
+        batch->payload->recv_message.recv_message_ready;
+    batch->payload->recv_message.recv_message_ready = &on_recv_message_ready_;
+  }
+  // Handle recv_trailing_metadata
+  if (batch->recv_trailing_metadata) {
+    original_recv_trailing_metadata_ready_ =
+        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
+    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
+        &on_recv_trailing_metadata_ready_;
+  }
+  // Pass control down the stack.
+  grpc_call_next_op(elem, batch);
+}
+
+void DecompressStartTransportStreamOpBatch(
+    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
+  GPR_TIMER_SCOPE("decompress_start_transport_stream_op_batch", 0);
+  CallData* calld = static_cast<CallData*>(elem->call_data);
+  calld->DecompressStartTransportStreamOpBatch(elem, batch);
+}
+
+static grpc_error* DecompressInitCallElem(grpc_call_element* elem,
+                                          const grpc_call_element_args* args) {
+  new (elem->call_data) CallData(*args);
+  return GRPC_ERROR_NONE;
+}
+
+static void DecompressDestroyCallElem(
+    grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
+    grpc_closure* /*ignored*/) {
+  CallData* calld = static_cast<CallData*>(elem->call_data);
+  calld->~CallData();
+}
+
+static grpc_error* DecompressInitChannelElem(
+    grpc_channel_element* /*elem*/, grpc_channel_element_args* /*args*/) {
+  return GRPC_ERROR_NONE;
+}
+
+void DecompressDestroyChannelElem(grpc_channel_element* /*elem*/) {}
+
+}  // namespace
+
+const grpc_channel_filter grpc_message_decompress_filter = {
+    DecompressStartTransportStreamOpBatch,
+    grpc_channel_next_op,
+    sizeof(CallData),
+    DecompressInitCallElem,
+    grpc_call_stack_ignore_set_pollset_or_pollset_set,
+    DecompressDestroyCallElem,
+    0,  // sizeof(ChannelData)
+    DecompressInitChannelElem,
+    DecompressDestroyChannelElem,
+    grpc_channel_next_get_info,
+    "message_decompress"};

+ 29 - 0
src/core/ext/filters/http/message_compress/message_decompress_filter.h

@@ -0,0 +1,29 @@
+//
+//
+// Copyright 2020 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+//
+
+#ifndef GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H
+#define GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H
+
+#include <grpc/support/port_platform.h>
+
+#include "src/core/lib/channel/channel_stack.h"
+
+extern const grpc_channel_filter grpc_message_decompress_filter;
+
+#endif /* GRPC_CORE_EXT_FILTERS_HTTP_MESSAGE_COMPRESS_MESSAGE_DECOMPRESS_FILTER_H \
+        */

+ 2 - 47
src/core/lib/surface/byte_buffer_reader.cc

@@ -22,73 +22,28 @@
 #include <string.h>
 
 #include <grpc/byte_buffer.h>
-#include <grpc/compression.h>
 #include <grpc/grpc.h>
 #include <grpc/slice_buffer.h>
 #include <grpc/support/alloc.h>
 #include <grpc/support/log.h>
 
-#include "src/core/lib/compression/message_compress.h"
 #include "src/core/lib/iomgr/exec_ctx.h"
 #include "src/core/lib/slice/slice_internal.h"
 
-static int is_compressed(grpc_byte_buffer* buffer) {
-  switch (buffer->type) {
-    case GRPC_BB_RAW:
-      if (buffer->data.raw.compression == GRPC_COMPRESS_NONE) {
-        return 0 /* GPR_FALSE */;
-      }
-      break;
-  }
-  return 1 /* GPR_TRUE */;
-}
-
 int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
                                  grpc_byte_buffer* buffer) {
-  grpc_core::ExecCtx exec_ctx;
-  grpc_slice_buffer decompressed_slices_buffer;
   reader->buffer_in = buffer;
   switch (reader->buffer_in->type) {
     case GRPC_BB_RAW:
-      grpc_slice_buffer_init(&decompressed_slices_buffer);
-      if (is_compressed(reader->buffer_in)) {
-        if (grpc_msg_decompress(
-
-                grpc_compression_algorithm_to_message_compression_algorithm(
-                    reader->buffer_in->data.raw.compression),
-                &reader->buffer_in->data.raw.slice_buffer,
-                &decompressed_slices_buffer) == 0) {
-          gpr_log(GPR_ERROR,
-                  "Unexpected error decompressing data for algorithm with enum "
-                  "value '%d'.",
-                  reader->buffer_in->data.raw.compression);
-          memset(reader, 0, sizeof(*reader));
-          return 0;
-        } else { /* all fine */
-          reader->buffer_out =
-              grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices,
-                                          decompressed_slices_buffer.count);
-        }
-        grpc_slice_buffer_destroy_internal(&decompressed_slices_buffer);
-      } else { /* not compressed, use the input buffer as output */
-        reader->buffer_out = reader->buffer_in;
-      }
+      reader->buffer_out = reader->buffer_in;
       reader->current.index = 0;
       break;
   }
-
   return 1;
 }
 
 void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader* reader) {
-  switch (reader->buffer_in->type) {
-    case GRPC_BB_RAW:
-      /* keeping the same if-else structure as in the init function */
-      if (is_compressed(reader->buffer_in)) {
-        grpc_byte_buffer_destroy(reader->buffer_out);
-      }
-      break;
-  }
+  reader->buffer_out = nullptr;
 }
 
 int grpc_byte_buffer_reader_peek(grpc_byte_buffer_reader* reader,

+ 7 - 2
src/core/lib/transport/byte_stream.h

@@ -26,10 +26,15 @@
 #include "src/core/lib/iomgr/closure.h"
 
 /** Internal bit flag for grpc_begin_message's \a flags signaling the use of
- * compression for the message */
+ * compression for the message. (Does not apply for stream compression.) */
 #define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u)
+/** Internal bit flag for determining whether the message was compressed and had
+ * to be decompressed by the message_decompress filter. (Does not apply for
+ * stream compression.) */
+#define GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED (0x40000000u)
 /** Mask of all valid internal flags. */
-#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)
+#define GRPC_WRITE_INTERNAL_USED_MASK \
+  (GRPC_WRITE_INTERNAL_COMPRESS | GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED)
 
 namespace grpc_core {
 

+ 1 - 0
src/python/grpcio/grpc_core_dependencies.py

@@ -82,6 +82,7 @@ CORE_SOURCE_FILES = [
     'src/core/ext/filters/http/client_authority_filter.cc',
     'src/core/ext/filters/http/http_filters_plugin.cc',
     'src/core/ext/filters/http/message_compress/message_compress_filter.cc',
+    'src/core/ext/filters/http/message_compress/message_decompress_filter.cc',
     'src/core/ext/filters/http/server/http_server_filter.cc',
     'src/core/ext/filters/max_age/max_age_filter.cc',
     'src/core/ext/filters/message_size/message_size_filter.cc',

+ 17 - 13
test/core/channel/minimal_stack_is_minimal_test.cc

@@ -73,13 +73,15 @@ int main(int argc, char** argv) {
                         "authority", "connected", NULL);
   errors += CHECK_STACK("unknown", &minimal_stack_args, GRPC_SERVER_CHANNEL,
                         "server", "connected", NULL);
-  errors +=
-      CHECK_STACK("chttp2", &minimal_stack_args, GRPC_CLIENT_DIRECT_CHANNEL,
-                  "authority", "http-client", "connected", NULL);
+  errors += CHECK_STACK("chttp2", &minimal_stack_args,
+                        GRPC_CLIENT_DIRECT_CHANNEL, "authority", "http-client",
+                        "message_decompress", "connected", NULL);
   errors += CHECK_STACK("chttp2", &minimal_stack_args, GRPC_CLIENT_SUBCHANNEL,
-                        "authority", "http-client", "connected", NULL);
-  errors += CHECK_STACK("chttp2", &minimal_stack_args, GRPC_SERVER_CHANNEL,
-                        "server", "http-server", "connected", NULL);
+                        "authority", "http-client", "message_decompress",
+                        "connected", NULL);
+  errors +=
+      CHECK_STACK("chttp2", &minimal_stack_args, GRPC_SERVER_CHANNEL, "server",
+                  "http-server", "message_decompress", "connected", NULL);
   errors += CHECK_STACK(nullptr, &minimal_stack_args, GRPC_CLIENT_CHANNEL,
                         "client-channel", NULL);
 
@@ -91,15 +93,17 @@ int main(int argc, char** argv) {
                         "message_size", "connected", NULL);
   errors += CHECK_STACK("unknown", nullptr, GRPC_SERVER_CHANNEL, "server",
                         "message_size", "deadline", "connected", NULL);
-  errors += CHECK_STACK("chttp2", nullptr, GRPC_CLIENT_DIRECT_CHANNEL,
-                        "authority", "message_size", "deadline", "http-client",
-                        "message_compress", "connected", NULL);
+  errors +=
+      CHECK_STACK("chttp2", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, "authority",
+                  "message_size", "deadline", "http-client",
+                  "message_decompress", "message_compress", "connected", NULL);
   errors += CHECK_STACK("chttp2", nullptr, GRPC_CLIENT_SUBCHANNEL, "authority",
-                        "message_size", "http-client", "message_compress",
-                        "connected", NULL);
-  errors += CHECK_STACK("chttp2", nullptr, GRPC_SERVER_CHANNEL, "server",
-                        "message_size", "deadline", "http-server",
+                        "message_size", "http-client", "message_decompress",
                         "message_compress", "connected", NULL);
+  errors +=
+      CHECK_STACK("chttp2", nullptr, GRPC_SERVER_CHANNEL, "server",
+                  "message_size", "deadline", "http-server",
+                  "message_decompress", "message_compress", "connected", NULL);
   errors += CHECK_STACK(nullptr, nullptr, GRPC_CLIENT_CHANNEL, "client-channel",
                         NULL);
 

+ 18 - 24
test/core/end2end/cq_verifier.cc

@@ -29,6 +29,8 @@
 #include <grpc/support/log.h>
 #include <grpc/support/string_util.h>
 #include <grpc/support/time.h>
+#include "src/core/lib/compression/compression_internal.h"
+#include "src/core/lib/compression/message_compress.h"
 #include "src/core/lib/gpr/string.h"
 #include "src/core/lib/surface/event_string.h"
 
@@ -145,33 +147,25 @@ int raw_byte_buffer_eq_slice(grpc_byte_buffer* rbb, grpc_slice b) {
 }
 
 int byte_buffer_eq_slice(grpc_byte_buffer* bb, grpc_slice b) {
-  grpc_byte_buffer_reader reader;
-  grpc_byte_buffer* rbb;
-  int res;
-
-  GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, bb) &&
-             "Couldn't init byte buffer reader");
-  rbb = grpc_raw_byte_buffer_from_reader(&reader);
-  res = raw_byte_buffer_eq_slice(rbb, b);
-  grpc_byte_buffer_reader_destroy(&reader);
-  grpc_byte_buffer_destroy(rbb);
-
-  return res;
+  if (bb->data.raw.compression > GRPC_COMPRESS_NONE) {
+    grpc_slice_buffer decompressed_buffer;
+    grpc_slice_buffer_init(&decompressed_buffer);
+    GPR_ASSERT(grpc_msg_decompress(
+        grpc_compression_algorithm_to_message_compression_algorithm(
+            bb->data.raw.compression),
+        &bb->data.raw.slice_buffer, &decompressed_buffer));
+    grpc_byte_buffer* rbb = grpc_raw_byte_buffer_create(
+        decompressed_buffer.slices, decompressed_buffer.count);
+    int ret_val = raw_byte_buffer_eq_slice(rbb, b);
+    grpc_byte_buffer_destroy(rbb);
+    grpc_slice_buffer_destroy(&decompressed_buffer);
+    return ret_val;
+  }
+  return raw_byte_buffer_eq_slice(bb, b);
 }
 
 int byte_buffer_eq_string(grpc_byte_buffer* bb, const char* str) {
-  grpc_byte_buffer_reader reader;
-  grpc_byte_buffer* rbb;
-  int res;
-
-  GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, bb) &&
-             "Couldn't init byte buffer reader");
-  rbb = grpc_raw_byte_buffer_from_reader(&reader);
-  res = raw_byte_buffer_eq_slice(rbb, grpc_slice_from_copied_string(str));
-  grpc_byte_buffer_reader_destroy(&reader);
-  grpc_byte_buffer_destroy(rbb);
-
-  return res;
+  return byte_buffer_eq_slice(bb, grpc_slice_from_copied_string(str));
 }
 
 static bool is_probably_integer(void* p) { return ((uintptr_t)p) < 1000000; }

+ 82 - 35
test/core/end2end/tests/compressed_payload.cc

@@ -41,9 +41,12 @@ static void* tag(intptr_t t) { return (void*)t; }
 static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
                                             const char* test_name,
                                             grpc_channel_args* client_args,
-                                            grpc_channel_args* server_args) {
+                                            grpc_channel_args* server_args,
+                                            bool decompress_in_core) {
   grpc_end2end_test_fixture f;
-  gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
+  gpr_log(GPR_INFO, "Running test: %s%s/%s", test_name,
+          decompress_in_core ? "" : "_with_decompression_disabled",
+          config.name);
   f = config.create_fixture(client_args, server_args);
   config.init_server(&f, server_args);
   config.init_client(&f, client_args);
@@ -97,7 +100,8 @@ static void request_for_disabled_algorithm(
     uint32_t send_flags_bitmask,
     grpc_compression_algorithm algorithm_to_disable,
     grpc_compression_algorithm requested_client_compression_algorithm,
-    grpc_status_code expected_error, grpc_metadata* client_metadata) {
+    grpc_status_code expected_error, grpc_metadata* client_metadata,
+    bool decompress_in_core) {
   grpc_call* c;
   grpc_call* s;
   grpc_slice request_payload_slice;
@@ -128,13 +132,24 @@ static void request_for_disabled_algorithm(
       nullptr, requested_client_compression_algorithm);
   server_args = grpc_channel_args_set_channel_default_compression_algorithm(
       nullptr, GRPC_COMPRESS_NONE);
-  {
-    grpc_core::ExecCtx exec_ctx;
-    server_args = grpc_channel_args_compression_algorithm_set_state(
-        &server_args, algorithm_to_disable, false);
+  server_args = grpc_channel_args_compression_algorithm_set_state(
+      &server_args, algorithm_to_disable, false);
+  if (!decompress_in_core) {
+    grpc_arg disable_decompression_in_core_arg =
+        grpc_channel_arg_integer_create(
+            const_cast<char*>(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION), 0);
+    grpc_channel_args* old_client_args = client_args;
+    grpc_channel_args* old_server_args = server_args;
+    client_args = grpc_channel_args_copy_and_add(
+        client_args, &disable_decompression_in_core_arg, 1);
+    server_args = grpc_channel_args_copy_and_add(
+        server_args, &disable_decompression_in_core_arg, 1);
+    grpc_channel_args_destroy(old_client_args);
+    grpc_channel_args_destroy(old_server_args);
   }
 
-  f = begin_test(config, test_name, client_args, server_args);
+  f = begin_test(config, test_name, client_args, server_args,
+                 decompress_in_core);
   cqv = cq_verifier_create(f.cq);
 
   gpr_timespec deadline = five_seconds_from_now();
@@ -253,18 +268,13 @@ static void request_for_disabled_algorithm(
   grpc_slice_unref(request_payload_slice);
   grpc_byte_buffer_destroy(request_payload);
   grpc_byte_buffer_destroy(request_payload_recv);
-
-  {
-    grpc_core::ExecCtx exec_ctx;
-    grpc_channel_args_destroy(client_args);
-    grpc_channel_args_destroy(server_args);
-  }
-
+  grpc_channel_args_destroy(client_args);
+  grpc_channel_args_destroy(server_args);
   end_test(&f);
   config.tear_down_data(&f);
 }
 
-static void request_with_payload_template(
+static void request_with_payload_template_inner(
     grpc_end2end_test_config config, const char* test_name,
     uint32_t client_send_flags_bitmask,
     grpc_compression_algorithm default_client_channel_compression_algorithm,
@@ -273,7 +283,7 @@ static void request_with_payload_template(
     grpc_compression_algorithm expected_algorithm_from_server,
     grpc_metadata* client_init_metadata, bool set_server_level,
     grpc_compression_level server_compression_level,
-    bool send_message_before_initial_metadata) {
+    bool send_message_before_initial_metadata, bool decompress_in_core) {
   grpc_call* c;
   grpc_call* s;
   grpc_slice request_payload_slice;
@@ -312,8 +322,21 @@ static void request_with_payload_template(
       nullptr, default_client_channel_compression_algorithm);
   server_args = grpc_channel_args_set_channel_default_compression_algorithm(
       nullptr, default_server_channel_compression_algorithm);
-
-  f = begin_test(config, test_name, client_args, server_args);
+  if (!decompress_in_core) {
+    grpc_arg disable_decompression_in_core_arg =
+        grpc_channel_arg_integer_create(
+            const_cast<char*>(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION), 0);
+    grpc_channel_args* old_client_args = client_args;
+    grpc_channel_args* old_server_args = server_args;
+    client_args = grpc_channel_args_copy_and_add(
+        client_args, &disable_decompression_in_core_arg, 1);
+    server_args = grpc_channel_args_copy_and_add(
+        server_args, &disable_decompression_in_core_arg, 1);
+    grpc_channel_args_destroy(old_client_args);
+    grpc_channel_args_destroy(old_server_args);
+  }
+  f = begin_test(config, test_name, client_args, server_args,
+                 decompress_in_core);
   cqv = cq_verifier_create(f.cq);
 
   gpr_timespec deadline = five_seconds_from_now();
@@ -341,7 +364,6 @@ static void request_with_payload_template(
     GPR_ASSERT(GRPC_CALL_OK == error);
     CQ_EXPECT_COMPLETION(cqv, tag(2), true);
   }
-
   memset(ops, 0, sizeof(ops));
   op = ops;
   op->op = GRPC_OP_SEND_INITIAL_METADATA;
@@ -385,7 +407,6 @@ static void request_with_payload_template(
                         GRPC_COMPRESS_DEFLATE) != 0);
   GPR_ASSERT(GPR_BITGET(grpc_call_test_only_get_encodings_accepted_by_peer(s),
                         GRPC_COMPRESS_GZIP) != 0);
-
   memset(ops, 0, sizeof(ops));
   op = ops;
   op->op = GRPC_OP_SEND_INITIAL_METADATA;
@@ -406,7 +427,6 @@ static void request_with_payload_template(
   error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(101),
                                 nullptr);
   GPR_ASSERT(GRPC_CALL_OK == error);
-
   for (int i = 0; i < 2; i++) {
     response_payload = grpc_raw_byte_buffer_create(&response_payload_slice, 1);
 
@@ -442,7 +462,8 @@ static void request_with_payload_template(
     GPR_ASSERT(request_payload_recv->type == GRPC_BB_RAW);
     GPR_ASSERT(byte_buffer_eq_string(request_payload_recv, request_str));
     GPR_ASSERT(request_payload_recv->data.raw.compression ==
-               expected_algorithm_from_client);
+               (decompress_in_core ? GRPC_COMPRESS_NONE
+                                   : expected_algorithm_from_client));
 
     memset(ops, 0, sizeof(ops));
     op = ops;
@@ -475,11 +496,13 @@ static void request_with_payload_template(
     if (server_compression_level > GRPC_COMPRESS_LEVEL_NONE) {
       const grpc_compression_algorithm algo_for_server_level =
           grpc_call_compression_for_level(s, server_compression_level);
-      GPR_ASSERT(response_payload_recv->data.raw.compression ==
-                 algo_for_server_level);
+      GPR_ASSERT(
+          response_payload_recv->data.raw.compression ==
+          (decompress_in_core ? GRPC_COMPRESS_NONE : algo_for_server_level));
     } else {
       GPR_ASSERT(response_payload_recv->data.raw.compression ==
-                 expected_algorithm_from_server);
+                 (decompress_in_core ? GRPC_COMPRESS_NONE
+                                     : expected_algorithm_from_server));
     }
 
     grpc_byte_buffer_destroy(request_payload);
@@ -487,7 +510,6 @@ static void request_with_payload_template(
     grpc_byte_buffer_destroy(request_payload_recv);
     grpc_byte_buffer_destroy(response_payload_recv);
   }
-
   grpc_slice_unref(request_payload_slice);
   grpc_slice_unref(response_payload_slice);
 
@@ -536,17 +558,38 @@ static void request_with_payload_template(
   grpc_call_unref(s);
 
   cq_verifier_destroy(cqv);
-
-  {
-    grpc_core::ExecCtx exec_ctx;
-    grpc_channel_args_destroy(client_args);
-    grpc_channel_args_destroy(server_args);
-  }
-
+  grpc_channel_args_destroy(client_args);
+  grpc_channel_args_destroy(server_args);
   end_test(&f);
   config.tear_down_data(&f);
 }
 
+static void request_with_payload_template(
+    grpc_end2end_test_config config, const char* test_name,
+    uint32_t client_send_flags_bitmask,
+    grpc_compression_algorithm default_client_channel_compression_algorithm,
+    grpc_compression_algorithm default_server_channel_compression_algorithm,
+    grpc_compression_algorithm expected_algorithm_from_client,
+    grpc_compression_algorithm expected_algorithm_from_server,
+    grpc_metadata* client_init_metadata, bool set_server_level,
+    grpc_compression_level server_compression_level,
+    bool send_message_before_initial_metadata) {
+  request_with_payload_template_inner(
+      config, test_name, client_send_flags_bitmask,
+      default_client_channel_compression_algorithm,
+      default_server_channel_compression_algorithm,
+      expected_algorithm_from_client, expected_algorithm_from_server,
+      client_init_metadata, set_server_level, server_compression_level,
+      send_message_before_initial_metadata, false);
+  request_with_payload_template_inner(
+      config, test_name, client_send_flags_bitmask,
+      default_client_channel_compression_algorithm,
+      default_server_channel_compression_algorithm,
+      expected_algorithm_from_client, expected_algorithm_from_server,
+      client_init_metadata, set_server_level, server_compression_level,
+      send_message_before_initial_metadata, true);
+}
+
 static void test_invoke_request_with_exceptionally_uncompressed_payload(
     grpc_end2end_test_config config) {
   request_with_payload_template(
@@ -634,7 +677,11 @@ static void test_invoke_request_with_disabled_algorithm(
   request_for_disabled_algorithm(config,
                                  "test_invoke_request_with_disabled_algorithm",
                                  0, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP,
-                                 GRPC_STATUS_UNIMPLEMENTED, nullptr);
+                                 GRPC_STATUS_UNIMPLEMENTED, nullptr, false);
+  request_for_disabled_algorithm(config,
+                                 "test_invoke_request_with_disabled_algorithm",
+                                 0, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP,
+                                 GRPC_STATUS_UNIMPLEMENTED, nullptr, true);
 }
 
 void compressed_payload(grpc_end2end_test_config config) {

+ 32 - 15
test/core/end2end/tests/workaround_cronet_compression.cc

@@ -100,8 +100,8 @@ static void request_with_payload_template(
     grpc_compression_algorithm expected_algorithm_from_client,
     grpc_compression_algorithm expected_algorithm_from_server,
     grpc_metadata* client_init_metadata, bool set_server_level,
-    grpc_compression_level server_compression_level,
-    char* user_agent_override) {
+    grpc_compression_level server_compression_level, char* user_agent_override,
+    bool decompress_in_core) {
   grpc_call* c;
   grpc_call* s;
   grpc_slice request_payload_slice;
@@ -140,9 +140,21 @@ static void request_with_payload_template(
       nullptr, default_client_channel_compression_algorithm);
   server_args = grpc_channel_args_set_channel_default_compression_algorithm(
       nullptr, default_server_channel_compression_algorithm);
+  if (!decompress_in_core) {
+    grpc_arg disable_decompression_in_core_arg =
+        grpc_channel_arg_integer_create(
+            const_cast<char*>(GRPC_ARG_ENABLE_PER_MESSAGE_DECOMPRESSION), 0);
+    grpc_channel_args* old_client_args = client_args;
+    grpc_channel_args* old_server_args = server_args;
+    client_args = grpc_channel_args_copy_and_add(
+        client_args, &disable_decompression_in_core_arg, 1);
+    server_args = grpc_channel_args_copy_and_add(
+        server_args, &disable_decompression_in_core_arg, 1);
+    grpc_channel_args_destroy(old_client_args);
+    grpc_channel_args_destroy(old_server_args);
+  }
 
   if (user_agent_override) {
-    grpc_core::ExecCtx exec_ctx;
     grpc_channel_args* client_args_old = client_args;
     grpc_arg arg;
     arg.key = const_cast<char*>(GRPC_ARG_PRIMARY_USER_AGENT_STRING);
@@ -267,7 +279,8 @@ static void request_with_payload_template(
     GPR_ASSERT(request_payload_recv->type == GRPC_BB_RAW);
     GPR_ASSERT(byte_buffer_eq_string(request_payload_recv, request_str));
     GPR_ASSERT(request_payload_recv->data.raw.compression ==
-               expected_algorithm_from_client);
+               (decompress_in_core ? GRPC_COMPRESS_NONE
+                                   : expected_algorithm_from_client));
 
     memset(ops, 0, sizeof(ops));
     op = ops;
@@ -288,11 +301,13 @@ static void request_with_payload_template(
     if (server_compression_level > GRPC_COMPRESS_LEVEL_NONE) {
       const grpc_compression_algorithm algo_for_server_level =
           grpc_call_compression_for_level(s, server_compression_level);
-      GPR_ASSERT(response_payload_recv->data.raw.compression ==
-                 algo_for_server_level);
+      GPR_ASSERT(
+          response_payload_recv->data.raw.compression ==
+          (decompress_in_core ? GRPC_COMPRESS_NONE : algo_for_server_level));
     } else {
       GPR_ASSERT(response_payload_recv->data.raw.compression ==
-                 expected_algorithm_from_server);
+                 (decompress_in_core ? GRPC_COMPRESS_NONE
+                                     : expected_algorithm_from_server));
     }
 
     grpc_byte_buffer_destroy(request_payload);
@@ -349,13 +364,8 @@ static void request_with_payload_template(
   grpc_call_unref(s);
 
   cq_verifier_destroy(cqv);
-
-  {
-    grpc_core::ExecCtx exec_ctx;
-    grpc_channel_args_destroy(client_args);
-    grpc_channel_args_destroy(server_args);
-  }
-
+  grpc_channel_args_destroy(client_args);
+  grpc_channel_args_destroy(server_args);
   end_test(&f);
   config.tear_down_data(&f);
 }
@@ -387,7 +397,14 @@ static void test_workaround_cronet_compression(
         GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP,
         workaround_configs[i].expected_algorithm_from_server, nullptr, false,
         /* ignored */ GRPC_COMPRESS_LEVEL_NONE,
-        workaround_configs[i].user_agent_override);
+        workaround_configs[i].user_agent_override, true);
+    request_with_payload_template(
+        config,
+        "test_invoke_request_with_compressed_payload_with_compression_disabled",
+        0, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP, GRPC_COMPRESS_GZIP,
+        workaround_configs[i].expected_algorithm_from_server, nullptr, false,
+        /* ignored */ GRPC_COMPRESS_LEVEL_NONE,
+        workaround_configs[i].user_agent_override, false);
   }
 }
 

+ 0 - 73
test/core/surface/byte_buffer_reader_test.cc

@@ -25,7 +25,6 @@
 #include <grpc/support/log.h>
 #include <grpc/support/time.h>
 
-#include "src/core/lib/compression/message_compress.h"
 #include "src/core/lib/gprpp/thd.h"
 #include "src/core/lib/iomgr/exec_ctx.h"
 #include "test/core/util/test_config.h"
@@ -168,75 +167,6 @@ static void test_peek_none_compressed_slice(void) {
   grpc_byte_buffer_destroy(buffer);
 }
 
-static void test_read_corrupted_slice(void) {
-  grpc_slice slice;
-  grpc_byte_buffer* buffer;
-  grpc_byte_buffer_reader reader;
-
-  LOG_TEST("test_read_corrupted_slice");
-  slice = grpc_slice_from_copied_string("test");
-  buffer = grpc_raw_byte_buffer_create(&slice, 1);
-  buffer->data.raw.compression = GRPC_COMPRESS_GZIP; /* lies! */
-  grpc_slice_unref(slice);
-  GPR_ASSERT(!grpc_byte_buffer_reader_init(&reader, buffer));
-  grpc_byte_buffer_destroy(buffer);
-}
-
-static void read_compressed_slice(grpc_compression_algorithm algorithm,
-                                  size_t input_size) {
-  grpc_slice input_slice;
-  grpc_slice_buffer sliceb_in;
-  grpc_slice_buffer sliceb_out;
-  grpc_byte_buffer* buffer;
-  grpc_byte_buffer_reader reader;
-  grpc_slice read_slice;
-  size_t read_count = 0;
-
-  grpc_slice_buffer_init(&sliceb_in);
-  grpc_slice_buffer_init(&sliceb_out);
-
-  input_slice = grpc_slice_malloc(input_size);
-  memset(GRPC_SLICE_START_PTR(input_slice), 'a', input_size);
-  grpc_slice_buffer_add(&sliceb_in, input_slice); /* takes ownership */
-  {
-    grpc_core::ExecCtx exec_ctx;
-    GPR_ASSERT(grpc_msg_compress(
-
-        grpc_compression_algorithm_to_message_compression_algorithm(algorithm),
-        &sliceb_in, &sliceb_out));
-  }
-
-  buffer = grpc_raw_compressed_byte_buffer_create(sliceb_out.slices,
-                                                  sliceb_out.count, algorithm);
-  GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, buffer) &&
-             "Couldn't init byte buffer reader");
-
-  while (grpc_byte_buffer_reader_next(&reader, &read_slice)) {
-    GPR_ASSERT(memcmp(GRPC_SLICE_START_PTR(read_slice),
-                      GRPC_SLICE_START_PTR(input_slice) + read_count,
-                      GRPC_SLICE_LENGTH(read_slice)) == 0);
-    read_count += GRPC_SLICE_LENGTH(read_slice);
-    grpc_slice_unref(read_slice);
-  }
-  GPR_ASSERT(read_count == input_size);
-  grpc_byte_buffer_reader_destroy(&reader);
-  grpc_byte_buffer_destroy(buffer);
-  grpc_slice_buffer_destroy(&sliceb_out);
-  grpc_slice_buffer_destroy(&sliceb_in);
-}
-
-static void test_read_gzip_compressed_slice(void) {
-  const size_t INPUT_SIZE = 2048;
-  LOG_TEST("test_read_gzip_compressed_slice");
-  read_compressed_slice(GRPC_COMPRESS_GZIP, INPUT_SIZE);
-}
-
-static void test_read_deflate_compressed_slice(void) {
-  const size_t INPUT_SIZE = 2048;
-  LOG_TEST("test_read_deflate_compressed_slice");
-  read_compressed_slice(GRPC_COMPRESS_DEFLATE, INPUT_SIZE);
-}
-
 static void test_byte_buffer_from_reader(void) {
   grpc_slice slice;
   grpc_byte_buffer *buffer, *buffer_from_reader;
@@ -342,9 +272,6 @@ int main(int argc, char** argv) {
   test_peek_one_slice();
   test_peek_one_slice_malloc();
   test_peek_none_compressed_slice();
-  test_read_gzip_compressed_slice();
-  test_read_deflate_compressed_slice();
-  test_read_corrupted_slice();
   test_byte_buffer_from_reader();
   test_byte_buffer_copy();
   test_readall();

+ 6 - 2
test/cpp/interop/client_helper.h

@@ -27,6 +27,7 @@
 #include <grpcpp/client_context.h>
 
 #include "src/core/lib/surface/call_test_only.h"
+#include "src/core/lib/transport/byte_stream.h"
 
 namespace grpc {
 namespace testing {
@@ -54,8 +55,11 @@ class InteropClientContextInspector {
     return grpc_call_test_only_get_compression_algorithm(context_.call_);
   }
 
-  uint32_t GetMessageFlags() const {
-    return grpc_call_test_only_get_message_flags(context_.call_);
+  bool WasCompressed() const {
+    return (grpc_call_test_only_get_message_flags(context_.call_) &
+            GRPC_WRITE_INTERNAL_COMPRESS) ||
+           (grpc_call_test_only_get_message_flags(context_.call_) &
+            GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED);
   }
 
  private:

+ 4 - 5
test/cpp/interop/interop_client.cc

@@ -30,7 +30,6 @@
 #include <grpcpp/client_context.h>
 #include <grpcpp/security/credentials.h>
 
-#include "src/core/lib/transport/byte_stream.h"
 #include "src/proto/grpc/testing/empty.pb.h"
 #include "src/proto/grpc/testing/messages.pb.h"
 #include "src/proto/grpc/testing/test.grpc.pb.h"
@@ -67,10 +66,10 @@ void UnaryCompressionChecks(const InteropClientContextInspector& inspector,
               "from server.");
       abort();
     }
-    GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS);
+    GPR_ASSERT(inspector.WasCompressed());
   } else {
     // Didn't request compression -> make sure the response is uncompressed
-    GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
+    GPR_ASSERT(!(inspector.WasCompressed()));
   }
 }
 }  // namespace
@@ -577,10 +576,10 @@ bool InteropClient::DoServerCompressedStreaming() {
     GPR_ASSERT(request.response_parameters(k).has_compressed());
     if (request.response_parameters(k).compressed().value()) {
       GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE);
-      GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS);
+      GPR_ASSERT(inspector.WasCompressed());
     } else {
       // requested *no* compression.
-      GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
+      GPR_ASSERT(!(inspector.WasCompressed()));
     }
     ++k;
   }

+ 2 - 3
test/cpp/interop/interop_server.cc

@@ -31,7 +31,6 @@
 #include <grpcpp/server_context.h>
 
 #include "src/core/lib/gpr/string.h"
-#include "src/core/lib/transport/byte_stream.h"
 #include "src/proto/grpc/testing/empty.pb.h"
 #include "src/proto/grpc/testing/messages.pb.h"
 #include "src/proto/grpc/testing/test.grpc.pb.h"
@@ -118,7 +117,7 @@ bool CheckExpectedCompression(const ServerContext& context,
               "Expected compression but got uncompressed request from client.");
       return false;
     }
-    if (!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS)) {
+    if (!(inspector.WasCompressed())) {
       gpr_log(GPR_ERROR,
               "Failure: Requested compression in a compressable request, but "
               "compression bit in message flags not set.");
@@ -126,7 +125,7 @@ bool CheckExpectedCompression(const ServerContext& context,
     }
   } else {
     // Didn't expect compression -> make sure the request is uncompressed
-    if (inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS) {
+    if (inspector.WasCompressed()) {
       gpr_log(GPR_ERROR,
               "Failure: Didn't requested compression, but compression bit in "
               "message flags set.");

+ 6 - 2
test/cpp/interop/server_helper.cc

@@ -24,6 +24,7 @@
 #include <grpcpp/security/server_credentials.h>
 
 #include "src/core/lib/surface/call_test_only.h"
+#include "src/core/lib/transport/byte_stream.h"
 #include "test/cpp/util/test_credentials_provider.h"
 
 DECLARE_bool(use_alts);
@@ -60,8 +61,11 @@ uint32_t InteropServerContextInspector::GetEncodingsAcceptedByClient() const {
   return grpc_call_test_only_get_encodings_accepted_by_peer(context_.call_);
 }
 
-uint32_t InteropServerContextInspector::GetMessageFlags() const {
-  return grpc_call_test_only_get_message_flags(context_.call_);
+bool InteropServerContextInspector::WasCompressed() const {
+  return (grpc_call_test_only_get_message_flags(context_.call_) &
+          GRPC_WRITE_INTERNAL_COMPRESS) ||
+         (grpc_call_test_only_get_message_flags(context_.call_) &
+          GRPC_WRITE_INTERNAL_TEST_ONLY_WAS_COMPRESSED);
 }
 
 std::shared_ptr<const AuthContext>

+ 1 - 1
test/cpp/interop/server_helper.h

@@ -44,7 +44,7 @@ class InteropServerContextInspector {
   bool IsCancelled() const;
   grpc_compression_algorithm GetCallCompressionAlgorithm() const;
   uint32_t GetEncodingsAcceptedByClient() const;
-  uint32_t GetMessageFlags() const;
+  bool WasCompressed() const;
 
  private:
   const ::grpc::ServerContext& context_;

+ 2 - 0
tools/doxygen/Doxyfile.c++.internal

@@ -1186,6 +1186,8 @@ src/core/ext/filters/http/client_authority_filter.h \
 src/core/ext/filters/http/http_filters_plugin.cc \
 src/core/ext/filters/http/message_compress/message_compress_filter.cc \
 src/core/ext/filters/http/message_compress/message_compress_filter.h \
+src/core/ext/filters/http/message_compress/message_decompress_filter.cc \
+src/core/ext/filters/http/message_compress/message_decompress_filter.h \
 src/core/ext/filters/http/server/http_server_filter.cc \
 src/core/ext/filters/http/server/http_server_filter.h \
 src/core/ext/filters/max_age/max_age_filter.cc \

+ 2 - 0
tools/doxygen/Doxyfile.core.internal

@@ -986,6 +986,8 @@ src/core/ext/filters/http/client_authority_filter.h \
 src/core/ext/filters/http/http_filters_plugin.cc \
 src/core/ext/filters/http/message_compress/message_compress_filter.cc \
 src/core/ext/filters/http/message_compress/message_compress_filter.h \
+src/core/ext/filters/http/message_compress/message_decompress_filter.cc \
+src/core/ext/filters/http/message_compress/message_decompress_filter.h \
 src/core/ext/filters/http/server/http_server_filter.cc \
 src/core/ext/filters/http/server/http_server_filter.h \
 src/core/ext/filters/max_age/max_age_filter.cc \