GRPC C++  1.17.0
call_op_set.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
20 #define GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
21 
22 #include <assert.h>
23 #include <array>
24 #include <cstring>
25 #include <functional>
26 #include <map>
27 #include <memory>
28 #include <vector>
29 
43 
44 #include <grpc/impl/codegen/atm.h>
47 
48 namespace grpc {
49 
52 
53 namespace internal {
54 class Call;
55 class CallHook;
56 
57 // TODO(yangg) if the map is changed before we send, the pointers will be a
58 // mess. Make sure it does not happen.
60  const std::multimap<grpc::string, grpc::string>& metadata,
61  size_t* metadata_count, const grpc::string& optional_error_details) {
62  *metadata_count = metadata.size() + (optional_error_details.empty() ? 0 : 1);
63  if (*metadata_count == 0) {
64  return nullptr;
65  }
66  grpc_metadata* metadata_array =
67  (grpc_metadata*)(g_core_codegen_interface->gpr_malloc(
68  (*metadata_count) * sizeof(grpc_metadata)));
69  size_t i = 0;
70  for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
71  metadata_array[i].key = SliceReferencingString(iter->first);
72  metadata_array[i].value = SliceReferencingString(iter->second);
73  }
74  if (!optional_error_details.empty()) {
75  metadata_array[i].key =
76  g_core_codegen_interface->grpc_slice_from_static_buffer(
78  metadata_array[i].value = SliceReferencingString(optional_error_details);
79  }
80  return metadata_array;
81 }
82 } // namespace internal
83 
85 class WriteOptions {
86  public:
87  WriteOptions() : flags_(0), last_message_(false) {}
88  WriteOptions(const WriteOptions& other)
89  : flags_(other.flags_), last_message_(other.last_message_) {}
90 
92  inline void Clear() { flags_ = 0; }
93 
95  inline uint32_t flags() const { return flags_; }
96 
101  SetBit(GRPC_WRITE_NO_COMPRESS);
102  return *this;
103  }
104 
109  ClearBit(GRPC_WRITE_NO_COMPRESS);
110  return *this;
111  }
112 
117  inline bool get_no_compression() const {
118  return GetBit(GRPC_WRITE_NO_COMPRESS);
119  }
120 
126  SetBit(GRPC_WRITE_BUFFER_HINT);
127  return *this;
128  }
129 
135  ClearBit(GRPC_WRITE_BUFFER_HINT);
136  return *this;
137  }
138 
143  inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
144 
148  SetBit(GRPC_WRITE_BUFFER_HINT);
149  return *this;
150  }
151 
153  ClearBit(GRPC_WRITE_BUFFER_HINT);
154  return *this;
155  }
156 
157  inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
158 
165  last_message_ = true;
166  return *this;
167  }
168 
172  last_message_ = false;
173  return *this;
174  }
175 
179  SetBit(GRPC_WRITE_THROUGH);
180  return *this;
181  }
182 
183  inline bool is_write_through() const { return GetBit(GRPC_WRITE_THROUGH); }
184 
189  bool is_last_message() const { return last_message_; }
190 
192  flags_ = rhs.flags_;
193  return *this;
194  }
195 
196  private:
197  void SetBit(const uint32_t mask) { flags_ |= mask; }
198 
199  void ClearBit(const uint32_t mask) { flags_ &= ~mask; }
200 
201  bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
202 
203  uint32_t flags_;
204  bool last_message_;
205 };
206 
207 namespace internal {
208 
211 template <int I>
212 class CallNoOp {
213  protected:
214  void AddOp(grpc_op* ops, size_t* nops) {}
215  void FinishOp(bool* status) {}
217  InterceptorBatchMethodsImpl* interceptor_methods) {}
219  InterceptorBatchMethodsImpl* interceptor_methods) {}
220  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {}
221 };
222 
224  public:
225  CallOpSendInitialMetadata() : send_(false) {
226  maybe_compression_level_.is_set = false;
227  }
228 
229  void SendInitialMetadata(std::multimap<grpc::string, grpc::string>* metadata,
230  uint32_t flags) {
     // Arms this op so that the next AddOp emits a send-initial-metadata entry.
     // `metadata` is stored by pointer (not copied) and dereferenced later in
     // AddOp, so the map must outlive this call. `flags` becomes the op flags.
     // Any compression level set earlier is discarded.
231  maybe_compression_level_.is_set = false;
232  send_ = true;
233  flags_ = flags;
234  metadata_map_ = metadata;
235  }
236 
238  maybe_compression_level_.is_set = true;
239  maybe_compression_level_.level = level;
240  }
241 
242  protected:
243  void AddOp(grpc_op* ops, size_t* nops) {
244  if (!send_ || hijacked_) return;
245  grpc_op* op = &ops[(*nops)++];
247  op->flags = flags_;
248  op->reserved = NULL;
249  initial_metadata_ =
250  FillMetadataArray(*metadata_map_, &initial_metadata_count_, "");
251  op->data.send_initial_metadata.count = initial_metadata_count_;
252  op->data.send_initial_metadata.metadata = initial_metadata_;
254  maybe_compression_level_.is_set;
255  if (maybe_compression_level_.is_set) {
257  maybe_compression_level_.level;
258  }
259  }
260  void FinishOp(bool* status) {
     // Frees the metadata array that AddOp allocated and disarms the op.
     // `status` is not read or written here.
     // Same guard as AddOp: if the op was not armed or the call was hijacked,
     // AddOp allocated nothing, so there is nothing to free.
261  if (!send_ || hijacked_) return;
262  g_core_codegen_interface->gpr_free(initial_metadata_);
263  send_ = false;
264  }
265 
267  InterceptorBatchMethodsImpl* interceptor_methods) {
268  if (!send_) return;
269  interceptor_methods->AddInterceptionHookPoint(
271  interceptor_methods->SetSendInitialMetadata(metadata_map_);
272  }
273 
275  InterceptorBatchMethodsImpl* interceptor_methods) {}
276 
277  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
278  hijacked_ = true;
279  }
280 
281  bool hijacked_ = false;
282  bool send_;
283  uint32_t flags_;
285  std::multimap<grpc::string, grpc::string>* metadata_map_;
287  struct {
288  bool is_set;
290  } maybe_compression_level_;
291 };
292 
294  public:
295  CallOpSendMessage() : send_buf_() {}
296 
299  template <class M>
300  Status SendMessage(const M& message,
302 
303  template <class M>
304  Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
305 
306  protected:
307  void AddOp(grpc_op* ops, size_t* nops) {
     // Appends a GRPC_OP_SEND_MESSAGE entry at ops[*nops] and increments *nops.
     // Does nothing when no message is buffered or the call was hijacked.
308  if (!send_buf_.Valid() || hijacked_) return;
309  grpc_op* op = &ops[(*nops)++];
310  op->op = GRPC_OP_SEND_MESSAGE;
311  op->flags = write_options_.flags();
     // NOTE(review): other parts of this file use nullptr; NULL here is only a
     // spelling inconsistency.
312  op->reserved = NULL;
313  op->data.send_message.send_message = send_buf_.c_buffer();
314  // Flags are per-message: clear them after use.
     // WriteOptions::Clear() resets the flag bits only.
315  write_options_.Clear();
316  }
317  void FinishOp(bool* status) { send_buf_.Clear(); }
318 
320  InterceptorBatchMethodsImpl* interceptor_methods) {
321  if (!send_buf_.Valid()) return;
322  interceptor_methods->AddInterceptionHookPoint(
324  interceptor_methods->SetSendMessage(&send_buf_);
325  }
326 
328  InterceptorBatchMethodsImpl* interceptor_methods) {}
329 
330  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
331  hijacked_ = true;
332  }
333 
334  private:
335  bool hijacked_ = false;
336  ByteBuffer send_buf_;
337  WriteOptions write_options_;
338 };
339 
340 template <class M>
342  write_options_ = options;
343  bool own_buf;
344  // TODO(vjpai): Remove the void below when possible
345  // The void in the template parameter below should not be needed
346  // (since it should be implicit) but is needed due to an observed
347  // difference in behavior between clang and gcc for certain internal users
349  message, send_buf_.bbuf_ptr(), &own_buf);
350  if (!own_buf) {
351  send_buf_.Duplicate();
352  }
353  return result;
354 }
355 
356 template <class M>
358  return SendMessage(message, WriteOptions());
359 }
360 
361 template <class R>
362 class CallOpRecvMessage {
363  public:
365  : got_message(false),
366  message_(nullptr),
367  allow_not_getting_message_(false) {}
368 
369  void RecvMessage(R* message) { message_ = message; }
370 
371  // Do not change status if no message is received.
372  void AllowNoMessage() { allow_not_getting_message_ = true; }
373 
375 
376  protected:
377  void AddOp(grpc_op* ops, size_t* nops) {
     // Appends a GRPC_OP_RECV_MESSAGE entry at ops[*nops] and increments *nops.
     // Does nothing unless RecvMessage() supplied a target, or if the call was
     // hijacked.
378  if (message_ == nullptr || hijacked_) return;
379  grpc_op* op = &ops[(*nops)++];
380  op->op = GRPC_OP_RECV_MESSAGE;
381  op->flags = 0;
382  op->reserved = NULL;
     // The received bytes are delivered through recv_buf_; FinishOp
     // deserializes them into message_.
383  op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
384  }
385 
386  void FinishOp(bool* status) {
     // Completes the receive: deserializes recv_buf_ into message_ and updates
     // got_message and *status accordingly. Skipped entirely when the op was
     // not armed or the call was hijacked.
387  if (message_ == nullptr || hijacked_) return;
388  if (recv_buf_.Valid()) {
389  if (*status) {
     // Both got_message and *status take the result of deserialization, so a
     // deserialization failure fails the batch.
390  got_message = *status =
391  SerializationTraits<R>::Deserialize(recv_buf_.bbuf_ptr(), message_)
392  .ok();
     // NOTE(review): Release() rather than Clear() presumably because
     // Deserialize consumed the underlying buffer -- confirm against
     // ByteBuffer / SerializationTraits.
393  recv_buf_.Release();
394  } else {
     // The batch already failed: drop the buffer without deserializing.
395  got_message = false;
396  recv_buf_.Clear();
397  }
398  } else {
     // No buffer was received. This fails the batch unless AllowNoMessage()
     // was called.
399  got_message = false;
400  if (!allow_not_getting_message_) {
401  *status = false;
402  }
403  }
     // Disarm: RecvMessage() must be called again before the next batch.
404  message_ = nullptr;
405  }
406 
408  InterceptorBatchMethodsImpl* interceptor_methods) {
409  interceptor_methods->SetRecvMessage(message_);
410  }
411 
413  InterceptorBatchMethodsImpl* interceptor_methods) {
414  if (!got_message) return;
415  interceptor_methods->AddInterceptionHookPoint(
417  }
418  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
419  hijacked_ = true;
420  if (message_ == nullptr) return;
421  interceptor_methods->AddInterceptionHookPoint(
423  got_message = true;
424  }
425 
426  private:
427  R* message_;
428  ByteBuffer recv_buf_;
429  bool allow_not_getting_message_;
430  bool hijacked_ = false;
431 };
432 
434  public:
435  virtual Status Deserialize(ByteBuffer* buf) = 0;
436  virtual ~DeserializeFunc() {}
437 };
438 
// Binds a typed message pointer to the type-erased DeserializeFunc interface.
// Deserialize() forwards to SerializationTraits<R>::Deserialize with the
// stored pointer as the destination.
439 template <class R>
440 class DeserializeFuncType final : public DeserializeFunc {
441  public:
     // `message` is stored, not copied; it is the destination written by
     // Deserialize().
     // NOTE(review): this single-argument constructor is not `explicit`.
442  DeserializeFuncType(R* message) : message_(message) {}
     // Deserializes `buf` into the bound message and returns the Status
     // produced by SerializationTraits<R>::Deserialize.
443  Status Deserialize(ByteBuffer* buf) override {
444  return SerializationTraits<R>::Deserialize(buf->bbuf_ptr(), message_);
445  }
446 
447  ~DeserializeFuncType() override {}
448 
449  private:
450  R* message_; // Not a managed pointer because management is external to this
451 };
452 
454  public:
456  : got_message(false), allow_not_getting_message_(false) {}
457 
458  template <class R>
459  void RecvMessage(R* message) {
     // Arms this op for a message of type R. A DeserializeFuncType<R> bound to
     // `message` is stored for FinishOp, and the raw pointer is kept in
     // message_ for SetInterceptionHookPoint.
460  // Use an explicit base class pointer to avoid resolution error in the
461  // following unique_ptr::reset for some old implementations.
462  DeserializeFunc* func = new DeserializeFuncType<R>(message);
     // deserialize_ takes ownership of `func`.
463  deserialize_.reset(func);
464  message_ = message;
465  }
466 
467  // Do not change status if no message is received.
468  void AllowNoMessage() { allow_not_getting_message_ = true; }
469 
471 
472  protected:
473  void AddOp(grpc_op* ops, size_t* nops) {
     // Appends a GRPC_OP_RECV_MESSAGE entry at ops[*nops] and increments *nops.
     // Does nothing unless RecvMessage() installed a deserializer, or if the
     // call was hijacked.
474  if (!deserialize_ || hijacked_) return;
475  grpc_op* op = &ops[(*nops)++];
476  op->op = GRPC_OP_RECV_MESSAGE;
477  op->flags = 0;
478  op->reserved = NULL;
479  op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
480  }
481 
482  void FinishOp(bool* status) {
     // Completes the receive: runs the stored deserializer over recv_buf_ and
     // updates got_message and *status. Skipped when the op was not armed or
     // the call was hijacked.
483  if (!deserialize_ || hijacked_) return;
484  if (recv_buf_.Valid()) {
485  if (*status) {
     // NOTE(review): got_message is set to true before deserialization and
     // stays true even if Deserialize fails. CallOpRecvMessage::FinishOp
     // instead assigns the deserialization result to got_message. Confirm the
     // difference is intended.
486  got_message = true;
487  *status = deserialize_->Deserialize(&recv_buf_).ok();
488  recv_buf_.Release();
489  } else {
     // The batch already failed: drop the buffer without deserializing.
490  got_message = false;
491  recv_buf_.Clear();
492  }
493  } else {
     // No buffer was received. This fails the batch unless AllowNoMessage()
     // was called.
494  got_message = false;
495  if (!allow_not_getting_message_) {
496  *status = false;
497  }
498  }
     // Disarm by destroying the deserializer. message_ is left unchanged.
499  deserialize_.reset();
500  }
501 
503  InterceptorBatchMethodsImpl* interceptor_methods) {
504  interceptor_methods->SetRecvMessage(message_);
505  }
506 
508  InterceptorBatchMethodsImpl* interceptor_methods) {
509  if (!got_message) return;
510  interceptor_methods->AddInterceptionHookPoint(
512  }
513  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
514  hijacked_ = true;
515  if (!deserialize_) return;
516  interceptor_methods->AddInterceptionHookPoint(
518  }
519 
520  private:
521  void* message_;
522  bool hijacked_ = false;
523  std::unique_ptr<DeserializeFunc> deserialize_;
524  ByteBuffer recv_buf_;
525  bool allow_not_getting_message_;
526 };
527 
529  public:
530  CallOpClientSendClose() : send_(false) {}
531 
532  void ClientSendClose() { send_ = true; }
533 
534  protected:
535  void AddOp(grpc_op* ops, size_t* nops) {
536  if (!send_ || hijacked_) return;
537  grpc_op* op = &ops[(*nops)++];
539  op->flags = 0;
540  op->reserved = NULL;
541  }
542  void FinishOp(bool* status) { send_ = false; }
543 
545  InterceptorBatchMethodsImpl* interceptor_methods) {
546  if (!send_) return;
547  interceptor_methods->AddInterceptionHookPoint(
549  }
550 
552  InterceptorBatchMethodsImpl* interceptor_methods) {}
553 
554  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
555  hijacked_ = true;
556  }
557 
558  private:
559  bool hijacked_ = false;
560  bool send_;
561 };
562 
564  public:
565  CallOpServerSendStatus() : send_status_available_(false) {}
566 
568  std::multimap<grpc::string, grpc::string>* trailing_metadata,
569  const Status& status) {
570  send_error_details_ = status.error_details();
571  metadata_map_ = trailing_metadata;
572  send_status_available_ = true;
573  send_status_code_ = static_cast<grpc_status_code>(status.error_code());
574  send_error_message_ = status.error_message();
575  }
576 
577  protected:
578  void AddOp(grpc_op* ops, size_t* nops) {
579  if (!send_status_available_ || hijacked_) return;
580  trailing_metadata_ = FillMetadataArray(
581  *metadata_map_, &trailing_metadata_count_, send_error_details_);
582  grpc_op* op = &ops[(*nops)++];
585  trailing_metadata_count_;
586  op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
587  op->data.send_status_from_server.status = send_status_code_;
588  error_message_slice_ = SliceReferencingString(send_error_message_);
590  send_error_message_.empty() ? nullptr : &error_message_slice_;
591  op->flags = 0;
592  op->reserved = NULL;
593  }
594 
595  void FinishOp(bool* status) {
     // Frees the trailing-metadata array that AddOp built with
     // FillMetadataArray and disarms the op. `status` is not used here.
     // Same guard as AddOp: if it did not run, nothing was allocated.
596  if (!send_status_available_ || hijacked_) return;
597  g_core_codegen_interface->gpr_free(trailing_metadata_);
598  send_status_available_ = false;
599  }
600 
602  InterceptorBatchMethodsImpl* interceptor_methods) {
603  if (!send_status_available_) return;
604  interceptor_methods->AddInterceptionHookPoint(
606  interceptor_methods->SetSendTrailingMetadata(metadata_map_);
607  interceptor_methods->SetSendStatus(&send_status_code_, &send_error_details_,
608  &send_error_message_);
609  }
610 
612  InterceptorBatchMethodsImpl* interceptor_methods) {}
613 
614  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
615  hijacked_ = true;
616  }
617 
618  private:
619  bool hijacked_ = false;
620  bool send_status_available_;
621  grpc_status_code send_status_code_;
622  grpc::string send_error_details_;
623  grpc::string send_error_message_;
624  size_t trailing_metadata_count_;
625  std::multimap<grpc::string, grpc::string>* metadata_map_;
626  grpc_metadata* trailing_metadata_;
627  grpc_slice error_message_slice_;
628 };
629 
631  public:
632  CallOpRecvInitialMetadata() : metadata_map_(nullptr) {}
633 
635  context->initial_metadata_received_ = true;
636  metadata_map_ = &context->recv_initial_metadata_;
637  }
638 
639  protected:
640  void AddOp(grpc_op* ops, size_t* nops) {
641  if (metadata_map_ == nullptr || hijacked_) return;
642  grpc_op* op = &ops[(*nops)++];
644  op->data.recv_initial_metadata.recv_initial_metadata = metadata_map_->arr();
645  op->flags = 0;
646  op->reserved = NULL;
647  }
648 
649  void FinishOp(bool* status) {
650  if (metadata_map_ == nullptr || hijacked_) return;
651  }
652 
654  InterceptorBatchMethodsImpl* interceptor_methods) {
655  interceptor_methods->SetRecvInitialMetadata(metadata_map_);
656  }
657 
659  InterceptorBatchMethodsImpl* interceptor_methods) {
660  if (metadata_map_ == nullptr) return;
661  interceptor_methods->AddInterceptionHookPoint(
663  metadata_map_ = nullptr;
664  }
665 
666  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
667  hijacked_ = true;
668  if (metadata_map_ == nullptr) return;
669  interceptor_methods->AddInterceptionHookPoint(
671  }
672 
673  private:
674  bool hijacked_ = false;
675  MetadataMap* metadata_map_;
676 };
677 
679  public:
681  : recv_status_(nullptr), debug_error_string_(nullptr) {}
682 
683  void ClientRecvStatus(ClientContext* context, Status* status) {
     // Arms this op. FinishOp writes the final status to `status` and the
     // debug error string to `context`. Trailing metadata is received into the
     // context's own trailing_metadata_ map.
684  client_context_ = context;
685  metadata_map_ = &client_context_->trailing_metadata_;
686  recv_status_ = status;
     // Start from an empty slice. FinishOp unrefs error_message_
     // unconditionally.
687  error_message_ = g_core_codegen_interface->grpc_empty_slice();
688  }
689 
690  protected:
691  void AddOp(grpc_op* ops, size_t* nops) {
692  if (recv_status_ == nullptr || hijacked_) return;
693  grpc_op* op = &ops[(*nops)++];
695  op->data.recv_status_on_client.trailing_metadata = metadata_map_->arr();
696  op->data.recv_status_on_client.status = &status_code_;
697  op->data.recv_status_on_client.status_details = &error_message_;
698  op->data.recv_status_on_client.error_string = &debug_error_string_;
699  op->flags = 0;
700  op->reserved = NULL;
701  }
702 
703  void FinishOp(bool* status) {
     // Builds the final Status from what AddOp asked core to fill in
     // (status_code_, error_message_, trailing metadata, debug_error_string_),
     // stores it in *recv_status_, forwards the debug string to the client
     // context, then releases the slice and the debug string. The `status`
     // parameter is not used here.
704  if (recv_status_ == nullptr || hijacked_) return;
705  grpc::string binary_error_details = metadata_map_->GetBinaryErrorDetails();
706  *recv_status_ =
707  Status(static_cast<StatusCode>(status_code_),
708  GRPC_SLICE_IS_EMPTY(error_message_)
709  ? grpc::string()
710  : grpc::string(GRPC_SLICE_START_PTR(error_message_),
711  GRPC_SLICE_END_PTR(error_message_)),
712  binary_error_details);
     // A null debug string is passed on as the empty string.
713  client_context_->set_debug_error_string(
714  debug_error_string_ != nullptr ? debug_error_string_ : "");
715  g_core_codegen_interface->grpc_slice_unref(error_message_);
716  if (debug_error_string_ != nullptr) {
     // NOTE(review): the C-style cast drops const, and debug_error_string_ is
     // not reset to nullptr after the free, so the member dangles until core
     // writes it again. Consider const_cast<char*> plus an explicit reset.
717  g_core_codegen_interface->gpr_free((void*)debug_error_string_);
718  }
719  }
720 
722  InterceptorBatchMethodsImpl* interceptor_methods) {
723  interceptor_methods->SetRecvStatus(recv_status_);
724  interceptor_methods->SetRecvTrailingMetadata(metadata_map_);
725  }
726 
728  InterceptorBatchMethodsImpl* interceptor_methods) {
729  if (recv_status_ == nullptr) return;
730  interceptor_methods->AddInterceptionHookPoint(
732  recv_status_ = nullptr;
733  }
734 
735  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
736  hijacked_ = true;
737  if (recv_status_ == nullptr) return;
738  interceptor_methods->AddInterceptionHookPoint(
740  }
741 
742  private:
743  bool hijacked_ = false;
744  ClientContext* client_context_;
745  MetadataMap* metadata_map_;
746  Status* recv_status_;
747  const char* debug_error_string_;
748  grpc_status_code status_code_;
749  grpc_slice error_message_;
750 };
751 
752 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
753  class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
754  class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
755 class CallOpSet;
756 
763 template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
764 class CallOpSet : public CallOpSetInterface,
765  public Op1,
766  public Op2,
767  public Op3,
768  public Op4,
769  public Op5,
770  public Op6 {
771  public:
772  CallOpSet() : core_cq_tag_(this), return_tag_(this) {}
773  // The copy constructor and assignment operator reset the value of
774  // core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_
775  // since those are only meaningful on a specific object, not across objects.
     // Only call_ is taken from `other`. The base classes (Op1..Op6 and
     // CallOpSetInterface) have no initializer here, so they are
     // default-constructed rather than copied.
     // NOTE(review): saved_status_ is not initialized by this constructor.
776  CallOpSet(const CallOpSet& other)
777  : core_cq_tag_(this),
778  return_tag_(this),
779  call_(other.call_),
780  done_intercepting_(false),
781  interceptor_methods_(InterceptorBatchMethodsImpl()) {}
782 
783  CallOpSet& operator=(const CallOpSet& other) {
     // Copies only call_ from `other`. The tags are re-pointed at this object
     // and the interception state is reset to a fresh
     // InterceptorBatchMethodsImpl. The Op1..Op6 base sub-objects are not
     // assigned.
784  core_cq_tag_ = this;
785  return_tag_ = this;
786  call_ = other.call_;
787  done_intercepting_ = false;
788  interceptor_methods_ = InterceptorBatchMethodsImpl();
789  return *this;
790  }
791 
792  void FillOps(Call* call) override {
     // Entry point for starting this batch on `call`. Takes a reference on the
     // underlying call, stores a copy of `call`, then runs the pre-send
     // interceptors. If RunInterceptors() returns true, the batch is handed to
     // core immediately.
793  done_intercepting_ = false;
     // This reference is dropped by grpc_call_unref in FinalizeResult.
794  g_core_codegen_interface->grpc_call_ref(call->call());
795  call_ =
796  *call; // It's fine to create a copy of call since it's just pointers
797 
798  if (RunInterceptors()) {
799  ContinueFillOpsAfterInterception();
800  } else {
801  // After the interceptors are run, ContinueFillOpsAfterInterception will
802  // be run
803  }
804  }
805 
806  bool FinalizeResult(void** tag, bool* status) override {
     // Called when core completes the batch. Returns true, with *tag set to
     // return_tag_, once the result is final. Returns false when post-receive
     // interceptors still have to run; in that case it is entered a second
     // time with done_intercepting_ set.
807  if (done_intercepting_) {
808  // We have already finished intercepting and filling in the results. This
809  // round trip from the core needed to be made because interceptors were
810  // run
811  *tag = return_tag_;
     // Report the status captured on the first pass, not the incoming one.
812  *status = saved_status_;
813  g_core_codegen_interface->grpc_call_unref(call_.call());
814  return true;
815  }
816 
     // First pass: each op post-processes its result and may modify *status.
817  this->Op1::FinishOp(status);
818  this->Op2::FinishOp(status);
819  this->Op3::FinishOp(status);
820  this->Op4::FinishOp(status);
821  this->Op5::FinishOp(status);
822  this->Op6::FinishOp(status);
     // Saved for the second pass above.
823  saved_status_ = *status;
824  if (RunInterceptorsPostRecv()) {
     // No interceptors to wait for: finish now and drop the reference taken
     // in FillOps.
825  *tag = return_tag_;
826  g_core_codegen_interface->grpc_call_unref(call_.call());
827  return true;
828  }
829  // Interceptors are going to be run, so we can't return the tag just yet.
830  // After the interceptors are run, ContinueFinalizeResultAfterInterception
     // will be run.
831  return false;
832  }
833 
834  void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
835 
836  void* core_cq_tag() override { return core_cq_tag_; }
837 
842  void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
843 
844  // This will be called while interceptors are run if the RPC is a hijacked
845  // RPC. This should set hijacking state for each of the ops.
     // Forwards to the six op base classes in order, passing this set's
     // interceptor_methods_.
846  void SetHijackingState() override {
847  this->Op1::SetHijackingState(&interceptor_methods_);
848  this->Op2::SetHijackingState(&interceptor_methods_);
849  this->Op3::SetHijackingState(&interceptor_methods_);
850  this->Op4::SetHijackingState(&interceptor_methods_);
851  this->Op5::SetHijackingState(&interceptor_methods_);
852  this->Op6::SetHijackingState(&interceptor_methods_);
853  }
854 
855  // Should be called after interceptors are done running
857  static const size_t MAX_OPS = 6;
858  grpc_op ops[MAX_OPS];
859  size_t nops = 0;
860  this->Op1::AddOp(ops, &nops);
861  this->Op2::AddOp(ops, &nops);
862  this->Op3::AddOp(ops, &nops);
863  this->Op4::AddOp(ops, &nops);
864  this->Op5::AddOp(ops, &nops);
865  this->Op6::AddOp(ops, &nops);
867  g_core_codegen_interface->grpc_call_start_batch(
868  call_.call(), ops, nops, core_cq_tag(), nullptr));
869  }
870 
871  // Should be called after interceptors are done running on the finalize result
872  // path
874  done_intercepting_ = true;
876  g_core_codegen_interface->grpc_call_start_batch(
877  call_.call(), nullptr, 0, core_cq_tag(), nullptr));
878  }
879 
880  private:
881  // Returns true if no interceptors need to be run
     // Pre-send pass: resets interceptor_methods_, binds it to this op set and
     // to call_, lets each op register its hook points, then runs the
     // interceptors.
