GRPC C++  1.20.0
call_op_set.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
20 #define GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
21 
22 #include <assert.h>
23 #include <array>
24 #include <cstring>
25 #include <functional>
26 #include <map>
27 #include <memory>
28 #include <vector>
29 
44 
45 #include <grpc/impl/codegen/atm.h>
48 
49 namespace grpc {
50 
53 
54 namespace internal {
55 class Call;
56 class CallHook;
57 
58 // TODO(yangg) if the map is changed before we send, the pointers will be a
59 // mess. Make sure it does not happen.
61  const std::multimap<grpc::string, grpc::string>& metadata,
62  size_t* metadata_count, const grpc::string& optional_error_details) {
63  *metadata_count = metadata.size() + (optional_error_details.empty() ? 0 : 1);
64  if (*metadata_count == 0) {
65  return nullptr;
66  }
67  grpc_metadata* metadata_array =
68  (grpc_metadata*)(g_core_codegen_interface->gpr_malloc(
69  (*metadata_count) * sizeof(grpc_metadata)));
70  size_t i = 0;
71  for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
72  metadata_array[i].key = SliceReferencingString(iter->first);
73  metadata_array[i].value = SliceReferencingString(iter->second);
74  }
75  if (!optional_error_details.empty()) {
76  metadata_array[i].key =
77  g_core_codegen_interface->grpc_slice_from_static_buffer(
79  metadata_array[i].value = SliceReferencingString(optional_error_details);
80  }
81  return metadata_array;
82 }
83 } // namespace internal
84 
86 class WriteOptions {
87  public:
88  WriteOptions() : flags_(0), last_message_(false) {}
89  WriteOptions(const WriteOptions& other)
90  : flags_(other.flags_), last_message_(other.last_message_) {}
91 
93  inline void Clear() { flags_ = 0; }
94 
96  inline uint32_t flags() const { return flags_; }
97 
102  SetBit(GRPC_WRITE_NO_COMPRESS);
103  return *this;
104  }
105 
110  ClearBit(GRPC_WRITE_NO_COMPRESS);
111  return *this;
112  }
113 
118  inline bool get_no_compression() const {
119  return GetBit(GRPC_WRITE_NO_COMPRESS);
120  }
121 
127  SetBit(GRPC_WRITE_BUFFER_HINT);
128  return *this;
129  }
130 
136  ClearBit(GRPC_WRITE_BUFFER_HINT);
137  return *this;
138  }
139 
144  inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
145 
149  SetBit(GRPC_WRITE_BUFFER_HINT);
150  return *this;
151  }
152 
154  ClearBit(GRPC_WRITE_BUFFER_HINT);
155  return *this;
156  }
157 
158  inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
159 
166  last_message_ = true;
167  return *this;
168  }
169 
173  last_message_ = false;
174  return *this;
175  }
176 
180  SetBit(GRPC_WRITE_THROUGH);
181  return *this;
182  }
183 
184  inline bool is_write_through() const { return GetBit(GRPC_WRITE_THROUGH); }
185 
190  bool is_last_message() const { return last_message_; }
191 
193  flags_ = rhs.flags_;
194  return *this;
195  }
196 
197  private:
198  void SetBit(const uint32_t mask) { flags_ |= mask; }
199 
200  void ClearBit(const uint32_t mask) { flags_ &= ~mask; }
201 
202  bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
203 
204  uint32_t flags_;
205  bool last_message_;
206 };
207 
208 namespace internal {
209 
212 template <int I>
213 class CallNoOp {
214  protected:
215  void AddOp(grpc_op* ops, size_t* nops) {}
216  void FinishOp(bool* status) {}
218  InterceptorBatchMethodsImpl* interceptor_methods) {}
220  InterceptorBatchMethodsImpl* interceptor_methods) {}
221  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {}
222 };
223 
225  public:
226  CallOpSendInitialMetadata() : send_(false) {
227  maybe_compression_level_.is_set = false;
228  }
229 
230  void SendInitialMetadata(std::multimap<grpc::string, grpc::string>* metadata,
231  uint32_t flags) {
232  maybe_compression_level_.is_set = false;
233  send_ = true;
234  flags_ = flags;
235  metadata_map_ = metadata;
236  }
237 
239  maybe_compression_level_.is_set = true;
240  maybe_compression_level_.level = level;
241  }
242 
243  protected:
244  void AddOp(grpc_op* ops, size_t* nops) {
245  if (!send_ || hijacked_) return;
246  grpc_op* op = &ops[(*nops)++];
248  op->flags = flags_;
249  op->reserved = NULL;
250  initial_metadata_ =
251  FillMetadataArray(*metadata_map_, &initial_metadata_count_, "");
252  op->data.send_initial_metadata.count = initial_metadata_count_;
253  op->data.send_initial_metadata.metadata = initial_metadata_;
255  maybe_compression_level_.is_set;
256  if (maybe_compression_level_.is_set) {
258  maybe_compression_level_.level;
259  }
260  }
261  void FinishOp(bool* status) {
262  if (!send_ || hijacked_) return;
263  g_core_codegen_interface->gpr_free(initial_metadata_);
264  send_ = false;
265  }
266 
268  InterceptorBatchMethodsImpl* interceptor_methods) {
269  if (!send_) return;
270  interceptor_methods->AddInterceptionHookPoint(
272  interceptor_methods->SetSendInitialMetadata(metadata_map_);
273  }
274 
276  InterceptorBatchMethodsImpl* interceptor_methods) {}
277 
278  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
279  hijacked_ = true;
280  }
281 
282  bool hijacked_ = false;
283  bool send_;
284  uint32_t flags_;
286  std::multimap<grpc::string, grpc::string>* metadata_map_;
288  struct {
289  bool is_set;
291  } maybe_compression_level_;
292 };
293 
295  public:
296  CallOpSendMessage() : send_buf_() {}
297 
300  template <class M>
301  Status SendMessage(const M& message,
303 
304  template <class M>
305  Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
306 
310  template <class M>
311  Status SendMessagePtr(const M* message,
313 
316  template <class M>
317  Status SendMessagePtr(const M* message) GRPC_MUST_USE_RESULT;
318 
319  protected:
320  void AddOp(grpc_op* ops, size_t* nops) {
321  if (msg_ == nullptr && !send_buf_.Valid()) return;
322  if (hijacked_) {
323  serializer_ = nullptr;
324  return;
325  }
326  if (msg_ != nullptr) {
327  GPR_CODEGEN_ASSERT(serializer_(msg_).ok());
328  }
329  serializer_ = nullptr;
330  grpc_op* op = &ops[(*nops)++];
331  op->op = GRPC_OP_SEND_MESSAGE;
332  op->flags = write_options_.flags();
333  op->reserved = NULL;
334  op->data.send_message.send_message = send_buf_.c_buffer();
335  // Flags are per-message: clear them after use.
336  write_options_.Clear();
337  }
338  void FinishOp(bool* status) {
339  if (msg_ == nullptr && !send_buf_.Valid()) return;
340  if (hijacked_ && failed_send_) {
341  // Hijacking interceptor failed this Op
342  *status = false;
343  } else if (!*status) {
344  // This Op was passed down to core and the Op failed
345  failed_send_ = true;
346  }
347  }
348 
350  InterceptorBatchMethodsImpl* interceptor_methods) {
351  if (msg_ == nullptr && !send_buf_.Valid()) return;
352  interceptor_methods->AddInterceptionHookPoint(
354  interceptor_methods->SetSendMessage(&send_buf_, &msg_, &failed_send_,
355  serializer_);
356  }
357 
359  InterceptorBatchMethodsImpl* interceptor_methods) {
360  if (msg_ != nullptr || send_buf_.Valid()) {
361  interceptor_methods->AddInterceptionHookPoint(
363  }
364  send_buf_.Clear();
365  msg_ = nullptr;
366  // The contents of the SendMessage value that was previously set
367  // has had its references stolen by core's operations
368  interceptor_methods->SetSendMessage(nullptr, nullptr, &failed_send_,
369  nullptr);
370  }
371 
372  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
373  hijacked_ = true;
374  }
375 
376  private:
377  const void* msg_ = nullptr; // The original non-serialized message
378  bool hijacked_ = false;
379  bool failed_send_ = false;
380  ByteBuffer send_buf_;
381  WriteOptions write_options_;
382  std::function<Status(const void*)> serializer_;
383 };
384 
385 template <class M>
387  write_options_ = options;
388  serializer_ = [this](const void* message) {
389  bool own_buf;
390  send_buf_.Clear();
391  // TODO(vjpai): Remove the void below when possible
392  // The void in the template parameter below should not be needed
393  // (since it should be implicit) but is needed due to an observed
394  // difference in behavior between clang and gcc for certain internal users
396  *static_cast<const M*>(message), send_buf_.bbuf_ptr(), &own_buf);
397  if (!own_buf) {
398  send_buf_.Duplicate();
399  }
400  return result;
401  };
402  // Serialize immediately only if we do not have access to the message pointer
403  if (msg_ == nullptr) {
404  Status result = serializer_(&message);
405  serializer_ = nullptr;
406  return result;
407  }
408  return Status();
409 }
410 
411 template <class M>
413  return SendMessage(message, WriteOptions());
414 }
415 
416 template <class M>
418  WriteOptions options) {
419  msg_ = message;
420  return SendMessage(*message, options);
421 }
422 
423 template <class M>
425  msg_ = message;
426  return SendMessage(*message, WriteOptions());
427 }
428 
429 template <class R>
430 class CallOpRecvMessage {
431  public:
433  : got_message(false),
434  message_(nullptr),
435  allow_not_getting_message_(false) {}
436 
437  void RecvMessage(R* message) { message_ = message; }
438 
439  // Do not change status if no message is received.
440  void AllowNoMessage() { allow_not_getting_message_ = true; }
441 
443 
444  protected:
445  void AddOp(grpc_op* ops, size_t* nops) {
446  if (message_ == nullptr || hijacked_) return;
447  grpc_op* op = &ops[(*nops)++];
448  op->op = GRPC_OP_RECV_MESSAGE;
449  op->flags = 0;
450  op->reserved = NULL;
451  op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
452  }
453 
454  void FinishOp(bool* status) {
455  if (message_ == nullptr || hijacked_) return;
456  if (recv_buf_.Valid()) {
457  if (*status) {
458  got_message = *status =
459  SerializationTraits<R>::Deserialize(recv_buf_.bbuf_ptr(), message_)
460  .ok();
461  recv_buf_.Release();
462  } else {
463  got_message = false;
464  recv_buf_.Clear();
465  }
466  } else {
467  got_message = false;
468  if (!allow_not_getting_message_) {
469  *status = false;
470  }
471  }
472  message_ = nullptr;
473  }
474 
476  InterceptorBatchMethodsImpl* interceptor_methods) {
477  if (message_ == nullptr) return;
478  interceptor_methods->SetRecvMessage(message_, &got_message);
479  }
480 
482  InterceptorBatchMethodsImpl* interceptor_methods) {
483  if (message_ == nullptr) return;
484  interceptor_methods->AddInterceptionHookPoint(
486  if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
487  }
488  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
489  hijacked_ = true;
490  if (message_ == nullptr) return;
491  interceptor_methods->AddInterceptionHookPoint(
493  got_message = true;
494  }
495 
496  private:
497  R* message_;
498  ByteBuffer recv_buf_;
499  bool allow_not_getting_message_;
500  bool hijacked_ = false;
501 };
502 
504  public:
505  virtual Status Deserialize(ByteBuffer* buf) = 0;
506  virtual ~DeserializeFunc() {}
507 };
508 
509 template <class R>
510 class DeserializeFuncType final : public DeserializeFunc {
511  public:
512  DeserializeFuncType(R* message) : message_(message) {}
513  Status Deserialize(ByteBuffer* buf) override {
514  return SerializationTraits<R>::Deserialize(buf->bbuf_ptr(), message_);
515  }
516 
517  ~DeserializeFuncType() override {}
518 
519  private:
520  R* message_; // Not a managed pointer because management is external to this
521 };
522 
524  public:
526  : got_message(false), allow_not_getting_message_(false) {}
527 
528  template <class R>
529  void RecvMessage(R* message) {
530  // Use an explicit base class pointer to avoid resolution error in the
531  // following unique_ptr::reset for some old implementations.
532  DeserializeFunc* func = new DeserializeFuncType<R>(message);
533  deserialize_.reset(func);
534  message_ = message;
535  }
536 
537  // Do not change status if no message is received.
538  void AllowNoMessage() { allow_not_getting_message_ = true; }
539 
541 
542  protected:
543  void AddOp(grpc_op* ops, size_t* nops) {
544  if (!deserialize_ || hijacked_) return;
545  grpc_op* op = &ops[(*nops)++];
546  op->op = GRPC_OP_RECV_MESSAGE;
547  op->flags = 0;
548  op->reserved = NULL;
549  op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
550  }
551 
552  void FinishOp(bool* status) {
553  if (!deserialize_ || hijacked_) return;
554  if (recv_buf_.Valid()) {
555  if (*status) {
556  got_message = true;
557  *status = deserialize_->Deserialize(&recv_buf_).ok();
558  recv_buf_.Release();
559  } else {
560  got_message = false;
561  recv_buf_.Clear();
562  }
563  } else {
564  got_message = false;
565  if (!allow_not_getting_message_) {
566  *status = false;
567  }
568  }
569  deserialize_.reset();
570  }
571 
573  InterceptorBatchMethodsImpl* interceptor_methods) {
574  if (!deserialize_) return;
575  interceptor_methods->SetRecvMessage(message_, &got_message);
576  }
577 
579  InterceptorBatchMethodsImpl* interceptor_methods) {
580  if (!deserialize_) return;
581  interceptor_methods->AddInterceptionHookPoint(
583  if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
584  }
585  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
586  hijacked_ = true;
587  if (!deserialize_) return;
588  interceptor_methods->AddInterceptionHookPoint(
590  got_message = true;
591  }
592 
593  private:
594  void* message_;
595  bool hijacked_ = false;
596  std::unique_ptr<DeserializeFunc> deserialize_;
597  ByteBuffer recv_buf_;
598  bool allow_not_getting_message_;
599 };
600 
602  public:
603  CallOpClientSendClose() : send_(false) {}
604 
605  void ClientSendClose() { send_ = true; }
606 
607  protected:
608  void AddOp(grpc_op* ops, size_t* nops) {
609  if (!send_ || hijacked_) return;
610  grpc_op* op = &ops[(*nops)++];
612  op->flags = 0;
613  op->reserved = NULL;
614  }
615  void FinishOp(bool* status) { send_ = false; }
616 
618  InterceptorBatchMethodsImpl* interceptor_methods) {
619  if (!send_) return;
620  interceptor_methods->AddInterceptionHookPoint(
622  }
623 
625  InterceptorBatchMethodsImpl* interceptor_methods) {}
626 
627  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
628  hijacked_ = true;
629  }
630 
631  private:
632  bool hijacked_ = false;
633  bool send_;
634 };
635 
637  public:
638  CallOpServerSendStatus() : send_status_available_(false) {}
639 
641  std::multimap<grpc::string, grpc::string>* trailing_metadata,
642  const Status& status) {
643  send_error_details_ = status.error_details();
644  metadata_map_ = trailing_metadata;
645  send_status_available_ = true;
646  send_status_code_ = static_cast<grpc_status_code>(status.error_code());
647  send_error_message_ = status.error_message();
648  }
649 
650  protected:
651  void AddOp(grpc_op* ops, size_t* nops) {
652  if (!send_status_available_ || hijacked_) return;
653  trailing_metadata_ = FillMetadataArray(
654  *metadata_map_, &trailing_metadata_count_, send_error_details_);
655  grpc_op* op = &ops[(*nops)++];
658  trailing_metadata_count_;
659  op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
660  op->data.send_status_from_server.status = send_status_code_;
661  error_message_slice_ = SliceReferencingString(send_error_message_);
663  send_error_message_.empty() ? nullptr : &error_message_slice_;
664  op->flags = 0;
665  op->reserved = NULL;
666  }
667 
668  void FinishOp(bool* status) {
669  if (!send_status_available_ || hijacked_) return;
670  g_core_codegen_interface->gpr_free(trailing_metadata_);
671  send_status_available_ = false;
672  }
673 
675  InterceptorBatchMethodsImpl* interceptor_methods) {
676  if (!send_status_available_) return;
677  interceptor_methods->AddInterceptionHookPoint(
679  interceptor_methods->SetSendTrailingMetadata(metadata_map_);
680  interceptor_methods->SetSendStatus(&send_status_code_, &send_error_details_,
681  &send_error_message_);
682  }
683 
685  InterceptorBatchMethodsImpl* interceptor_methods) {}
686 
687  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
688  hijacked_ = true;
689  }
690 
691  private:
692  bool hijacked_ = false;
693  bool send_status_available_;
694  grpc_status_code send_status_code_;
695  grpc::string send_error_details_;
696  grpc::string send_error_message_;
697  size_t trailing_metadata_count_;
698  std::multimap<grpc::string, grpc::string>* metadata_map_;
699  grpc_metadata* trailing_metadata_;
700  grpc_slice error_message_slice_;
701 };
702 
704  public:
705  CallOpRecvInitialMetadata() : metadata_map_(nullptr) {}
706 
708  context->initial_metadata_received_ = true;
709  metadata_map_ = &context->recv_initial_metadata_;
710  }
711 
712  protected:
713  void AddOp(grpc_op* ops, size_t* nops) {
714  if (metadata_map_ == nullptr || hijacked_) return;
715  grpc_op* op = &ops[(*nops)++];
717  op->data.recv_initial_metadata.recv_initial_metadata = metadata_map_->arr();
718  op->flags = 0;
719  op->reserved = NULL;
720  }
721 
722  void FinishOp(bool* status) {
723  if (metadata_map_ == nullptr || hijacked_) return;
724  }
725 
727  InterceptorBatchMethodsImpl* interceptor_methods) {
728  interceptor_methods->SetRecvInitialMetadata(metadata_map_);
729  }
730 
732  InterceptorBatchMethodsImpl* interceptor_methods) {
733  if (metadata_map_ == nullptr) return;
734  interceptor_methods->AddInterceptionHookPoint(
736  metadata_map_ = nullptr;
737  }
738 
739  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
740  hijacked_ = true;
741  if (metadata_map_ == nullptr) return;
742  interceptor_methods->AddInterceptionHookPoint(
744  }
745 
746  private:
747  bool hijacked_ = false;
748  MetadataMap* metadata_map_;
749 };
750 
752  public:
754  : recv_status_(nullptr), debug_error_string_(nullptr) {}
755 
756  void ClientRecvStatus(ClientContext* context, Status* status) {
757  client_context_ = context;
758  metadata_map_ = &client_context_->trailing_metadata_;
759  recv_status_ = status;
760  error_message_ = g_core_codegen_interface->grpc_empty_slice();
761  }
762 
763  protected:
764  void AddOp(grpc_op* ops, size_t* nops) {
765  if (recv_status_ == nullptr || hijacked_) return;
766  grpc_op* op = &ops[(*nops)++];
768  op->data.recv_status_on_client.trailing_metadata = metadata_map_->arr();
769  op->data.recv_status_on_client.status = &status_code_;
770  op->data.recv_status_on_client.status_details = &error_message_;
771  op->data.recv_status_on_client.error_string = &debug_error_string_;
772  op->flags = 0;
773  op->reserved = NULL;
774  }
775 
776  void FinishOp(bool* status) {
777  if (recv_status_ == nullptr || hijacked_) return;
778  grpc::string binary_error_details = metadata_map_->GetBinaryErrorDetails();
779  *recv_status_ =
780  Status(static_cast<StatusCode>(status_code_),
781  GRPC_SLICE_IS_EMPTY(error_message_)
782  ? grpc::string()
783  : grpc::string(GRPC_SLICE_START_PTR(error_message_),
784  GRPC_SLICE_END_PTR(error_message_)),
785  binary_error_details);
786  client_context_->set_debug_error_string(
787  debug_error_string_ != nullptr ? debug_error_string_ : "");
788  g_core_codegen_interface->grpc_slice_unref(error_message_);
789  if (debug_error_string_ != nullptr) {
790  g_core_codegen_interface->gpr_free((void*)debug_error_string_);
791  }
792  }
793 
795  InterceptorBatchMethodsImpl* interceptor_methods) {
796  interceptor_methods->SetRecvStatus(recv_status_);
797  interceptor_methods->SetRecvTrailingMetadata(metadata_map_);
798  }
799 
801  InterceptorBatchMethodsImpl* interceptor_methods) {
802  if (recv_status_ == nullptr) return;
803  interceptor_methods->AddInterceptionHookPoint(
805  recv_status_ = nullptr;
806  }
807 
808  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
809  hijacked_ = true;
810  if (recv_status_ == nullptr) return;
811  interceptor_methods->AddInterceptionHookPoint(
813  }
814 
815  private:
816  bool hijacked_ = false;
817  ClientContext* client_context_;
818  MetadataMap* metadata_map_;
819  Status* recv_status_;
820  const char* debug_error_string_;
821  grpc_status_code status_code_;
822  grpc_slice error_message_;
823 };
824 
825 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
826  class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
827  class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
828 class CallOpSet;
829 
836 template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
837 class CallOpSet : public CallOpSetInterface,
838  public Op1,
839  public Op2,
840  public Op3,
841  public Op4,
842  public Op5,
843  public Op6 {
844  public:
845  CallOpSet() : core_cq_tag_(this), return_tag_(this) {}
846  // The copy constructor and assignment operator reset the value of
847  // core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_
848  // since those are only meaningful on a specific object, not across objects.
849  CallOpSet(const CallOpSet& other)
850  : core_cq_tag_(this),
851  return_tag_(this),
852  call_(other.call_),
853  done_intercepting_(false),
854  interceptor_methods_(InterceptorBatchMethodsImpl()) {}
855 
856  CallOpSet& operator=(const CallOpSet& other) {
857  core_cq_tag_ = this;
858  return_tag_ = this;
859  call_ = other.call_;
860  done_intercepting_ = false;
861  interceptor_methods_ = InterceptorBatchMethodsImpl();
862  return *this;
863  }
864 
865  void FillOps(Call* call) override {
866  done_intercepting_ = false;
867  g_core_codegen_interface->grpc_call_ref(call->call());
868  call_ =
869  *call; // It's fine to create a copy of call since it's just pointers
870 
871  if (RunInterceptors()) {
872  ContinueFillOpsAfterInterception();
873  } else {
874  // After the interceptors are run, ContinueFillOpsAfterInterception will
875  // be run
876  }
877  }
878 
879  bool FinalizeResult(void** tag, bool* status) override {
880  if (done_intercepting_) {
881  // Complete the avalanching since we are done with this batch of ops
882  call_.cq()->CompleteAvalanching();
883  // We have already finished intercepting and filling in the results. This
884  // round trip from the core needed to be made because interceptors were
885  // run
886  *tag = return_tag_;
887  *status = saved_status_;
888  g_core_codegen_interface->grpc_call_unref(call_.call());
889  return true;
890  }
891 
892  this->Op1::FinishOp(status);
893  this->Op2::FinishOp(status);
894  this->Op3::FinishOp(status);
895  this->Op4::FinishOp(status);
896  this->Op5::FinishOp(status);
897  this->Op6::FinishOp(status);
898  saved_status_ = *status;
899  if (RunInterceptorsPostRecv()) {
900  *tag = return_tag_;
901  g_core_codegen_interface->grpc_call_unref(call_.call());
902  return true;
903  }
904  // Interceptors are going to be run, so we can't return the tag just yet.
905  // After the interceptors are run, ContinueFinalizeResultAfterInterception
906  return false;
907  }
908 
909  void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
910 
911  void* core_cq_tag() override { return core_cq_tag_; }
912 
917  void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
918 
919  // This will be called while interceptors are run if the RPC is a hijacked
920  // RPC. This should set hijacking state for each of the ops.
921  void SetHijackingState() override {
922  this->Op1::SetHijackingState(&interceptor_methods_);
923  this->Op2::SetHijackingState(&interceptor_methods_);
924  this->Op3::SetHijackingState(&interceptor_methods_);
925  this->Op4::SetHijackingState(&interceptor_methods_);
926  this->Op5::SetHijackingState(&interceptor_methods_);
927  this->Op6::SetHijackingState(&interceptor_methods_);
928  }
929 
930  // Should be called after interceptors are done running
932  static const size_t MAX_OPS = 6;
933  grpc_op ops[MAX_OPS];
934  size_t nops = 0;
935  this->Op1::AddOp(ops, &nops);
936  this->Op2::AddOp(ops, &nops);
937  this->Op3::AddOp(ops, &nops);
938  this->Op4::AddOp(ops, &nops);
939  this->Op5::AddOp(ops, &nops);
940  this->Op6::AddOp(ops, &nops);
942  g_core_codegen_interface->grpc_call_start_batch(
943  call_.call(), ops, nops, core_cq_tag(), nullptr));
944  }
945 
946  // Should be called after interceptors are done running on the finalize result
947  // path
949  done_intercepting_ = true;
951  g_core_codegen_interface->grpc_call_start_batch(
952  call_.call(), nullptr, 0, core_cq_tag(), nullptr));
953  }
954 
955  private:
956  // Returns true if no interceptors need to be run
957  bool RunInterceptors() {
958  interceptor_methods_.ClearState();
959  interceptor_methods_.SetCallOpSetInterface(this);
960  interceptor_methods_.SetCall(&call_);
961  this->Op1::SetInterceptionHookPoint(&interceptor_methods_);
962  this->Op2::SetInterceptionHookPoint(&interceptor_methods_);
963  this->Op3::SetInterceptionHookPoint(&interceptor_methods_);
964  this->Op4::SetInterceptionHookPoint(&interceptor_methods_);
965  this->Op5::SetInterceptionHookPoint(&interceptor_methods_);
966  this->Op6::SetInterceptionHookPoint(&interceptor_methods_);
967  if (interceptor_methods_.InterceptorsListEmpty()) {
968  return true;
969  }
970  // This call will go through interceptors and would need to
971  // schedule new batches, so delay completion queue shutdown
972  call_.cq()->RegisterAvalanching();
973  return interceptor_methods_.RunInterceptors();
974  }
975  // Returns true if no interceptors need to be run
976  bool RunInterceptorsPostRecv() {
977  // Call and OpSet had already been set on the set state.
978  // SetReverse also clears previously set hook points
979  interceptor_methods_.SetReverse();
980  this->Op1::SetFinishInterceptionHookPoint(&interceptor_methods_);
981  this->Op2::SetFinishInterceptionHookPoint(&interceptor_methods_);
982  this->Op3::SetFinishInterceptionHookPoint(&interceptor_methods_);
983  this->Op4::SetFinishInterceptionHookPoint(&interceptor_methods_);
984  this->Op5::SetFinishInterceptionHookPoint(&interceptor_methods_);
985  this->Op6::SetFinishInterceptionHookPoint(&interceptor_methods_);
986  return interceptor_methods_.RunInterceptors();
987  }
988 
989  void* core_cq_tag_;
990  void* return_tag_;
991  Call call_;
992  bool done_intercepting_ = false;
993  InterceptorBatchMethodsImpl interceptor_methods_;
994  bool saved_status_;
995 };
996 
997 } // namespace internal
998 } // namespace grpc
999 
1000 #endif // GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
void ContinueFillOpsAfterInterception() override
Definition: call_op_set.h:931
everything went ok
Definition: grpc_types.h:382
grpc_op_type op
Operation type, as defined by grpc_op_type.
Definition: grpc_types.h:564
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:624
grpc_metadata_array * recv_initial_metadata
Definition: grpc_types.h:607
union grpc_op::grpc_op_data data
bool get_no_compression() const
Get value for the flag indicating whether compression for the next message write is forcefully disabl...
Definition: call_op_set.h:118
void * reserved
Reserved for future usage.
Definition: grpc_types.h:568
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:217
WriteOptions & clear_buffer_hint()
Clears flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:135
grpc_status_code
Definition: status.h:26
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:126
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:144
void FinishOp(bool *status)
Definition: call_op_set.h:668
void SetHijackingState() override
Definition: call_op_set.h:921
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:713
void ClientSendClose()
Definition: call_op_set.h:605
virtual void grpc_call_ref(grpc_call *call)=0
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:627
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:808
CallOpRecvMessage()
Definition: call_op_set.h:432
std::string string
Definition: config.h:35
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:278
bool get_buffer_hint() const
Get value for the flag indicating that the write may be buffered and need not go out on the wire imme...
Definition: call_op_set.h:144
WriteOptions & clear_no_compression()
Clears flag for the disabling of compression for the next message write.
Definition: call_op_set.h:109
struct grpc_byte_buffer ** recv_message
Definition: grpc_types.h:615
void FinishOp(bool *status)
Definition: call_op_set.h:722
struct grpc_byte_buffer * send_message
This op takes ownership of the slices in send_message.
Definition: grpc_types.h:590
Send a close from the client: one and only one instance MUST be sent from the client, unless the call was cancelled - in which case this can be skipped.
Definition: grpc_types.h:529
CallOpSet(const CallOpSet &other)
Definition: call_op_set.h:849
Send status from the server: one and only one instance MUST be sent from the server unless the call w...
Definition: grpc_types.h:534
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:320
void AllowNoMessage()
Definition: call_op_set.h:538
Definition: metadata_map.h:33
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:165
#define GRPC_WRITE_NO_COMPRESS
Force compression to be disabled for a particular write (start_write/add_metadata).
Definition: grpc_types.h:429
grpc_slice key
the key, value values are expected to line up with grpc_mdelem: if changing them, update metadata...
Definition: grpc_types.h:461
Status Deserialize(ByteBuffer *buf) override
Definition: call_op_set.h:513
CallOpRecvInitialMetadata()
Definition: call_op_set.h:705
void SetRecvMessage(void *message, bool *got_message)
Definition: interceptor_common.h:169
struct grpc_op::grpc_op_data::grpc_op_recv_message recv_message
void SetRecvStatus(Status *status)
Definition: interceptor_common.h:178
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:828
#define GRPC_SLICE_IS_EMPTY(slice)
Definition: slice.h:127
grpc_slice value
Definition: grpc_types.h:462
#define GRPC_WRITE_THROUGH
Force this message to be written to the socket before completing it.
Definition: grpc_types.h:431
const char ** error_string
If this is not nullptr, it will be populated with the full fidelity error string for debugging purpos...
Definition: grpc_types.h:629
grpc_slice * status_details
Definition: grpc_types.h:625
void Clear()
Clear all flags.
Definition: call_op_set.h:93
virtual grpc_slice grpc_empty_slice()=0
WriteOptions & set_write_through()
Guarantee that all bytes have been written to the socket before completing this write (usually writes...
Definition: call_op_set.h:179
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:572
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1]. ...
Definition: slice.h:80
bool send_
Definition: call_op_set.h:283
#define GRPC_WRITE_BUFFER_HINT
Write Flags:
Definition: grpc_types.h:426
void RecvInitialMetadata(ClientContext *context)
Definition: call_op_set.h:707
Send a message: 0 or more of these operations can occur for each call.
Definition: grpc_types.h:524
bool got_message
Definition: call_op_set.h:442
virtual void grpc_call_unref(grpc_call *call)=0
The first three in this list are for clients and servers.
bool is_write_through() const
Definition: call_op_set.h:184
WriteOptions()
Definition: call_op_set.h:88
WriteOptions & clear_last_message()
Clears flag indicating that this is the last message in a stream, disabling coalescing.
Definition: call_op_set.h:172
grpc_slice SliceReferencingString(const grpc::string &str)
Definition: slice.h:131
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata::grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level
void ClientRecvStatus(ClientContext *context, Status *status)
Definition: call_op_set.h:756
grpc_compression_level
Compression levels allow a party with knowledge of its peer's accepted encodings to request compressi...
Definition: compression_types.h:71
grpc_call * call() const
Definition: call.h:70
#define GRPC_SLICE_START_PTR(slice)
Definition: slice.h:116
void ContinueFinalizeResultAfterInterception() override
Definition: call_op_set.h:948
bool is_set
Definition: call_op_set.h:289
grpc_metadata_array * trailing_metadata
ownership of the array is with the caller, but ownership of the elements stays with the call object (...
Definition: grpc_types.h:623
WriteOptions & clear_corked()
Definition: call_op_set.h:153
WriteOptions & set_no_compression()
Sets flag for the disabling of compression for the next message write.
Definition: call_op_set.h:101
#define GRPC_SLICE_END_PTR(slice)
Definition: slice.h:125
CallOpClientSendClose()
Definition: call_op_set.h:603
void FinishOp(bool *status)
Definition: call_op_set.h:261
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:174
WriteOptions & operator=(const WriteOptions &rhs)
Definition: call_op_set.h:192
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:764
Definition: byte_buffer.h:55
::google::protobuf::util::Status Status
Definition: config_protobuf.h:93
uint32_t flags_
Definition: call_op_set.h:284
Default argument for CallOpSet.
Definition: call_op_set.h:213
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:578
void FinishOp(bool *status)
Definition: call_op_set.h:776
grpc_compression_level level
Definition: call_op_set.h:290
bool got_message
Definition: call_op_set.h:540
grpc::string error_message() const
Return the instance's error message.
Definition: status.h:112
Definition: call_op_set.h:503
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
The following three are for hijacked clients only.
StatusCode error_code() const
Return the instance's error code.
Definition: status.h:110
CallOpServerSendStatus()
Definition: call_op_set.h:638
bool FinalizeResult(void **tag, bool *status) override
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
Definition: call_op_set.h:879
Definition: call_op_set.h:636
void set_compression_level(grpc_compression_level level)
Definition: call_op_set.h:238
Definition: call_op_set.h:224
bool is_corked() const
Definition: call_op_set.h:158
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:543
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:739
Definition: call_op_set.h:703
Status SendMessage(const M &message, WriteOptions options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call_op_set.h:386
std::multimap< grpc::string, grpc::string > * metadata_map_
Definition: call_op_set.h:286
Status SendMessagePtr(const M *message, WriteOptions options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call_op_set.h:417
uint32_t flags() const
Returns raw flags bitset.
Definition: call_op_set.h:96
grpc_metadata * FillMetadataArray(const std::multimap< grpc::string, grpc::string > &metadata, size_t *metadata_count, const grpc::string &optional_error_details)
Definition: call_op_set.h:60
struct grpc_op::grpc_op_data::grpc_op_recv_initial_metadata recv_initial_metadata
A single metadata element.
Definition: grpc_types.h:458
Definition: call_op_set.h:294
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata send_initial_metadata
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:244
void FinishOp(bool *status)
Definition: call_op_set.h:552
Operation data: one field for each op type (except SEND_CLOSE_FROM_CLIENT which has no arguments) ...
Definition: grpc_types.h:562
virtual grpc_slice grpc_slice_from_static_buffer(const void *buffer, size_t length)=0
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:617
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
Receive initial metadata: one and only one MUST be made on the client, must not be made on the server...
Definition: grpc_types.h:539
void RecvMessage(R *message)
Definition: call_op_set.h:529
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:219
void FillOps(Call *call) override
Fills in grpc_op, starting from ops[*nops] and moving upwards.
Definition: call_op_set.h:865
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:358
struct grpc_op::grpc_op_data::grpc_op_send_message send_message
void SendInitialMetadata(std::multimap< grpc::string, grpc::string > *metadata, uint32_t flags)
Definition: call_op_set.h:230
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:51
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:651
void * core_cq_tag() override
Get the tag to be used at the core completion queue.
Definition: call_op_set.h:911
#define GRPC_MUST_USE_RESULT
Definition: port_platform.h:504
Send initial metadata: one and only one instance MUST be sent for each call, unless the call was canc...
Definition: grpc_types.h:520
WriteOptions(const WriteOptions &other)
Definition: call_op_set.h:89
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:608
CallOpClientRecvStatus()
Definition: call_op_set.h:753
Definition: interceptor_common.h:36
Definition: byte_buffer.h:41
void ServerSendStatus(std::multimap< grpc::string, grpc::string > *trailing_metadata, const Status &status)
Definition: call_op_set.h:640
void FinishOp(bool *status)
Definition: call_op_set.h:615
Per-message write options.
Definition: call_op_set.h:86
grpc_slice * status_details
optional: set to NULL if no details need sending, non-NULL if they do pointer will not be retained pa...
Definition: grpc_types.h:599
CallOpSet & operator=(const CallOpSet &other)
Definition: call_op_set.h:856
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:275
void AllowNoMessage()
Definition: call_op_set.h:440
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:684
size_t trailing_metadata_count
Definition: grpc_types.h:593
CallOpSendMessage()
Definition: call_op_set.h:296
grpc_metadata * metadata
Definition: grpc_types.h:576
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:349
void FinishOp(bool *status)
Definition: call_op_set.h:216
virtual grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t nops, void *tag, void *reserved)=0
void FinishOp(bool *status)
Definition: call_op_set.h:454
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:97
Definition: call_op_set.h:601
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:481
An abstract collection of call ops, used to generate the grpc_call_op structure to pass down to the l...
Definition: call_op_set_interface.h:34
virtual void grpc_slice_unref(grpc_slice slice)=0
void SetSendStatus(grpc_status_code *code, grpc::string *error_details, grpc::string *error_message)
Definition: interceptor_common.h:157
void SetSendTrailingMetadata(std::multimap< grpc::string, grpc::string > *metadata)
Definition: interceptor_common.h:164
virtual ~DeserializeFunc()
Definition: call_op_set.h:506
void SetRecvInitialMetadata(MetadataMap *map)
Definition: interceptor_common.h:174
void RecvMessage(R *message)
Definition: call_op_set.h:437
struct grpc_op::grpc_op_data::grpc_op_send_status_from_server send_status_from_server
Interface between the codegen library and the minimal subset of core features required by the generat...
Definition: core_codegen_interface.h:37
void set_output_tag(void *return_tag)
Definition: call_op_set.h:909
CallOpSendInitialMetadata()
Definition: call_op_set.h:226
void SetSendInitialMetadata(std::multimap< grpc::string, grpc::string > *metadata)
Definition: interceptor_common.h:152
WriteOptions & set_corked()
corked bit: aliases set_buffer_hint currently, with the intent that set_buffer_hint will be removed i...
Definition: call_op_set.h:148
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:221
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:215
Did it work? If it didn't, why?
Definition: status.h:31
void FinishOp(bool *status)
Definition: call_op_set.h:338
Receive status on the client: one and only one must be made on the client.
Definition: grpc_types.h:549
grpc_metadata * initial_metadata_
Definition: call_op_set.h:287
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:687
void AddInterceptionHookPoint(experimental::InterceptionHookPoints type)
Definition: interceptor_common.h:78
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:585
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:731
DeserializeFuncType(R *message)
Definition: call_op_set.h:512
size_t initial_metadata_count_
Definition: call_op_set.h:285
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:800
void SetRecvTrailingMetadata(MetadataMap *map)
Definition: interceptor_common.h:180
grpc_status_code status
Definition: grpc_types.h:595
grpc_status_code * status
Definition: grpc_types.h:624
size_t count
Definition: grpc_types.h:575
Definition: call_op_set.h:523
uint32_t flags
Write flags bitset for grpc_begin_messages.
Definition: grpc_types.h:566
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:190
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:726
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:794
Definition: call_op_set.h:751
virtual void gpr_free(void *p)=0
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:674
void SetSendMessage(ByteBuffer *buf, const void **msg, bool *fail_send_message, std::function< Status(const void *)> serializer)
Definition: interceptor_common.h:143
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:488
Receive a message: 0 or more of these operations can occur for each call.
Definition: grpc_types.h:543
A sequence of bytes.
Definition: byte_buffer.h:64
CallOpGenericRecvMessage()
Definition: call_op_set.h:525
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:445
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:475
CallOpSet()
Definition: call_op_set.h:845
const char kBinaryErrorDetailsKey[]
Definition: metadata_map.h:31
The following two are for all clients and servers.
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:267
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:917
grpc::string error_details() const
Return the (binary) error details.
Definition: status.h:115
Straightforward wrapping of the C call object.
Definition: call.h:36
grpc_metadata * trailing_metadata
Definition: grpc_types.h:594
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:372
struct grpc_op::grpc_op_data::grpc_op_recv_status_on_client recv_status_on_client
virtual void * gpr_malloc(size_t size)=0
~DeserializeFuncType() override
Definition: call_op_set.h:517