Prechádzať zdrojové kódy

Add cancellation notification

Yash Tibrewal 7 rokov pred
rodič
commit
fd88dcaf55

+ 99 - 0
include/grpcpp/impl/codegen/interceptor_common.h

@@ -352,6 +352,105 @@ class InterceptorBatchMethodsImpl
   MetadataMap* recv_trailing_metadata_ = nullptr;
 };
 
+// A special implementation of InterceptorBatchMethods to send a Cancel
+// notification down the interceptor stack
+class CancelInterceptorBatchMethods
+    : public experimental::InterceptorBatchMethods {
+ public:
+  bool QueryInterceptionHookPoint(
+      experimental::InterceptionHookPoints type) override {
+    if (type == experimental::InterceptionHookPoints::PRE_SEND_CANCEL) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  void Proceed() override {
+    // This is a no-op. For actual continuation of the RPC simply needs to
+    // return from the Intercept method
+  }
+
+  void Hijack() override {
+    // Only the client can hijack when sending down initial metadata
+    GPR_CODEGEN_ASSERT(false &&
+                       "It is illegal to call Hijack on a method which has a "
+                       "Cancel notification");
+  }
+
+  ByteBuffer* GetSendMessage() override {
+    GPR_CODEGEN_ASSERT(false &&
+                       "It is illegal to call GetSendMessage on a method which "
+                       "has a Cancel notification");
+    return nullptr;
+  }
+
+  std::multimap<grpc::string, grpc::string>* GetSendInitialMetadata() override {
+    GPR_CODEGEN_ASSERT(false &&
+                       "It is illegal to call GetSendInitialMetadata on a "
+                       "method which has a Cancel notification");
+    return nullptr;
+  }
+
+  Status GetSendStatus() override {
+    GPR_CODEGEN_ASSERT(false &&
+                       "It is illegal to call GetSendStatus on a method which "
+                       "has a Cancel notification");
+    return Status();
+  }
+
+  void ModifySendStatus(const Status& status) override {
+    GPR_CODEGEN_ASSERT(false &&
+                       "It is illegal to call ModifySendStatus on a method "
+                       "which has a Cancel notification");
+    return;
+  }
+
+  std::multimap<grpc::string, grpc::string>* GetSendTrailingMetadata()
+      override {
+    GPR_CODEGEN_ASSERT(false &&
+                       "It is illegal to call GetSendTrailingMetadata on a "
+                       "method which has a Cancel notification");
+    return nullptr;
+  }
+
+  void* GetRecvMessage() override {
+    GPR_CODEGEN_ASSERT(false &&
+                       "It is illegal to call GetRecvMessage on a method which "
+                       "has a Cancel notification");
+    return nullptr;
+  }
+
+  std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvInitialMetadata()
+      override {
+    GPR_CODEGEN_ASSERT(false &&
+                       "It is illegal to call GetRecvInitialMetadata on a "
+                       "method which has a Cancel notification");
+    return nullptr;
+  }
+
+  Status* GetRecvStatus() override {
+    GPR_CODEGEN_ASSERT(false &&
+                       "It is illegal to call GetRecvStatus on a method which "
+                       "has a Cancel notification");
+    return nullptr;
+  }
+
+  std::multimap<grpc::string_ref, grpc::string_ref>* GetRecvTrailingMetadata()
+      override {
+    GPR_CODEGEN_ASSERT(false &&
+                       "It is illegal to call GetRecvTrailingMetadata on a "
+                       "method which has a Cancel notification");
+    return nullptr;
+  }
+
+  std::unique_ptr<ChannelInterface> GetInterceptedChannel() {
+    GPR_CODEGEN_ASSERT(false &&
+                       "It is illegal to call GetInterceptedChannel on a "
+                       "method which has a Cancel notification");
+    return std::unique_ptr<ChannelInterface>(nullptr);
+  }
+};
 }  // namespace internal
 }  // namespace grpc
 

+ 4 - 3
src/cpp/client/client_context.cc

@@ -111,9 +111,10 @@ void ClientContext::set_compression_algorithm(
 void ClientContext::TryCancel() {
   std::unique_lock<std::mutex> lock(mu_);
   if (call_) {
-    // for(size_t i = 0; i < rpc_info_.interceptors_.size(); i++) {
-    // rpc_info_.RunInterceptor(, 0);
-    //}
+    internal::CancelInterceptorBatchMethods cancel_methods;
+    for (size_t i = 0; i < rpc_info_.interceptors_.size(); i++) {
+      rpc_info_.RunInterceptor(&cancel_methods, i);
+    }
     grpc_call_cancel(call_, nullptr);
   } else {
     call_canceled_ = true;

+ 6 - 0
src/cpp/server/server_context.cc

@@ -275,6 +275,12 @@ void ServerContext::AddTrailingMetadata(const grpc::string& key,
 }
 
 void ServerContext::TryCancel() const {
+  internal::CancelInterceptorBatchMethods cancel_methods;
+  if (rpc_info_) {
+    for (size_t i = 0; i < rpc_info_->interceptors_.size(); i++) {
+      rpc_info_->RunInterceptor(&cancel_methods, i);
+    }
+  }
   grpc_call_error err = grpc_call_cancel_with_status(
       call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", nullptr);
   if (err != GRPC_CALL_OK) {