882  bool RunInterceptors() {
883  interceptor_methods_.ClearState();
884  interceptor_methods_.SetCallOpSetInterface(this);
885  interceptor_methods_.SetCall(&call_);
886  this->Op1::SetInterceptionHookPoint(&interceptor_methods_);
887  this->Op2::SetInterceptionHookPoint(&interceptor_methods_);
888  this->Op3::SetInterceptionHookPoint(&interceptor_methods_);
889  this->Op4::SetInterceptionHookPoint(&interceptor_methods_);
890  this->Op5::SetInterceptionHookPoint(&interceptor_methods_);
891  this->Op6::SetInterceptionHookPoint(&interceptor_methods_);
892  return interceptor_methods_.RunInterceptors();
893  }
894  // Returns true if no interceptors need to be run
     // Post-receive pass, called from FinalizeResult after every FinishOp has
     // run. Each op registers its finish hook points before the interceptors
     // are run.
895  bool RunInterceptorsPostRecv() {
896  // Call and OpSet had already been set on the set state.
897  // SetReverse also clears previously set hook points
898  interceptor_methods_.SetReverse();
899  this->Op1::SetFinishInterceptionHookPoint(&interceptor_methods_);
900  this->Op2::SetFinishInterceptionHookPoint(&interceptor_methods_);
901  this->Op3::SetFinishInterceptionHookPoint(&interceptor_methods_);
902  this->Op4::SetFinishInterceptionHookPoint(&interceptor_methods_);
903  this->Op5::SetFinishInterceptionHookPoint(&interceptor_methods_);
904  this->Op6::SetFinishInterceptionHookPoint(&interceptor_methods_);
905  return interceptor_methods_.RunInterceptors();
906  }
907 
908  void* core_cq_tag_;
909  void* return_tag_;
910  Call call_;
911  bool done_intercepting_ = false;
912  InterceptorBatchMethodsImpl interceptor_methods_;
913  bool saved_status_;
914 };
915 
916 } // namespace internal
917 } // namespace grpc
918 
919 #endif // GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
void ContinueFillOpsAfterInterception() override
Definition: call_op_set.h:856
everything went ok
Definition: grpc_types.h:361
grpc_op_type op
Operation type, as defined by grpc_op_type.
Definition: grpc_types.h:543
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:551
grpc_metadata_array * recv_initial_metadata
Definition: grpc_types.h:586
union grpc_op::grpc_op_data data
bool get_no_compression() const
Get value for the flag indicating whether compression for the next message write is forcefully disabl...
Definition: call_op_set.h:117
void * reserved
Reserved for future usage.
Definition: grpc_types.h:547
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:216
WriteOptions & clear_buffer_hint()
Clears flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:134
void SetRecvMessage(void *message)
Definition: interceptor_common.h:137
grpc_status_code
Definition: status.h:26
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:141
void FinishOp(bool *status)
Definition: call_op_set.h:595
void SetHijackingState() override
Definition: call_op_set.h:846
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:640
void ClientSendClose()
Definition: call_op_set.h:532
virtual void grpc_call_ref(grpc_call *call)=0
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:554
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:735
CallOpRecvMessage()
Definition: call_op_set.h:364
std::string string
Definition: config.h:35
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:277
bool get_buffer_hint() const
Get value for the flag indicating that the write may be buffered and need not go out on the wire imme...
Definition: call_op_set.h:143
WriteOptions & clear_no_compression()
Clears flag for the disabling of compression for the next message write.
Definition: call_op_set.h:108
struct grpc_byte_buffer ** recv_message
Definition: grpc_types.h:594
void FinishOp(bool *status)
Definition: call_op_set.h:649
void SetSendMessage(ByteBuffer *buf)
Definition: interceptor_common.h:118
struct grpc_byte_buffer * send_message
This op takes ownership of the slices in send_message.
Definition: grpc_types.h:569
Send a close from the client: one and only one instance MUST be sent from the client, unless the call was cancelled - in which case this can be skipped.
Definition: grpc_types.h:508
CallOpSet(const CallOpSet &other)
Definition: call_op_set.h:776
Send status from the server: one and only one instance MUST be sent from the server unless the call w...
Definition: grpc_types.h:513
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:307
void AllowNoMessage()
Definition: call_op_set.h:468
Definition: metadata_map.h:33
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:164
#define GRPC_WRITE_NO_COMPRESS
Force compression to be disabled for a particular write (start_write/add_metadata).
Definition: grpc_types.h:408
grpc_slice key
the key, value values are expected to line up with grpc_mdelem: if changing them, update metadata...
Definition: grpc_types.h:440
Status Deserialize(ByteBuffer *buf) override
Definition: call_op_set.h:443
CallOpRecvInitialMetadata()
Definition: call_op_set.h:632
struct grpc_op::grpc_op_data::grpc_op_recv_message recv_message
void SetRecvStatus(Status *status)
Definition: interceptor_common.h:143
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:755
#define GRPC_SLICE_IS_EMPTY(slice)
Definition: slice.h:127
grpc_slice value
Definition: grpc_types.h:441
#define GRPC_WRITE_THROUGH
Force this message to be written to the socket before completing it.
Definition: grpc_types.h:410
const char ** error_string
If this is not nullptr, it will be populated with the full fidelity error string for debugging purpos...
Definition: grpc_types.h:608
grpc_slice * status_details
Definition: grpc_types.h:604
void Clear()
Clear all flags.
Definition: call_op_set.h:92
virtual grpc_slice grpc_empty_slice()=0
WriteOptions & set_write_through()
Guarantee that all bytes have been written to the socket before completing this write (usually writes...
Definition: call_op_set.h:178
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:502
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1]. ...
Definition: slice.h:80
bool send_
Definition: call_op_set.h:282
#define GRPC_WRITE_BUFFER_HINT
Write Flags:
Definition: grpc_types.h:405
void RecvInitialMetadata(ClientContext *context)
Definition: call_op_set.h:634
Send a message: 0 or more of these operations can occur for each call.
Definition: grpc_types.h:503
bool got_message
Definition: call_op_set.h:374
virtual void grpc_call_unref(grpc_call *call)=0
bool is_write_through() const
Definition: call_op_set.h:183
WriteOptions()
Definition: call_op_set.h:87
WriteOptions & clear_last_message()
Clears flag indicating that this is the last message in a stream, disabling coalescing.
Definition: call_op_set.h:171
grpc_slice SliceReferencingString(const grpc::string &str)
Definition: slice.h:131
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata::grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level
void ClientRecvStatus(ClientContext *context, Status *status)
Definition: call_op_set.h:683
grpc_compression_level
Compression levels allow a party with knowledge of its peer's accepted encodings to request compressi...
Definition: compression_types.h:70
grpc_call * call() const
Definition: call.h:70
#define GRPC_SLICE_START_PTR(slice)
Definition: slice.h:116
void ContinueFinalizeResultAfterInterception() override
Definition: call_op_set.h:873
bool is_set
Definition: call_op_set.h:288
grpc_metadata_array * trailing_metadata
ownership of the array is with the caller, but ownership of the elements stays with the call object (...
Definition: grpc_types.h:602
WriteOptions & clear_corked()
Definition: call_op_set.h:152
WriteOptions & set_no_compression()
Sets flag for the disabling of compression for the next message write.
Definition: call_op_set.h:100
#define GRPC_SLICE_END_PTR(slice)
Definition: slice.h:125
CallOpClientSendClose()
Definition: call_op_set.h:530
void FinishOp(bool *status)
Definition: call_op_set.h:260
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:165
WriteOptions & operator=(const WriteOptions &rhs)
Definition: call_op_set.h:191
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:691
Definition: byte_buffer.h:53
::google::protobuf::util::Status Status
Definition: config_protobuf.h:93
uint32_t flags_
Definition: call_op_set.h:283
Default argument for CallOpSet.
Definition: call_op_set.h:212
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:507
void FinishOp(bool *status)
Definition: call_op_set.h:703
grpc_compression_level level
Definition: call_op_set.h:289
bool got_message
Definition: call_op_set.h:470
grpc::string error_message() const
Return the instance's error message.
Definition: status.h:112
Definition: call_op_set.h:433
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
StatusCode error_code() const
Return the instance's error code.
Definition: status.h:110
CallOpServerSendStatus()
Definition: call_op_set.h:565
bool FinalizeResult(void **tag, bool *status) override
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
Definition: call_op_set.h:806
Definition: call_op_set.h:563
void set_compression_level(grpc_compression_level level)
Definition: call_op_set.h:237
Definition: call_op_set.h:223
bool is_corked() const
Definition: call_op_set.h:157
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:473
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:666
Definition: call_op_set.h:630
Status SendMessage(const M &message, WriteOptions options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call_op_set.h:341
std::multimap< grpc::string, grpc::string > * metadata_map_
Definition: call_op_set.h:285
uint32_t flags() const
Returns raw flags bitset.
Definition: call_op_set.h:95
grpc_metadata * FillMetadataArray(const std::multimap< grpc::string, grpc::string > &metadata, size_t *metadata_count, const grpc::string &optional_error_details)
Definition: call_op_set.h:59
struct grpc_op::grpc_op_data::grpc_op_recv_initial_metadata recv_initial_metadata
A single metadata element.
Definition: grpc_types.h:437
Definition: call_op_set.h:293
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata send_initial_metadata
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:243
void FinishOp(bool *status)
Definition: call_op_set.h:482
Operation data: one field for each op type (except SEND_CLOSE_FROM_CLIENT which has no arguments) ...
Definition: grpc_types.h:541
virtual grpc_slice grpc_slice_from_static_buffer(const void *buffer, size_t length)=0
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:544
An Alarm posts the user provided tag to its associated completion queue upon expiry or cancellation...
Definition: alarm.h:33
Receive initial metadata: one and only one MUST be made on the client, must not be made on the server...
Definition: grpc_types.h:518
void RecvMessage(R *message)
Definition: call_op_set.h:459
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:218
void FillOps(Call *call) override
Fills in grpc_op, starting from ops[*nops] and moving upwards.
Definition: call_op_set.h:792
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:327
struct grpc_op::grpc_op_data::grpc_op_send_message send_message
void SendInitialMetadata(std::multimap< grpc::string, grpc::string > *metadata, uint32_t flags)
Definition: call_op_set.h:229
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:50
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:578
void * core_cq_tag() override
Get the tag to be used at the core completion queue.
Definition: call_op_set.h:836
#define GRPC_MUST_USE_RESULT
Definition: port_platform.h:473
Send initial metadata: one and only one instance MUST be sent for each call, unless the call was canc...
Definition: grpc_types.h:499
WriteOptions(const WriteOptions &other)
Definition: call_op_set.h:88
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:535
CallOpClientRecvStatus()
Definition: call_op_set.h:680
Definition: interceptor_common.h:36
Definition: byte_buffer.h:41
void ServerSendStatus(std::multimap< grpc::string, grpc::string > *trailing_metadata, const Status &status)
Definition: call_op_set.h:567
void FinishOp(bool *status)
Definition: call_op_set.h:542
Per-message write options.
Definition: call_op_set.h:85
grpc_slice * status_details
optional: set to NULL if no details need sending, non-NULL if they do pointer will not be retained pa...
Definition: grpc_types.h:578
CallOpSet & operator=(const CallOpSet &other)
Definition: call_op_set.h:783
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:274
void AllowNoMessage()
Definition: call_op_set.h:372
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:611
size_t trailing_metadata_count
Definition: grpc_types.h:572
CallOpSendMessage()
Definition: call_op_set.h:295
grpc_metadata * metadata
Definition: grpc_types.h:555
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:319
void FinishOp(bool *status)
Definition: call_op_set.h:215
virtual grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t nops, void *tag, void *reserved)=0
void FinishOp(bool *status)
Definition: call_op_set.h:386
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:95
Definition: call_op_set.h:528
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:412
An abstract collection of call ops, used to generate the grpc_call_op structure to pass down to the l...
Definition: call_op_set_interface.h:34
virtual void grpc_slice_unref(grpc_slice slice)=0
void SetSendStatus(grpc_status_code *code, grpc::string *error_details, grpc::string *error_message)
Definition: interceptor_common.h:125
void SetSendTrailingMetadata(std::multimap< grpc::string, grpc::string > *metadata)
Definition: interceptor_common.h:132
virtual ~DeserializeFunc()
Definition: call_op_set.h:436
void SetRecvInitialMetadata(MetadataMap *map)
Definition: interceptor_common.h:139
void RecvMessage(R *message)
Definition: call_op_set.h:369
struct grpc_op::grpc_op_data::grpc_op_send_status_from_server send_status_from_server
Interface between the codegen library and the minimal subset of core features required by the generat...
Definition: core_codegen_interface.h:37
void set_output_tag(void *return_tag)
Definition: call_op_set.h:834
CallOpSendInitialMetadata()
Definition: call_op_set.h:225
void SetSendInitialMetadata(std::multimap< grpc::string, grpc::string > *metadata)
Definition: interceptor_common.h:120
WriteOptions & set_corked()
corked bit: aliases set_buffer_hint currently, with the intent that set_buffer_hint will be removed i...
Definition: call_op_set.h:147
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:220
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:214
Did it work? If it didn't, why?
Definition: status.h:31
void FinishOp(bool *status)
Definition: call_op_set.h:317
Receive status on the client: one and only one must be made on the client.
Definition: grpc_types.h:528
grpc_metadata * initial_metadata_
Definition: call_op_set.h:286
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:614
void AddInterceptionHookPoint(experimental::InterceptionHookPoints type)
Definition: interceptor_common.h:78
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:513
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:658
DeserializeFuncType(R *message)
Definition: call_op_set.h:442
size_t initial_metadata_count_
Definition: call_op_set.h:284
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:727
void SetRecvTrailingMetadata(MetadataMap *map)
Definition: interceptor_common.h:145
grpc_status_code status
Definition: grpc_types.h:574
grpc_status_code * status
Definition: grpc_types.h:603
size_t count
Definition: grpc_types.h:554
Definition: call_op_set.h:453
uint32_t flags
Write flags bitset for grpc_begin_messages.
Definition: grpc_types.h:545
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:189
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:653
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:721
Definition: call_op_set.h:678
virtual void gpr_free(void *p)=0
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:601
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:418
Receive a message: 0 or more of these operations can occur for each call.
Definition: grpc_types.h:522
A sequence of bytes.
Definition: byte_buffer.h:62
CallOpGenericRecvMessage()
Definition: call_op_set.h:455
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:377
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:407
CallOpSet()
Definition: call_op_set.h:772
const char kBinaryErrorDetailsKey[]
Definition: metadata_map.h:31
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:266
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:842
grpc::string error_details() const
Return the (binary) error details.
Definition: status.h:115
Straightforward wrapping of the C call object.
Definition: call.h:36
grpc_metadata * trailing_metadata
Definition: grpc_types.h:573
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:330
struct grpc_op::grpc_op_data::grpc_op_recv_status_on_client recv_status_on_client
virtual void * gpr_malloc(size_t size)=0
~DeserializeFuncType() override
Definition: call_op_set.h:447