GRPC C++  1.24.0
call_op_set.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
20 #define GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
21 
22 #include <assert.h>
23 #include <array>
24 #include <cstring>
25 #include <functional>
26 #include <map>
27 #include <memory>
28 #include <vector>
29 
44 
45 #include <grpc/impl/codegen/atm.h>
48 
49 namespace grpc {
50 
51 extern CoreCodegenInterface* g_core_codegen_interface;
52 
53 namespace internal {
54 class Call;
55 class CallHook;
56 
57 // TODO(yangg) if the map is changed before we send, the pointers will be a
58 // mess. Make sure it does not happen.
60  const std::multimap<grpc::string, grpc::string>& metadata,
61  size_t* metadata_count, const grpc::string& optional_error_details) {
62  *metadata_count = metadata.size() + (optional_error_details.empty() ? 0 : 1);
63  if (*metadata_count == 0) {
64  return nullptr;
65  }
66  grpc_metadata* metadata_array =
67  (grpc_metadata*)(g_core_codegen_interface->gpr_malloc(
68  (*metadata_count) * sizeof(grpc_metadata)));
69  size_t i = 0;
70  for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
71  metadata_array[i].key = SliceReferencingString(iter->first);
72  metadata_array[i].value = SliceReferencingString(iter->second);
73  }
74  if (!optional_error_details.empty()) {
75  metadata_array[i].key =
76  g_core_codegen_interface->grpc_slice_from_static_buffer(
78  metadata_array[i].value = SliceReferencingString(optional_error_details);
79  }
80  return metadata_array;
81 }
82 } // namespace internal
83 
85 class WriteOptions {
86  public:
87  WriteOptions() : flags_(0), last_message_(false) {}
88  WriteOptions(const WriteOptions& other)
89  : flags_(other.flags_), last_message_(other.last_message_) {}
90 
92  WriteOptions& operator=(const WriteOptions& other) = default;
93 
95  inline void Clear() { flags_ = 0; }
96 
98  inline uint32_t flags() const { return flags_; }
99 
104  SetBit(GRPC_WRITE_NO_COMPRESS);
105  return *this;
106  }
107 
112  ClearBit(GRPC_WRITE_NO_COMPRESS);
113  return *this;
114  }
115 
120  inline bool get_no_compression() const {
121  return GetBit(GRPC_WRITE_NO_COMPRESS);
122  }
123 
129  SetBit(GRPC_WRITE_BUFFER_HINT);
130  return *this;
131  }
132 
138  ClearBit(GRPC_WRITE_BUFFER_HINT);
139  return *this;
140  }
141 
146  inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
147 
151  SetBit(GRPC_WRITE_BUFFER_HINT);
152  return *this;
153  }
154 
156  ClearBit(GRPC_WRITE_BUFFER_HINT);
157  return *this;
158  }
159 
160  inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
161 
168  last_message_ = true;
169  return *this;
170  }
171 
175  last_message_ = false;
176  return *this;
177  }
178 
182  SetBit(GRPC_WRITE_THROUGH);
183  return *this;
184  }
185 
186  inline bool is_write_through() const { return GetBit(GRPC_WRITE_THROUGH); }
187 
192  bool is_last_message() const { return last_message_; }
193 
194  private:
195  void SetBit(const uint32_t mask) { flags_ |= mask; }
196 
197  void ClearBit(const uint32_t mask) { flags_ &= ~mask; }
198 
199  bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
200 
201  uint32_t flags_;
202  bool last_message_;
203 };
204 
205 namespace internal {
206 
209 template <int I>
210 class CallNoOp {
211  protected:
212  void AddOp(grpc_op* /*ops*/, size_t* /*nops*/) {}
213  void FinishOp(bool* /*status*/) {}
215  InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
217  InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
218  void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
219  }
220 };
221 
223  public:
224  CallOpSendInitialMetadata() : send_(false) {
225  maybe_compression_level_.is_set = false;
226  }
227 
228  void SendInitialMetadata(std::multimap<grpc::string, grpc::string>* metadata,
229  uint32_t flags) {
230  maybe_compression_level_.is_set = false;
231  send_ = true;
232  flags_ = flags;
233  metadata_map_ = metadata;
234  }
235 
237  maybe_compression_level_.is_set = true;
238  maybe_compression_level_.level = level;
239  }
240 
241  protected:
242  void AddOp(grpc_op* ops, size_t* nops) {
243  if (!send_ || hijacked_) return;
244  grpc_op* op = &ops[(*nops)++];
246  op->flags = flags_;
247  op->reserved = NULL;
248  initial_metadata_ =
249  FillMetadataArray(*metadata_map_, &initial_metadata_count_, "");
250  op->data.send_initial_metadata.count = initial_metadata_count_;
251  op->data.send_initial_metadata.metadata = initial_metadata_;
253  maybe_compression_level_.is_set;
254  if (maybe_compression_level_.is_set) {
256  maybe_compression_level_.level;
257  }
258  }
259  void FinishOp(bool* /*status*/) {
260  if (!send_ || hijacked_) return;
261  g_core_codegen_interface->gpr_free(initial_metadata_);
262  send_ = false;
263  }
264 
266  InterceptorBatchMethodsImpl* interceptor_methods) {
267  if (!send_) return;
268  interceptor_methods->AddInterceptionHookPoint(
270  interceptor_methods->SetSendInitialMetadata(metadata_map_);
271  }
272 
274  InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
275 
276  void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
277  hijacked_ = true;
278  }
279 
280  bool hijacked_ = false;
281  bool send_;
282  uint32_t flags_;
284  std::multimap<grpc::string, grpc::string>* metadata_map_;
286  struct {
287  bool is_set;
289  } maybe_compression_level_;
290 };
291 
293  public:
294  CallOpSendMessage() : send_buf_() {}
295 
298  template <class M>
299  Status SendMessage(const M& message,
301 
302  template <class M>
303  Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
304 
308  template <class M>
309  Status SendMessagePtr(const M* message,
311 
314  template <class M>
315  Status SendMessagePtr(const M* message) GRPC_MUST_USE_RESULT;
316 
317  protected:
318  void AddOp(grpc_op* ops, size_t* nops) {
319  if (msg_ == nullptr && !send_buf_.Valid()) return;
320  if (hijacked_) {
321  serializer_ = nullptr;
322  return;
323  }
324  if (msg_ != nullptr) {
325  GPR_CODEGEN_ASSERT(serializer_(msg_).ok());
326  }
327  serializer_ = nullptr;
328  grpc_op* op = &ops[(*nops)++];
329  op->op = GRPC_OP_SEND_MESSAGE;
330  op->flags = write_options_.flags();
331  op->reserved = NULL;
332  op->data.send_message.send_message = send_buf_.c_buffer();
333  // Flags are per-message: clear them after use.
334  write_options_.Clear();
335  }
336  void FinishOp(bool* status) {
337  if (msg_ == nullptr && !send_buf_.Valid()) return;
338  if (hijacked_ && failed_send_) {
339  // Hijacking interceptor failed this Op
340  *status = false;
341  } else if (!*status) {
342  // This Op was passed down to core and the Op failed
343  failed_send_ = true;
344  }
345  }
346 
348  InterceptorBatchMethodsImpl* interceptor_methods) {
349  if (msg_ == nullptr && !send_buf_.Valid()) return;
350  interceptor_methods->AddInterceptionHookPoint(
352  interceptor_methods->SetSendMessage(&send_buf_, &msg_, &failed_send_,
353  serializer_);
354  }
355 
357  InterceptorBatchMethodsImpl* interceptor_methods) {
358  if (msg_ != nullptr || send_buf_.Valid()) {
359  interceptor_methods->AddInterceptionHookPoint(
361  }
362  send_buf_.Clear();
363  msg_ = nullptr;
364  // The contents of the SendMessage value that was previously set
365  // has had its references stolen by core's operations
366  interceptor_methods->SetSendMessage(nullptr, nullptr, &failed_send_,
367  nullptr);
368  }
369 
370  void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
371  hijacked_ = true;
372  }
373 
374  private:
375  const void* msg_ = nullptr; // The original non-serialized message
376  bool hijacked_ = false;
377  bool failed_send_ = false;
378  ByteBuffer send_buf_;
379  WriteOptions write_options_;
380  std::function<Status(const void*)> serializer_;
381 };
382 
383 template <class M>
385  write_options_ = options;
386  serializer_ = [this](const void* message) {
387  bool own_buf;
388  send_buf_.Clear();
389  // TODO(vjpai): Remove the void below when possible
390  // The void in the template parameter below should not be needed
391  // (since it should be implicit) but is needed due to an observed
392  // difference in behavior between clang and gcc for certain internal users
394  *static_cast<const M*>(message), send_buf_.bbuf_ptr(), &own_buf);
395  if (!own_buf) {
396  send_buf_.Duplicate();
397  }
398  return result;
399  };
400  // Serialize immediately only if we do not have access to the message pointer
401  if (msg_ == nullptr) {
402  Status result = serializer_(&message);
403  serializer_ = nullptr;
404  return result;
405  }
406  return Status();
407 }
408 
409 template <class M>
411  return SendMessage(message, WriteOptions());
412 }
413 
414 template <class M>
416  WriteOptions options) {
417  msg_ = message;
418  return SendMessage(*message, options);
419 }
420 
421 template <class M>
423  msg_ = message;
424  return SendMessage(*message, WriteOptions());
425 }
426 
427 template <class R>
428 class CallOpRecvMessage {
429  public:
431  : got_message(false),
432  message_(nullptr),
433  allow_not_getting_message_(false) {}
434 
435  void RecvMessage(R* message) { message_ = message; }
436 
437  // Do not change status if no message is received.
438  void AllowNoMessage() { allow_not_getting_message_ = true; }
439 
441 
442  protected:
443  void AddOp(grpc_op* ops, size_t* nops) {
444  if (message_ == nullptr || hijacked_) return;
445  grpc_op* op = &ops[(*nops)++];
446  op->op = GRPC_OP_RECV_MESSAGE;
447  op->flags = 0;
448  op->reserved = NULL;
449  op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
450  }
451 
452  void FinishOp(bool* status) {
453  if (message_ == nullptr || hijacked_) return;
454  if (recv_buf_.Valid()) {
455  if (*status) {
456  got_message = *status =
457  SerializationTraits<R>::Deserialize(recv_buf_.bbuf_ptr(), message_)
458  .ok();
459  recv_buf_.Release();
460  } else {
461  got_message = false;
462  recv_buf_.Clear();
463  }
464  } else {
465  got_message = false;
466  if (!allow_not_getting_message_) {
467  *status = false;
468  }
469  }
470  }
471 
473  InterceptorBatchMethodsImpl* interceptor_methods) {
474  if (message_ == nullptr) return;
475  interceptor_methods->SetRecvMessage(message_, &got_message);
476  }
477 
479  InterceptorBatchMethodsImpl* interceptor_methods) {
480  if (message_ == nullptr) return;
481  interceptor_methods->AddInterceptionHookPoint(
483  if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
484  }
485  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
486  hijacked_ = true;
487  if (message_ == nullptr) return;
488  interceptor_methods->AddInterceptionHookPoint(
490  got_message = true;
491  }
492 
493  private:
494  R* message_;
495  ByteBuffer recv_buf_;
496  bool allow_not_getting_message_;
497  bool hijacked_ = false;
498 };
499 
501  public:
502  virtual Status Deserialize(ByteBuffer* buf) = 0;
503  virtual ~DeserializeFunc() {}
504 };
505 
506 template <class R>
507 class DeserializeFuncType final : public DeserializeFunc {
508  public:
509  DeserializeFuncType(R* message) : message_(message) {}
510  Status Deserialize(ByteBuffer* buf) override {
511  return SerializationTraits<R>::Deserialize(buf->bbuf_ptr(), message_);
512  }
513 
514  ~DeserializeFuncType() override {}
515 
516  private:
517  R* message_; // Not a managed pointer because management is external to this
518 };
519 
521  public:
523  : got_message(false), allow_not_getting_message_(false) {}
524 
525  template <class R>
526  void RecvMessage(R* message) {
527  // Use an explicit base class pointer to avoid resolution error in the
528  // following unique_ptr::reset for some old implementations.
529  DeserializeFunc* func = new DeserializeFuncType<R>(message);
530  deserialize_.reset(func);
531  message_ = message;
532  }
533 
534  // Do not change status if no message is received.
535  void AllowNoMessage() { allow_not_getting_message_ = true; }
536 
538 
539  protected:
540  void AddOp(grpc_op* ops, size_t* nops) {
541  if (!deserialize_ || hijacked_) return;
542  grpc_op* op = &ops[(*nops)++];
543  op->op = GRPC_OP_RECV_MESSAGE;
544  op->flags = 0;
545  op->reserved = NULL;
546  op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
547  }
548 
549  void FinishOp(bool* status) {
550  if (!deserialize_ || hijacked_) return;
551  if (recv_buf_.Valid()) {
552  if (*status) {
553  got_message = true;
554  *status = deserialize_->Deserialize(&recv_buf_).ok();
555  recv_buf_.Release();
556  } else {
557  got_message = false;
558  recv_buf_.Clear();
559  }
560  } else {
561  got_message = false;
562  if (!allow_not_getting_message_) {
563  *status = false;
564  }
565  }
566  }
567 
569  InterceptorBatchMethodsImpl* interceptor_methods) {
570  if (!deserialize_) return;
571  interceptor_methods->SetRecvMessage(message_, &got_message);
572  }
573 
575  InterceptorBatchMethodsImpl* interceptor_methods) {
576  if (!deserialize_) return;
577  interceptor_methods->AddInterceptionHookPoint(
579  if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
580  deserialize_.reset();
581  }
582  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
583  hijacked_ = true;
584  if (!deserialize_) return;
585  interceptor_methods->AddInterceptionHookPoint(
587  got_message = true;
588  }
589 
590  private:
591  void* message_;
592  bool hijacked_ = false;
593  std::unique_ptr<DeserializeFunc> deserialize_;
594  ByteBuffer recv_buf_;
595  bool allow_not_getting_message_;
596 };
597 
599  public:
600  CallOpClientSendClose() : send_(false) {}
601 
602  void ClientSendClose() { send_ = true; }
603 
604  protected:
605  void AddOp(grpc_op* ops, size_t* nops) {
606  if (!send_ || hijacked_) return;
607  grpc_op* op = &ops[(*nops)++];
609  op->flags = 0;
610  op->reserved = NULL;
611  }
612  void FinishOp(bool* /*status*/) { send_ = false; }
613 
615  InterceptorBatchMethodsImpl* interceptor_methods) {
616  if (!send_) return;
617  interceptor_methods->AddInterceptionHookPoint(
619  }
620 
622  InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
623 
624  void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
625  hijacked_ = true;
626  }
627 
628  private:
629  bool hijacked_ = false;
630  bool send_;
631 };
632 
634  public:
635  CallOpServerSendStatus() : send_status_available_(false) {}
636 
638  std::multimap<grpc::string, grpc::string>* trailing_metadata,
639  const Status& status) {
640  send_error_details_ = status.error_details();
641  metadata_map_ = trailing_metadata;
642  send_status_available_ = true;
643  send_status_code_ = static_cast<grpc_status_code>(status.error_code());
644  send_error_message_ = status.error_message();
645  }
646 
647  protected:
648  void AddOp(grpc_op* ops, size_t* nops) {
649  if (!send_status_available_ || hijacked_) return;
650  trailing_metadata_ = FillMetadataArray(
651  *metadata_map_, &trailing_metadata_count_, send_error_details_);
652  grpc_op* op = &ops[(*nops)++];
655  trailing_metadata_count_;
656  op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
657  op->data.send_status_from_server.status = send_status_code_;
658  error_message_slice_ = SliceReferencingString(send_error_message_);
660  send_error_message_.empty() ? nullptr : &error_message_slice_;
661  op->flags = 0;
662  op->reserved = NULL;
663  }
664 
665  void FinishOp(bool* /*status*/) {
666  if (!send_status_available_ || hijacked_) return;
667  g_core_codegen_interface->gpr_free(trailing_metadata_);
668  send_status_available_ = false;
669  }
670 
672  InterceptorBatchMethodsImpl* interceptor_methods) {
673  if (!send_status_available_) return;
674  interceptor_methods->AddInterceptionHookPoint(
676  interceptor_methods->SetSendTrailingMetadata(metadata_map_);
677  interceptor_methods->SetSendStatus(&send_status_code_, &send_error_details_,
678  &send_error_message_);
679  }
680 
682  InterceptorBatchMethodsImpl* /*interceptor_methods*/) {}
683 
684  void SetHijackingState(InterceptorBatchMethodsImpl* /*interceptor_methods*/) {
685  hijacked_ = true;
686  }
687 
688  private:
689  bool hijacked_ = false;
690  bool send_status_available_;
691  grpc_status_code send_status_code_;
692  grpc::string send_error_details_;
693  grpc::string send_error_message_;
694  size_t trailing_metadata_count_;
695  std::multimap<grpc::string, grpc::string>* metadata_map_;
696  grpc_metadata* trailing_metadata_;
697  grpc_slice error_message_slice_;
698 };
699 
701  public:
702  CallOpRecvInitialMetadata() : metadata_map_(nullptr) {}
703 
705  context->initial_metadata_received_ = true;
706  metadata_map_ = &context->recv_initial_metadata_;
707  }
708 
709  protected:
710  void AddOp(grpc_op* ops, size_t* nops) {
711  if (metadata_map_ == nullptr || hijacked_) return;
712  grpc_op* op = &ops[(*nops)++];
714  op->data.recv_initial_metadata.recv_initial_metadata = metadata_map_->arr();
715  op->flags = 0;
716  op->reserved = NULL;
717  }
718 
719  void FinishOp(bool* /*status*/) {
720  if (metadata_map_ == nullptr || hijacked_) return;
721  }
722 
724  InterceptorBatchMethodsImpl* interceptor_methods) {
725  interceptor_methods->SetRecvInitialMetadata(metadata_map_);
726  }
727 
729  InterceptorBatchMethodsImpl* interceptor_methods) {
730  if (metadata_map_ == nullptr) return;
731  interceptor_methods->AddInterceptionHookPoint(
733  metadata_map_ = nullptr;
734  }
735 
736  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
737  hijacked_ = true;
738  if (metadata_map_ == nullptr) return;
739  interceptor_methods->AddInterceptionHookPoint(
741  }
742 
743  private:
744  bool hijacked_ = false;
745  MetadataMap* metadata_map_;
746 };
747 
749  public:
751  : recv_status_(nullptr), debug_error_string_(nullptr) {}
752 
754  client_context_ = context;
755  metadata_map_ = &client_context_->trailing_metadata_;
756  recv_status_ = status;
757  error_message_ = g_core_codegen_interface->grpc_empty_slice();
758  }
759 
760  protected:
761  void AddOp(grpc_op* ops, size_t* nops) {
762  if (recv_status_ == nullptr || hijacked_) return;
763  grpc_op* op = &ops[(*nops)++];
765  op->data.recv_status_on_client.trailing_metadata = metadata_map_->arr();
766  op->data.recv_status_on_client.status = &status_code_;
767  op->data.recv_status_on_client.status_details = &error_message_;
768  op->data.recv_status_on_client.error_string = &debug_error_string_;
769  op->flags = 0;
770  op->reserved = NULL;
771  }
772 
773  void FinishOp(bool* /*status*/) {
774  if (recv_status_ == nullptr || hijacked_) return;
775  grpc::string binary_error_details = metadata_map_->GetBinaryErrorDetails();
776  *recv_status_ =
777  Status(static_cast<StatusCode>(status_code_),
778  GRPC_SLICE_IS_EMPTY(error_message_)
779  ? grpc::string()
780  : grpc::string(GRPC_SLICE_START_PTR(error_message_),
781  GRPC_SLICE_END_PTR(error_message_)),
782  binary_error_details);
783  client_context_->set_debug_error_string(
784  debug_error_string_ != nullptr ? debug_error_string_ : "");
785  g_core_codegen_interface->grpc_slice_unref(error_message_);
786  if (debug_error_string_ != nullptr) {
787  g_core_codegen_interface->gpr_free((void*)debug_error_string_);
788  }
789  }
790 
792  InterceptorBatchMethodsImpl* interceptor_methods) {
793  interceptor_methods->SetRecvStatus(recv_status_);
794  interceptor_methods->SetRecvTrailingMetadata(metadata_map_);
795  }
796 
798  InterceptorBatchMethodsImpl* interceptor_methods) {
799  if (recv_status_ == nullptr) return;
800  interceptor_methods->AddInterceptionHookPoint(
802  recv_status_ = nullptr;
803  }
804 
805  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
806  hijacked_ = true;
807  if (recv_status_ == nullptr) return;
808  interceptor_methods->AddInterceptionHookPoint(
810  }
811 
812  private:
813  bool hijacked_ = false;
814  ::grpc_impl::ClientContext* client_context_;
815  MetadataMap* metadata_map_;
816  Status* recv_status_;
817  const char* debug_error_string_;
818  grpc_status_code status_code_;
819  grpc_slice error_message_;
820 };
821 
822 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
823  class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
824  class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
825 class CallOpSet;
826 
833 template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
834 class CallOpSet : public CallOpSetInterface,
835  public Op1,
836  public Op2,
837  public Op3,
838  public Op4,
839  public Op5,
840  public Op6 {
841  public:
842  CallOpSet() : core_cq_tag_(this), return_tag_(this) {}
843  // The copy constructor and assignment operator reset the value of
844  // core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_
845  // since those are only meaningful on a specific object, not across objects.
846  CallOpSet(const CallOpSet& other)
847  : core_cq_tag_(this),
848  return_tag_(this),
849  call_(other.call_),
850  done_intercepting_(false),
851  interceptor_methods_(InterceptorBatchMethodsImpl()) {}
852 
853  CallOpSet& operator=(const CallOpSet& other) {
854  core_cq_tag_ = this;
855  return_tag_ = this;
856  call_ = other.call_;
857  done_intercepting_ = false;
858  interceptor_methods_ = InterceptorBatchMethodsImpl();
859  return *this;
860  }
861 
862  void FillOps(Call* call) override {
863  done_intercepting_ = false;
864  g_core_codegen_interface->grpc_call_ref(call->call());
865  call_ =
866  *call; // It's fine to create a copy of call since it's just pointers
867 
868  if (RunInterceptors()) {
869  ContinueFillOpsAfterInterception();
870  } else {
871  // After the interceptors are run, ContinueFillOpsAfterInterception will
872  // be run
873  }
874  }
875 
876  bool FinalizeResult(void** tag, bool* status) override {
877  if (done_intercepting_) {
878  // Complete the avalanching since we are done with this batch of ops
879  call_.cq()->CompleteAvalanching();
880  // We have already finished intercepting and filling in the results. This
881  // round trip from the core needed to be made because interceptors were
882  // run
883  *tag = return_tag_;
884  *status = saved_status_;
885  g_core_codegen_interface->grpc_call_unref(call_.call());
886  return true;
887  }
888 
889  this->Op1::FinishOp(status);
890  this->Op2::FinishOp(status);
891  this->Op3::FinishOp(status);
892  this->Op4::FinishOp(status);
893  this->Op5::FinishOp(status);
894  this->Op6::FinishOp(status);
895  saved_status_ = *status;
896  if (RunInterceptorsPostRecv()) {
897  *tag = return_tag_;
898  g_core_codegen_interface->grpc_call_unref(call_.call());
899  return true;
900  }
901  // Interceptors are going to be run, so we can't return the tag just yet.
902  // After the interceptors are run, ContinueFinalizeResultAfterInterception
903  return false;
904  }
905 
906  void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
907 
908  void* core_cq_tag() override { return core_cq_tag_; }
909 
914  void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
915 
916  // This will be called while interceptors are run if the RPC is a hijacked
917  // RPC. This should set hijacking state for each of the ops.
918  void SetHijackingState() override {
919  this->Op1::SetHijackingState(&interceptor_methods_);
920  this->Op2::SetHijackingState(&interceptor_methods_);
921  this->Op3::SetHijackingState(&interceptor_methods_);
922  this->Op4::SetHijackingState(&interceptor_methods_);
923  this->Op5::SetHijackingState(&interceptor_methods_);
924  this->Op6::SetHijackingState(&interceptor_methods_);
925  }
926 
927  // Should be called after interceptors are done running
929  static const size_t MAX_OPS = 6;
930  grpc_op ops[MAX_OPS];
931  size_t nops = 0;
932  this->Op1::AddOp(ops, &nops);
933  this->Op2::AddOp(ops, &nops);
934  this->Op3::AddOp(ops, &nops);
935  this->Op4::AddOp(ops, &nops);
936  this->Op5::AddOp(ops, &nops);
937  this->Op6::AddOp(ops, &nops);
939  g_core_codegen_interface->grpc_call_start_batch(
940  call_.call(), ops, nops, core_cq_tag(), nullptr));
941  }
942 
943  // Should be called after interceptors are done running on the finalize result
944  // path
946  done_intercepting_ = true;
948  g_core_codegen_interface->grpc_call_start_batch(
949  call_.call(), nullptr, 0, core_cq_tag(), nullptr));
950  }
951 
952  private:
953  // Returns true if no interceptors need to be run
954  bool RunInterceptors() {
955  interceptor_methods_.ClearState();
956  interceptor_methods_.SetCallOpSetInterface(this);
957  interceptor_methods_.SetCall(&call_);
958  this->Op1::SetInterceptionHookPoint(&interceptor_methods_);
959  this->Op2::SetInterceptionHookPoint(&interceptor_methods_);
960  this->Op3::SetInterceptionHookPoint(&interceptor_methods_);
961  this->Op4::SetInterceptionHookPoint(&interceptor_methods_);
962  this->Op5::SetInterceptionHookPoint(&interceptor_methods_);
963  this->Op6::SetInterceptionHookPoint(&interceptor_methods_);
964  if (interceptor_methods_.InterceptorsListEmpty()) {
965  return true;
966  }
967  // This call will go through interceptors and would need to
968  // schedule new batches, so delay completion queue shutdown
969  call_.cq()->RegisterAvalanching();
970  return interceptor_methods_.RunInterceptors();
971  }
972  // Returns true if no interceptors need to be run
973  bool RunInterceptorsPostRecv() {
974  // Call and OpSet had already been set on the set state.
975  // SetReverse also clears previously set hook points
976  interceptor_methods_.SetReverse();
977  this->Op1::SetFinishInterceptionHookPoint(&interceptor_methods_);
978  this->Op2::SetFinishInterceptionHookPoint(&interceptor_methods_);
979  this->Op3::SetFinishInterceptionHookPoint(&interceptor_methods_);
980  this->Op4::SetFinishInterceptionHookPoint(&interceptor_methods_);
981  this->Op5::SetFinishInterceptionHookPoint(&interceptor_methods_);
982  this->Op6::SetFinishInterceptionHookPoint(&interceptor_methods_);
983  return interceptor_methods_.RunInterceptors();
984  }
985 
986  void* core_cq_tag_;
987  void* return_tag_;
988  Call call_;
989  bool done_intercepting_ = false;
990  InterceptorBatchMethodsImpl interceptor_methods_;
991  bool saved_status_;
992 };
993 
994 } // namespace internal
995 } // namespace grpc
996 
997 #endif // GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
void ContinueFillOpsAfterInterception() override
Definition: call_op_set.h:928
everything went ok
Definition: grpc_types.h:394
grpc_op_type op
Operation type, as defined by grpc_op_type.
Definition: grpc_types.h:577
grpc_metadata_array * recv_initial_metadata
Definition: grpc_types.h:620
union grpc_op::grpc_op_data data
bool get_no_compression() const
Get value for the flag indicating whether compression for the next message write is forcefully disabl...
Definition: call_op_set.h:120
void * reserved
Reserved for future usage.
Definition: grpc_types.h:581
WriteOptions & clear_buffer_hint()
Clears flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:137
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:681
void ClientRecvStatus(::grpc_impl::ClientContext *context, Status *status)
Definition: call_op_set.h:753
grpc_status_code
Definition: status.h:26
void FinishOp(bool *)
Definition: call_op_set.h:719
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:128
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:145
void SetHijackingState() override
Definition: call_op_set.h:918
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:710
void ClientSendClose()
Definition: call_op_set.h:602
void RecvInitialMetadata(::grpc_impl::ClientContext *context)
Definition: call_op_set.h:704
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:805
CallOpRecvMessage()
Definition: call_op_set.h:430
std::string string
Definition: config.h:35
void SetHijackingState(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:276
bool get_buffer_hint() const
Get value for the flag indicating that the write may be buffered and need not go out on the wire imme...
Definition: call_op_set.h:146
WriteOptions & clear_no_compression()
Clears flag for the disabling of compression for the next message write.
Definition: call_op_set.h:111
struct grpc_byte_buffer ** recv_message
Definition: grpc_types.h:628
struct grpc_byte_buffer * send_message
This op takes ownership of the slices in send_message.
Definition: grpc_types.h:603
void FinishOp(bool *)
Definition: call_op_set.h:213
Send a close from the client: one and only one instance MUST be sent from the client, unless the call was cancelled - in which case this can be skipped.
Definition: grpc_types.h:542
CallOpSet(const CallOpSet &other)
Definition: call_op_set.h:846
Send status from the server: one and only one instance MUST be sent from the server unless the call w...
Definition: grpc_types.h:547
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:318
void AllowNoMessage()
Definition: call_op_set.h:535
void SetHijackingState(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:624
Definition: metadata_map.h:33
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:167
#define GRPC_WRITE_NO_COMPRESS
Force compression to be disabled for a particular write (start_write/add_metadata).
Definition: grpc_types.h:441
grpc_slice key
the key, value values are expected to line up with grpc_mdelem: if changing them, update metadata...
Definition: grpc_types.h:473
Status Deserialize(ByteBuffer *buf) override
Definition: call_op_set.h:510
CallOpRecvInitialMetadata()
Definition: call_op_set.h:702
void SetRecvMessage(void *message, bool *got_message)
Definition: interceptor_common.h:169
struct grpc_op::grpc_op_data::grpc_op_recv_message recv_message
void SetRecvStatus(Status *status)
Definition: interceptor_common.h:178
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:825
#define GRPC_SLICE_IS_EMPTY(slice)
Definition: slice.h:107
grpc_slice value
Definition: grpc_types.h:474
#define GRPC_WRITE_THROUGH
Force this message to be written to the socket before completing it.
Definition: grpc_types.h:443
const char ** error_string
If this is not nullptr, it will be populated with the full fidelity error string for debugging purpos...
Definition: grpc_types.h:642
grpc_slice * status_details
Definition: grpc_types.h:638
void Clear()
Clear all flags.
Definition: call_op_set.h:95
WriteOptions & set_write_through()
Guarantee that all bytes have been written to the socket before completing this write (usually writes...
Definition: call_op_set.h:181
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:568
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1]. ...
Definition: slice.h:60
bool send_
Definition: call_op_set.h:281
#define GRPC_WRITE_BUFFER_HINT
Write Flags:
Definition: grpc_types.h:438
Send a message: 0 or more of these operations can occur for each call.
Definition: grpc_types.h:537
bool got_message
Definition: call_op_set.h:440
The first three in this list are for clients and servers.
bool is_write_through() const
Definition: call_op_set.h:186
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:621
WriteOptions()
Definition: call_op_set.h:87
WriteOptions & clear_last_message()
Clears flag indicating that this is the last message in a stream, disabling coalescing.
Definition: call_op_set.h:174
grpc_slice SliceReferencingString(const grpc::string &str)
Definition: slice.h:131
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata::grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level
grpc_compression_level
Compression levels allow a party with knowledge of its peer's accepted encodings to request compressi...
Definition: compression_types.h:71
grpc_call * call() const
Definition: call.h:72
#define GRPC_SLICE_START_PTR(slice)
Definition: slice.h:96
void ContinueFinalizeResultAfterInterception() override
Definition: call_op_set.h:945
bool is_set
Definition: call_op_set.h:287
grpc_metadata_array * trailing_metadata
ownership of the array is with the caller, but ownership of the elements stays with the call object (...
Definition: grpc_types.h:636
WriteOptions & clear_corked()
Definition: call_op_set.h:155
WriteOptions & set_no_compression()
Sets flag for the disabling of compression for the next message write.
Definition: call_op_set.h:103
#define GRPC_SLICE_END_PTR(slice)
Definition: slice.h:105
CallOpClientSendClose()
Definition: call_op_set.h:600
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:761
Definition: byte_buffer.h:63
::google::protobuf::util::Status Status
Definition: config_protobuf.h:96
uint32_t flags_
Definition: call_op_set.h:282
Default argument for CallOpSet.
Definition: call_op_set.h:210
void FinishOp(bool *)
Definition: call_op_set.h:773
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:574
void SetHijackingState(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:684
grpc_compression_level level
Definition: call_op_set.h:288
bool got_message
Definition: call_op_set.h:537
grpc::string error_message() const
Return the instance's error message.
Definition: status.h:112
Definition: call_op_set.h:500
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
The following three are for hijacked clients only.
StatusCode error_code() const
Return the instance's error code.
Definition: status.h:110
CallOpServerSendStatus()
Definition: call_op_set.h:635
bool FinalizeResult(void **tag, bool *status) override
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
Definition: call_op_set.h:876
Definition: call_op_set.h:633
void set_compression_level(grpc_compression_level level)
Definition: call_op_set.h:236
Definition: call_op_set.h:222
bool is_corked() const
Definition: call_op_set.h:160
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:540
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:736
Definition: call_op_set.h:700
Status SendMessage(const M &message, WriteOptions options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call_op_set.h:384
std::multimap< grpc::string, grpc::string > * metadata_map_
Definition: call_op_set.h:284
Status SendMessagePtr(const M *message, WriteOptions options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call_op_set.h:415
uint32_t flags() const
Returns raw flags bitset.
Definition: call_op_set.h:98
void SetHijackingState(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:218
grpc_metadata * FillMetadataArray(const std::multimap< grpc::string, grpc::string > &metadata, size_t *metadata_count, const grpc::string &optional_error_details)
Definition: call_op_set.h:59
struct grpc_op::grpc_op_data::grpc_op_recv_initial_metadata recv_initial_metadata
A single metadata element.
Definition: grpc_types.h:470
Definition: call_op_set.h:292
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata send_initial_metadata
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:242
void FinishOp(bool *status)
Definition: call_op_set.h:549
Operation data: one field for each op type (except SEND_CLOSE_FROM_CLIENT which has no arguments) ...
Definition: grpc_types.h:575
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:614
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
Receive initial metadata: one and only one MUST be made on the client, must not be made on the server...
Definition: grpc_types.h:552
void RecvMessage(R *message)
Definition: call_op_set.h:526
void FillOps(Call *call) override
Fills in grpc_op, starting from ops[*nops] and moving upwards.
Definition: call_op_set.h:862
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:356
struct grpc_op::grpc_op_data::grpc_op_send_message send_message
void SendInitialMetadata(std::multimap< grpc::string, grpc::string > *metadata, uint32_t flags)
Definition: call_op_set.h:228
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:273
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue_impl.h:91
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:648
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:216
void * core_cq_tag() override
Get the tag to be used at the core completion queue.
Definition: call_op_set.h:908
#define GRPC_MUST_USE_RESULT
Definition: port_platform.h:551
Send initial metadata: one and only one instance MUST be sent for each call, unless the call was canc...
Definition: grpc_types.h:533
WriteOptions(const WriteOptions &other)
Definition: call_op_set.h:88
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:605
CallOpClientRecvStatus()
Definition: call_op_set.h:750
Definition: interceptor_common.h:36
Definition: byte_buffer.h:52
void ServerSendStatus(std::multimap< grpc::string, grpc::string > *trailing_metadata, const Status &status)
Definition: call_op_set.h:637
Per-message write options.
Definition: call_op_set.h:85
grpc_slice * status_details
optional: set to NULL if no details need sending, non-NULL if they do; pointer will not be retained pa...
Definition: grpc_types.h:612
CallOpSet & operator=(const CallOpSet &other)
Definition: call_op_set.h:853
void AllowNoMessage()
Definition: call_op_set.h:438
size_t trailing_metadata_count
Definition: grpc_types.h:606
CallOpSendMessage()
Definition: call_op_set.h:294
grpc_metadata * metadata
Definition: grpc_types.h:589
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:347
void SetHijackingState(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:370
void FinishOp(bool *status)
Definition: call_op_set.h:452
Definition: call_op_set.h:598
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:478
void FinishOp(bool *)
Definition: call_op_set.h:612
void AddOp(grpc_op *, size_t *)
Definition: call_op_set.h:212
An abstract collection of call ops, used to generate the grpc_call_op structure to pass down to the l...
Definition: call_op_set_interface.h:34
void SetSendStatus(grpc_status_code *code, grpc::string *error_details, grpc::string *error_message)
Definition: interceptor_common.h:157
void FinishOp(bool *)
Definition: call_op_set.h:665
void SetSendTrailingMetadata(std::multimap< grpc::string, grpc::string > *metadata)
Definition: interceptor_common.h:164
virtual ~DeserializeFunc()
Definition: call_op_set.h:503
void SetRecvInitialMetadata(MetadataMap *map)
Definition: interceptor_common.h:174
void RecvMessage(R *message)
Definition: call_op_set.h:435
struct grpc_op::grpc_op_data::grpc_op_send_status_from_server send_status_from_server
void set_output_tag(void *return_tag)
Definition: call_op_set.h:906
CallOpSendInitialMetadata()
Definition: call_op_set.h:224
void SetSendInitialMetadata(std::multimap< grpc::string, grpc::string > *metadata)
Definition: interceptor_common.h:152
WriteOptions & set_corked()
corked bit: aliases set_buffer_hint currently, with the intent that set_buffer_hint will be removed i...
Definition: call_op_set.h:150
A ClientContext allows the person implementing a service client to:
Definition: client_context_impl.h:181
Did it work? If it didn't, why?
Definition: status.h:31
void FinishOp(bool *status)
Definition: call_op_set.h:336
Receive status on the client: one and only one must be made on the client.
Definition: grpc_types.h:562
grpc_metadata * initial_metadata_
Definition: call_op_set.h:285
void AddInterceptionHookPoint(experimental::InterceptionHookPoints type)
Definition: interceptor_common.h:78
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:582
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:728
DeserializeFuncType(R *message)
Definition: call_op_set.h:509
size_t initial_metadata_count_
Definition: call_op_set.h:283
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:797
void SetRecvTrailingMetadata(MetadataMap *map)
Definition: interceptor_common.h:180
grpc_status_code status
Definition: grpc_types.h:608
grpc_status_code * status
Definition: grpc_types.h:637
size_t count
Definition: grpc_types.h:588
Definition: call_op_set.h:520
uint32_t flags
Write flags bitset for grpc_begin_messages.
Definition: grpc_types.h:579
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:192
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:723
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:791
Definition: call_op_set.h:748
void FinishOp(bool *)
Definition: call_op_set.h:259
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:671
void SetSendMessage(ByteBuffer *buf, const void **msg, bool *fail_send_message, std::function< Status(const void *)> serializer)
Definition: interceptor_common.h:143
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *)
Definition: call_op_set.h:214
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:485
Receive a message: 0 or more of these operations can occur for each call.
Definition: grpc_types.h:556
A sequence of bytes.
Definition: byte_buffer.h:72
CallOpGenericRecvMessage()
Definition: call_op_set.h:522
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:443
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:472
CallOpSet()
Definition: call_op_set.h:842
const char kBinaryErrorDetailsKey[]
Definition: metadata_map.h:31
The following two are for all clients and servers.
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:265
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:914
grpc::string error_details() const
Return the (binary) error details.
Definition: status.h:115
Straightforward wrapping of the C call object.
Definition: call.h:38
grpc_metadata * trailing_metadata
Definition: grpc_types.h:607
struct grpc_op::grpc_op_data::grpc_op_recv_status_on_client recv_status_on_client
~DeserializeFuncType() override
Definition: call_op_set.h:514