GRPC C++  1.23.0
call_op_set.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
20 #define GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
21 
22 #include <assert.h>
23 #include <array>
24 #include <cstring>
25 #include <functional>
26 #include <map>
27 #include <memory>
28 #include <vector>
29 
44 
45 #include <grpc/impl/codegen/atm.h>
48 
49 namespace grpc {
50 
51 extern CoreCodegenInterface* g_core_codegen_interface;
52 
53 namespace internal {
54 class Call;
55 class CallHook;
56 
57 // TODO(yangg): if the map is modified before the batch is sent, the slices
58 // built here will reference stale string data. Make sure that cannot happen.
60  const std::multimap<grpc::string, grpc::string>& metadata,
61  size_t* metadata_count, const grpc::string& optional_error_details) {
62  *metadata_count = metadata.size() + (optional_error_details.empty() ? 0 : 1);
63  if (*metadata_count == 0) {
64  return nullptr;
65  }
66  grpc_metadata* metadata_array =
67  (grpc_metadata*)(g_core_codegen_interface->gpr_malloc(
68  (*metadata_count) * sizeof(grpc_metadata)));
69  size_t i = 0;
70  for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
71  metadata_array[i].key = SliceReferencingString(iter->first);
72  metadata_array[i].value = SliceReferencingString(iter->second);
73  }
74  if (!optional_error_details.empty()) {
75  metadata_array[i].key =
76  g_core_codegen_interface->grpc_slice_from_static_buffer(
78  metadata_array[i].value = SliceReferencingString(optional_error_details);
79  }
80  return metadata_array;
81 }
82 } // namespace internal
83 
85 class WriteOptions {
86  public:
87  WriteOptions() : flags_(0), last_message_(false) {}
88  WriteOptions(const WriteOptions& other)
89  : flags_(other.flags_), last_message_(other.last_message_) {}
90 
92  inline void Clear() { flags_ = 0; }
93 
95  inline uint32_t flags() const { return flags_; }
96 
101  SetBit(GRPC_WRITE_NO_COMPRESS);
102  return *this;
103  }
104 
109  ClearBit(GRPC_WRITE_NO_COMPRESS);
110  return *this;
111  }
112 
117  inline bool get_no_compression() const {
118  return GetBit(GRPC_WRITE_NO_COMPRESS);
119  }
120 
126  SetBit(GRPC_WRITE_BUFFER_HINT);
127  return *this;
128  }
129 
135  ClearBit(GRPC_WRITE_BUFFER_HINT);
136  return *this;
137  }
138 
143  inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
144 
148  SetBit(GRPC_WRITE_BUFFER_HINT);
149  return *this;
150  }
151 
153  ClearBit(GRPC_WRITE_BUFFER_HINT);
154  return *this;
155  }
156 
157  inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
158 
165  last_message_ = true;
166  return *this;
167  }
168 
172  last_message_ = false;
173  return *this;
174  }
175 
179  SetBit(GRPC_WRITE_THROUGH);
180  return *this;
181  }
182 
183  inline bool is_write_through() const { return GetBit(GRPC_WRITE_THROUGH); }
184 
189  bool is_last_message() const { return last_message_; }
190 
191  private:
192  void SetBit(const uint32_t mask) { flags_ |= mask; }
193 
194  void ClearBit(const uint32_t mask) { flags_ &= ~mask; }
195 
196  bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
197 
198  uint32_t flags_;
199  bool last_message_;
200 };
201 
202 namespace internal {
203 
206 template <int I>
207 class CallNoOp {
208  protected:
209  void AddOp(grpc_op* ops, size_t* nops) {}
210  void FinishOp(bool* status) {}
212  InterceptorBatchMethodsImpl* interceptor_methods) {}
214  InterceptorBatchMethodsImpl* interceptor_methods) {}
215  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {}
216 };
217 
219  public:
220  CallOpSendInitialMetadata() : send_(false) {
221  maybe_compression_level_.is_set = false;
222  }
223 
// Arms the send-initial-metadata op: stores the caller's metadata map pointer
// and the op flags, and marks the op as pending (send_ = true).
// Any compression level recorded earlier is discarded, because is_set is
// reset here.
// Only the pointer is kept, not a copy; AddOp dereferences it later, so the
// map has to still be alive at that point.
224  void SendInitialMetadata(std::multimap<grpc::string, grpc::string>* metadata,
225  uint32_t flags) {
226  maybe_compression_level_.is_set = false;
227  send_ = true;
228  flags_ = flags;
229  metadata_map_ = metadata;
230  }
231 
233  maybe_compression_level_.is_set = true;
234  maybe_compression_level_.level = level;
235  }
236 
237  protected:
238  void AddOp(grpc_op* ops, size_t* nops) {
239  if (!send_ || hijacked_) return;
240  grpc_op* op = &ops[(*nops)++];
242  op->flags = flags_;
243  op->reserved = NULL;
244  initial_metadata_ =
245  FillMetadataArray(*metadata_map_, &initial_metadata_count_, "");
246  op->data.send_initial_metadata.count = initial_metadata_count_;
247  op->data.send_initial_metadata.metadata = initial_metadata_;
249  maybe_compression_level_.is_set;
250  if (maybe_compression_level_.is_set) {
252  maybe_compression_level_.level;
253  }
254  }
// Frees the metadata array that AddOp built for this batch and disarms the
// op. Returns early when the op was never armed or was hijacked, which are
// the same conditions under which AddOp skips building the array.
// `status` is not read or written here.
255  void FinishOp(bool* status) {
256  if (!send_ || hijacked_) return;
257  g_core_codegen_interface->gpr_free(initial_metadata_);
258  send_ = false;
259  }
260 
262  InterceptorBatchMethodsImpl* interceptor_methods) {
263  if (!send_) return;
264  interceptor_methods->AddInterceptionHookPoint(
266  interceptor_methods->SetSendInitialMetadata(metadata_map_);
267  }
268 
270  InterceptorBatchMethodsImpl* interceptor_methods) {}
271 
272  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
273  hijacked_ = true;
274  }
275 
276  bool hijacked_ = false;
277  bool send_;
278  uint32_t flags_;
280  std::multimap<grpc::string, grpc::string>* metadata_map_;
282  struct {
283  bool is_set;
285  } maybe_compression_level_;
286 };
287 
289  public:
290  CallOpSendMessage() : send_buf_() {}
291 
294  template <class M>
295  Status SendMessage(const M& message,
297 
298  template <class M>
299  Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
300 
304  template <class M>
305  Status SendMessagePtr(const M* message,
307 
310  template <class M>
311  Status SendMessagePtr(const M* message) GRPC_MUST_USE_RESULT;
312 
313  protected:
// Appends a GRPC_OP_SEND_MESSAGE entry to `ops` and advances *nops.
// Does nothing when there is no pending message (no message pointer and no
// valid send buffer). When the batch was hijacked, the serializer is dropped
// and no op is added.
314  void AddOp(grpc_op* ops, size_t* nops) {
315  if (msg_ == nullptr && !send_buf_.Valid()) return;
316  if (hijacked_) {
317  serializer_ = nullptr;
318  return;
319  }
// A non-null msg_ means serialization was deferred (SendMessage only
// serializes eagerly when msg_ is null), so it is performed now. A failed
// serialization trips the assert.
320  if (msg_ != nullptr) {
321  GPR_CODEGEN_ASSERT(serializer_(msg_).ok());
322  }
323  serializer_ = nullptr;
324  grpc_op* op = &ops[(*nops)++];
325  op->op = GRPC_OP_SEND_MESSAGE;
326  op->flags = write_options_.flags();
327  op->reserved = NULL;
328  op->data.send_message.send_message = send_buf_.c_buffer();
329  // Flags are per-message: clear them after use.
330  write_options_.Clear();
331  }
// Reconciles the batch status with failed_send_ once the batch completes.
// Skipped when no message was pending. failed_send_ is the flag whose address
// is handed to the interceptor machinery via SetSendMessage, so it is the
// channel through which a hijacking interceptor reports a failed send.
//   - hijacked and failed_send_ set: force *status to false.
//   - otherwise, *status already false: record the failure in failed_send_.
332  void FinishOp(bool* status) {
333  if (msg_ == nullptr && !send_buf_.Valid()) return;
334  if (hijacked_ && failed_send_) {
335  // Hijacking interceptor failed this Op
336  *status = false;
337  } else if (!*status) {
338  // This Op was passed down to core and the Op failed
339  failed_send_ = true;
340  }
341  }
342 
344  InterceptorBatchMethodsImpl* interceptor_methods) {
345  if (msg_ == nullptr && !send_buf_.Valid()) return;
346  interceptor_methods->AddInterceptionHookPoint(
348  interceptor_methods->SetSendMessage(&send_buf_, &msg_, &failed_send_,
349  serializer_);
350  }
351 
353  InterceptorBatchMethodsImpl* interceptor_methods) {
354  if (msg_ != nullptr || send_buf_.Valid()) {
355  interceptor_methods->AddInterceptionHookPoint(
357  }
358  send_buf_.Clear();
359  msg_ = nullptr;
360  // The contents of the SendMessage value that was previously set
361  // has had its references stolen by core's operations
362  interceptor_methods->SetSendMessage(nullptr, nullptr, &failed_send_,
363  nullptr);
364  }
365 
366  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
367  hijacked_ = true;
368  }
369 
370  private:
371  const void* msg_ = nullptr; // The original non-serialized message
372  bool hijacked_ = false;
373  bool failed_send_ = false;
374  ByteBuffer send_buf_;
375  WriteOptions write_options_;
376  std::function<Status(const void*)> serializer_;
377 };
378 
379 template <class M>
381  write_options_ = options;
382  serializer_ = [this](const void* message) {
383  bool own_buf;
384  send_buf_.Clear();
385  // TODO(vjpai): Remove the void below when possible
386  // The void in the template parameter below should not be needed
387  // (since it should be implicit) but is needed due to an observed
388  // difference in behavior between clang and gcc for certain internal users
390  *static_cast<const M*>(message), send_buf_.bbuf_ptr(), &own_buf);
391  if (!own_buf) {
392  send_buf_.Duplicate();
393  }
394  return result;
395  };
396  // Serialize immediately only if we do not have access to the message pointer
397  if (msg_ == nullptr) {
398  Status result = serializer_(&message);
399  serializer_ = nullptr;
400  return result;
401  }
402  return Status();
403 }
404 
405 template <class M>
407  return SendMessage(message, WriteOptions());
408 }
409 
410 template <class M>
412  WriteOptions options) {
413  msg_ = message;
414  return SendMessage(*message, options);
415 }
416 
417 template <class M>
419  msg_ = message;
420  return SendMessage(*message, WriteOptions());
421 }
422 
423 template <class R>
424 class CallOpRecvMessage {
425  public:
427  : got_message(false),
428  message_(nullptr),
429  allow_not_getting_message_(false) {}
430 
431  void RecvMessage(R* message) { message_ = message; }
432 
433  // Do not change status if no message is received.
434  void AllowNoMessage() { allow_not_getting_message_ = true; }
435 
437 
438  protected:
// Appends a GRPC_OP_RECV_MESSAGE entry to `ops` and advances *nops.
// No op is added when no destination message was registered through
// RecvMessage() or when the batch was hijacked.
// The op is pointed at recv_buf_'s underlying buffer pointer, which FinishOp
// inspects afterwards.
439  void AddOp(grpc_op* ops, size_t* nops) {
440  if (message_ == nullptr || hijacked_) return;
441  grpc_op* op = &ops[(*nops)++];
442  op->op = GRPC_OP_RECV_MESSAGE;
443  op->flags = 0;
444  op->reserved = NULL;
445  op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
446  }
447 
// Turns the received buffer into the caller's message and updates
// got_message and *status. Skipped when no destination was registered or the
// batch was hijacked. Three outcomes:
//   - buffer valid and *status true: deserialize into message_; both
//     got_message and *status take the deserialization result.
//   - buffer valid but *status false: got_message = false, buffer cleared.
//   - no buffer: got_message = false, and *status is forced to false unless
//     AllowNoMessage() was called.
448  void FinishOp(bool* status) {
449  if (message_ == nullptr || hijacked_) return;
450  if (recv_buf_.Valid()) {
451  if (*status) {
452  got_message = *status =
453  SerializationTraits<R>::Deserialize(recv_buf_.bbuf_ptr(), message_)
454  .ok();
// NOTE(review): Release() is used here but Clear() in the failure branch;
// presumably Deserialize has already consumed the buffer. Confirm against
// ByteBuffer.
455  recv_buf_.Release();
456  } else {
457  got_message = false;
458  recv_buf_.Clear();
459  }
460  } else {
461  got_message = false;
462  if (!allow_not_getting_message_) {
463  *status = false;
464  }
465  }
466  }
467 
469  InterceptorBatchMethodsImpl* interceptor_methods) {
470  if (message_ == nullptr) return;
471  interceptor_methods->SetRecvMessage(message_, &got_message);
472  }
473 
475  InterceptorBatchMethodsImpl* interceptor_methods) {
476  if (message_ == nullptr) return;
477  interceptor_methods->AddInterceptionHookPoint(
479  if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
480  }
481  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
482  hijacked_ = true;
483  if (message_ == nullptr) return;
484  interceptor_methods->AddInterceptionHookPoint(
486  got_message = true;
487  }
488 
489  private:
490  R* message_;
491  ByteBuffer recv_buf_;
492  bool allow_not_getting_message_;
493  bool hijacked_ = false;
494 };
495 
497  public:
498  virtual Status Deserialize(ByteBuffer* buf) = 0;
499  virtual ~DeserializeFunc() {}
500 };
501 
// Concrete DeserializeFunc for message type R. It remembers a destination
// pointer and forwards Deserialize() to SerializationTraits<R>, so that code
// holding only a DeserializeFunc* can deserialize without knowing R.
502 template <class R>
503 class DeserializeFuncType final : public DeserializeFunc {
504  public:
// `message` is where deserialized data is written; it is stored, not owned.
505  DeserializeFuncType(R* message) : message_(message) {}
// Deserializes the contents of `buf` into the stored message and returns the
// Status produced by SerializationTraits<R>::Deserialize.
506  Status Deserialize(ByteBuffer* buf) override {
507  return SerializationTraits<R>::Deserialize(buf->bbuf_ptr(), message_);
508  }
509 
510  ~DeserializeFuncType() override {}
511 
512  private:
513  R* message_; // Not a managed pointer because management is external to this
514 };
515 
517  public:
519  : got_message(false), allow_not_getting_message_(false) {}
520 
// Registers `message` as the destination for the next received message.
// A type-erased deserializer for R is heap-allocated and handed to
// deserialize_ (a unique_ptr, which takes ownership); the raw destination
// pointer is also kept in message_ for the interceptor hooks.
521  template <class R>
522  void RecvMessage(R* message) {
523  // Use an explicit base class pointer to avoid resolution error in the
524  // following unique_ptr::reset for some old implementations.
525  DeserializeFunc* func = new DeserializeFuncType<R>(message);
526  deserialize_.reset(func);
527  message_ = message;
528  }
529 
530  // Do not change status if no message is received.
531  void AllowNoMessage() { allow_not_getting_message_ = true; }
532 
534 
535  protected:
// Appends a GRPC_OP_RECV_MESSAGE entry to `ops` and advances *nops.
// No op is added when no deserializer has been registered through
// RecvMessage() or when the batch was hijacked.
536  void AddOp(grpc_op* ops, size_t* nops) {
537  if (!deserialize_ || hijacked_) return;
538  grpc_op* op = &ops[(*nops)++];
539  op->op = GRPC_OP_RECV_MESSAGE;
540  op->flags = 0;
541  op->reserved = NULL;
542  op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
543  }
544 
// Runs the registered deserializer on the received buffer and updates
// got_message and *status. Skipped when no deserializer is registered or the
// batch was hijacked.
//   - buffer valid and *status true: got_message = true, and *status takes
//     the deserialization result.
//   - buffer valid but *status false: got_message = false, buffer cleared.
//   - no buffer: got_message = false, and *status is forced to false unless
//     AllowNoMessage() was called.
// NOTE(review): unlike CallOpRecvMessage::FinishOp, got_message stays true
// here even when deserialization fails. Confirm this difference is intended.
545  void FinishOp(bool* status) {
546  if (!deserialize_ || hijacked_) return;
547  if (recv_buf_.Valid()) {
548  if (*status) {
549  got_message = true;
550  *status = deserialize_->Deserialize(&recv_buf_).ok();
551  recv_buf_.Release();
552  } else {
553  got_message = false;
554  recv_buf_.Clear();
555  }
556  } else {
557  got_message = false;
558  if (!allow_not_getting_message_) {
559  *status = false;
560  }
561  }
562  }
563 
565  InterceptorBatchMethodsImpl* interceptor_methods) {
566  if (!deserialize_) return;
567  interceptor_methods->SetRecvMessage(message_, &got_message);
568  }
569 
571  InterceptorBatchMethodsImpl* interceptor_methods) {
572  if (!deserialize_) return;
573  interceptor_methods->AddInterceptionHookPoint(
575  if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
576  deserialize_.reset();
577  }
578  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
579  hijacked_ = true;
580  if (!deserialize_) return;
581  interceptor_methods->AddInterceptionHookPoint(
583  got_message = true;
584  }
585 
586  private:
587  void* message_;
588  bool hijacked_ = false;
589  std::unique_ptr<DeserializeFunc> deserialize_;
590  ByteBuffer recv_buf_;
591  bool allow_not_getting_message_;
592 };
593 
595  public:
596  CallOpClientSendClose() : send_(false) {}
597 
598  void ClientSendClose() { send_ = true; }
599 
600  protected:
601  void AddOp(grpc_op* ops, size_t* nops) {
602  if (!send_ || hijacked_) return;
603  grpc_op* op = &ops[(*nops)++];
605  op->flags = 0;
606  op->reserved = NULL;
607  }
608  void FinishOp(bool* status) { send_ = false; }
609 
611  InterceptorBatchMethodsImpl* interceptor_methods) {
612  if (!send_) return;
613  interceptor_methods->AddInterceptionHookPoint(
615  }
616 
618  InterceptorBatchMethodsImpl* interceptor_methods) {}
619 
620  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
621  hijacked_ = true;
622  }
623 
624  private:
625  bool hijacked_ = false;
626  bool send_;
627 };
628 
630  public:
631  CallOpServerSendStatus() : send_status_available_(false) {}
632 
634  std::multimap<grpc::string, grpc::string>* trailing_metadata,
635  const Status& status) {
636  send_error_details_ = status.error_details();
637  metadata_map_ = trailing_metadata;
638  send_status_available_ = true;
639  send_status_code_ = static_cast<grpc_status_code>(status.error_code());
640  send_error_message_ = status.error_message();
641  }
642 
643  protected:
644  void AddOp(grpc_op* ops, size_t* nops) {
645  if (!send_status_available_ || hijacked_) return;
646  trailing_metadata_ = FillMetadataArray(
647  *metadata_map_, &trailing_metadata_count_, send_error_details_);
648  grpc_op* op = &ops[(*nops)++];
651  trailing_metadata_count_;
652  op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
653  op->data.send_status_from_server.status = send_status_code_;
654  error_message_slice_ = SliceReferencingString(send_error_message_);
656  send_error_message_.empty() ? nullptr : &error_message_slice_;
657  op->flags = 0;
658  op->reserved = NULL;
659  }
660 
// Frees the trailing-metadata array that AddOp built for this batch and marks
// the status as no longer pending. Returns early when no status was queued or
// the batch was hijacked, which are the same conditions under which AddOp
// skips building the array. `status` is not read or written here.
661  void FinishOp(bool* status) {
662  if (!send_status_available_ || hijacked_) return;
663  g_core_codegen_interface->gpr_free(trailing_metadata_);
664  send_status_available_ = false;
665  }
666 
668  InterceptorBatchMethodsImpl* interceptor_methods) {
669  if (!send_status_available_) return;
670  interceptor_methods->AddInterceptionHookPoint(
672  interceptor_methods->SetSendTrailingMetadata(metadata_map_);
673  interceptor_methods->SetSendStatus(&send_status_code_, &send_error_details_,
674  &send_error_message_);
675  }
676 
678  InterceptorBatchMethodsImpl* interceptor_methods) {}
679 
680  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
681  hijacked_ = true;
682  }
683 
684  private:
685  bool hijacked_ = false;
686  bool send_status_available_;
687  grpc_status_code send_status_code_;
688  grpc::string send_error_details_;
689  grpc::string send_error_message_;
690  size_t trailing_metadata_count_;
691  std::multimap<grpc::string, grpc::string>* metadata_map_;
692  grpc_metadata* trailing_metadata_;
693  grpc_slice error_message_slice_;
694 };
695 
697  public:
698  CallOpRecvInitialMetadata() : metadata_map_(nullptr) {}
699 
701  context->initial_metadata_received_ = true;
702  metadata_map_ = &context->recv_initial_metadata_;
703  }
704 
705  protected:
706  void AddOp(grpc_op* ops, size_t* nops) {
707  if (metadata_map_ == nullptr || hijacked_) return;
708  grpc_op* op = &ops[(*nops)++];
710  op->data.recv_initial_metadata.recv_initial_metadata = metadata_map_->arr();
711  op->flags = 0;
712  op->reserved = NULL;
713  }
714 
715  void FinishOp(bool* status) {
716  if (metadata_map_ == nullptr || hijacked_) return;
717  }
718 
720  InterceptorBatchMethodsImpl* interceptor_methods) {
721  interceptor_methods->SetRecvInitialMetadata(metadata_map_);
722  }
723 
725  InterceptorBatchMethodsImpl* interceptor_methods) {
726  if (metadata_map_ == nullptr) return;
727  interceptor_methods->AddInterceptionHookPoint(
729  metadata_map_ = nullptr;
730  }
731 
732  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
733  hijacked_ = true;
734  if (metadata_map_ == nullptr) return;
735  interceptor_methods->AddInterceptionHookPoint(
737  }
738 
739  private:
740  bool hijacked_ = false;
741  MetadataMap* metadata_map_;
742 };
743 
745  public:
747  : recv_status_(nullptr), debug_error_string_(nullptr) {}
748 
750  client_context_ = context;
751  metadata_map_ = &client_context_->trailing_metadata_;
752  recv_status_ = status;
753  error_message_ = g_core_codegen_interface->grpc_empty_slice();
754  }
755 
756  protected:
757  void AddOp(grpc_op* ops, size_t* nops) {
758  if (recv_status_ == nullptr || hijacked_) return;
759  grpc_op* op = &ops[(*nops)++];
761  op->data.recv_status_on_client.trailing_metadata = metadata_map_->arr();
762  op->data.recv_status_on_client.status = &status_code_;
763  op->data.recv_status_on_client.status_details = &error_message_;
764  op->data.recv_status_on_client.error_string = &debug_error_string_;
765  op->flags = 0;
766  op->reserved = NULL;
767  }
768 
// Publishes the status received from the server into *recv_status_ and the
// debug error string into the client context, then releases what AddOp asked
// core to fill in. Skipped when ClientRecvStatus() was not called or the
// batch was hijacked. `status` is not read or written here.
769  void FinishOp(bool* status) {
770  if (recv_status_ == nullptr || hijacked_) return;
771  grpc::string binary_error_details = metadata_map_->GetBinaryErrorDetails();
// Build the Status from the code, the message slice (copied into a string,
// or an empty string when the slice is empty) and the binary details taken
// from the trailing metadata.
772  *recv_status_ =
773  Status(static_cast<StatusCode>(status_code_),
774  GRPC_SLICE_IS_EMPTY(error_message_)
775  ? grpc::string()
776  : grpc::string(GRPC_SLICE_START_PTR(error_message_),
777  GRPC_SLICE_END_PTR(error_message_)),
778  binary_error_details);
779  client_context_->set_debug_error_string(
780  debug_error_string_ != nullptr ? debug_error_string_ : "");
// The message was copied above, so the slice reference can be dropped.
781  g_core_codegen_interface->grpc_slice_unref(error_message_);
// NOTE(review): the pointer is freed but not reset to nullptr afterwards.
782  if (debug_error_string_ != nullptr) {
783  g_core_codegen_interface->gpr_free((void*)debug_error_string_);
784  }
785  }
786 
788  InterceptorBatchMethodsImpl* interceptor_methods) {
789  interceptor_methods->SetRecvStatus(recv_status_);
790  interceptor_methods->SetRecvTrailingMetadata(metadata_map_);
791  }
792 
794  InterceptorBatchMethodsImpl* interceptor_methods) {
795  if (recv_status_ == nullptr) return;
796  interceptor_methods->AddInterceptionHookPoint(
798  recv_status_ = nullptr;
799  }
800 
801  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
802  hijacked_ = true;
803  if (recv_status_ == nullptr) return;
804  interceptor_methods->AddInterceptionHookPoint(
806  }
807 
808  private:
809  bool hijacked_ = false;
810  ::grpc_impl::ClientContext* client_context_;
811  MetadataMap* metadata_map_;
812  Status* recv_status_;
813  const char* debug_error_string_;
814  grpc_status_code status_code_;
815  grpc_slice error_message_;
816 };
817 
// Forward declaration of CallOpSet that carries the default template
// arguments: any slot left unspecified becomes a CallNoOp. Each default uses
// a different index so the six slots are always distinct types, which is
// needed because CallOpSet inherits from all six and a class cannot name the
// same direct base twice.
818 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
819  class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
820  class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
821 class CallOpSet;
822 
829 template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
830 class CallOpSet : public CallOpSetInterface,
831  public Op1,
832  public Op2,
833  public Op3,
834  public Op4,
835  public Op5,
836  public Op6 {
837  public:
838  CallOpSet() : core_cq_tag_(this), return_tag_(this) {}
839  // The copy constructor and assignment operator reset the value of
840  // core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_
841  // since those are only meaningful on a specific object, not across objects.
// Only call_ is actually taken from `other`.
// NOTE(review): the Op1..Op6 bases are not named in the initializer list, so
// they are default-constructed rather than copied, and saved_status_ is left
// uninitialized until FinalizeResult assigns it.
842  CallOpSet(const CallOpSet& other)
843  : core_cq_tag_(this),
844  return_tag_(this),
845  call_(other.call_),
846  done_intercepting_(false),
847  interceptor_methods_(InterceptorBatchMethodsImpl()) {}
848 
// Copy assignment: takes call_ from `other` and resets the per-object state
// (both tags point back at this object, interception state is cleared).
// NOTE(review): the Op1..Op6 base sub-objects are not assigned here, so this
// object's existing op state is left as it was and other's is not copied.
849  CallOpSet& operator=(const CallOpSet& other) {
850  core_cq_tag_ = this;
851  return_tag_ = this;
852  call_ = other.call_;
853  done_intercepting_ = false;
854  interceptor_methods_ = InterceptorBatchMethodsImpl();
855  return *this;
856  }
857 
// Entry point for starting this batch on `call`.
// Takes a reference on the underlying core call (FinalizeResult drops it when
// it returns true), stores a copy of the Call, and runs the pre-send
// interceptors. When RunInterceptors() returns true the ops are filled and
// started right away; otherwise that step is deferred until the interceptors
// have run.
858  void FillOps(Call* call) override {
859  done_intercepting_ = false;
860  g_core_codegen_interface->grpc_call_ref(call->call());
861  call_ =
862  *call; // It's fine to create a copy of call since it's just pointers
863 
864  if (RunInterceptors()) {
865  ContinueFillOpsAfterInterception();
866  } else {
867  // After the interceptors are run, ContinueFillOpsAfterInterception will
868  // be run
869  }
870  }
871 
// Completion handler for this batch.
// Returns true, with *tag set to return_tag_, when the result is ready to be
// handed back; the core call reference taken in FillOps is dropped on that
// path. Returns false when post-receive interceptors still have to run.
//
// There are two ways in:
//   - done_intercepting_ already set: the interceptors have finished, so the
//     status saved on the earlier pass is restored and returned.
//   - first pass: every op's FinishOp runs (each may modify *status), the
//     resulting status is saved, and the post-receive interceptors are run.
872  bool FinalizeResult(void** tag, bool* status) override {
873  if (done_intercepting_) {
874  // Complete the avalanching since we are done with this batch of ops
875  call_.cq()->CompleteAvalanching();
876  // We have already finished intercepting and filling in the results. This
877  // round trip from the core needed to be made because interceptors were
878  // run
879  *tag = return_tag_;
880  *status = saved_status_;
881  g_core_codegen_interface->grpc_call_unref(call_.call());
882  return true;
883  }
884 
885  this->Op1::FinishOp(status);
886  this->Op2::FinishOp(status);
887  this->Op3::FinishOp(status);
888  this->Op4::FinishOp(status);
889  this->Op5::FinishOp(status);
890  this->Op6::FinishOp(status);
// Saved so the done_intercepting_ pass above can restore it.
891  saved_status_ = *status;
892  if (RunInterceptorsPostRecv()) {
893  *tag = return_tag_;
894  g_core_codegen_interface->grpc_call_unref(call_.call());
895  return true;
896  }
897  // Interceptors are going to be run, so we can't return the tag just yet.
898  // Once they finish, ContinueFinalizeResultAfterInterception is expected to run.
899  return false;
900  }
901 
902  void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
903 
904  void* core_cq_tag() override { return core_cq_tag_; }
905 
910  void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
911 
912  // This will be called while interceptors are run if the RPC is a hijacked
913  // RPC. This should set hijacking state for each of the ops.
// Forwards to every op slot in order, passing this set's interceptor state.
// Slots left as CallNoOp have an empty SetHijackingState.
914  void SetHijackingState() override {
915  this->Op1::SetHijackingState(&interceptor_methods_);
916  this->Op2::SetHijackingState(&interceptor_methods_);
917  this->Op3::SetHijackingState(&interceptor_methods_);
918  this->Op4::SetHijackingState(&interceptor_methods_);
919  this->Op5::SetHijackingState(&interceptor_methods_);
920  this->Op6::SetHijackingState(&interceptor_methods_);
921  }
922 
923  // Should be called after interceptors are done running
925  static const size_t MAX_OPS = 6;
926  grpc_op ops[MAX_OPS];
927  size_t nops = 0;
928  this->Op1::AddOp(ops, &nops);
929  this->Op2::AddOp(ops, &nops);
930  this->Op3::AddOp(ops, &nops);
931  this->Op4::AddOp(ops, &nops);
932  this->Op5::AddOp(ops, &nops);
933  this->Op6::AddOp(ops, &nops);
935  g_core_codegen_interface->grpc_call_start_batch(
936  call_.call(), ops, nops, core_cq_tag(), nullptr));
937  }
938 
939  // Should be called after interceptors are done running on the finalize result
940  // path
942  done_intercepting_ = true;
944  g_core_codegen_interface->grpc_call_start_batch(
945  call_.call(), nullptr, 0, core_cq_tag(), nullptr));
946  }
947 
948  private:
949  // Returns true if no interceptors need to be run
// Pre-send interception pass, called from FillOps.
// Resets the interceptor state, binds it to this op set and its call, and
// lets each op slot register its hook points. With an empty interceptor list
// it returns true immediately; otherwise the result comes from
// interceptor_methods_.RunInterceptors().
950  bool RunInterceptors() {
951  interceptor_methods_.ClearState();
952  interceptor_methods_.SetCallOpSetInterface(this);
953  interceptor_methods_.SetCall(&call_);
954  this->Op1::SetInterceptionHookPoint(&interceptor_methods_);
955  this->Op2::SetInterceptionHookPoint(&interceptor_methods_);
956  this->Op3::SetInterceptionHookPoint(&interceptor_methods_);
957  this->Op4::SetInterceptionHookPoint(&interceptor_methods_);
958  this->Op5::SetInterceptionHookPoint(&interceptor_methods_);
959  this->Op6::SetInterceptionHookPoint(&interceptor_methods_);
960  if (interceptor_methods_.InterceptorsListEmpty()) {
961  return true;
962  }
963  // This call will go through interceptors and would need to
964  // schedule new batches, so delay completion queue shutdown
// The matching CompleteAvalanching() call is in FinalizeResult.
965  call_.cq()->RegisterAvalanching();
966  return interceptor_methods_.RunInterceptors();
967  }
968  // Returns true if no interceptors need to be run
// Post-receive interception pass, called from FinalizeResult after every
// op's FinishOp has run. Each op slot registers its finish hook points, and
// the result comes from interceptor_methods_.RunInterceptors().
969  bool RunInterceptorsPostRecv() {
970  // Call and OpSet had already been set on the set state.
971  // SetReverse also clears previously set hook points
972  interceptor_methods_.SetReverse();
973  this->Op1::SetFinishInterceptionHookPoint(&interceptor_methods_);
974  this->Op2::SetFinishInterceptionHookPoint(&interceptor_methods_);
975  this->Op3::SetFinishInterceptionHookPoint(&interceptor_methods_);
976  this->Op4::SetFinishInterceptionHookPoint(&interceptor_methods_);
977  this->Op5::SetFinishInterceptionHookPoint(&interceptor_methods_);
978  this->Op6::SetFinishInterceptionHookPoint(&interceptor_methods_);
979  return interceptor_methods_.RunInterceptors();
980  }
981 
982  void* core_cq_tag_;
983  void* return_tag_;
984  Call call_;
985  bool done_intercepting_ = false;
986  InterceptorBatchMethodsImpl interceptor_methods_;
987  bool saved_status_;
988 };
989 
990 } // namespace internal
991 } // namespace grpc
992 
993 #endif // GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
void ContinueFillOpsAfterInterception() override
Definition: call_op_set.h:924
everything went ok
Definition: grpc_types.h:393
grpc_op_type op
Operation type, as defined by grpc_op_type.
Definition: grpc_types.h:576
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:617
grpc_metadata_array * recv_initial_metadata
Definition: grpc_types.h:619
union grpc_op::grpc_op_data data
bool get_no_compression() const
Get value for the flag indicating whether compression for the next message write is forcefully disabl...
Definition: call_op_set.h:117
void * reserved
Reserved for future usage.
Definition: grpc_types.h:580
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:211
WriteOptions & clear_buffer_hint()
Clears flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:134
void ClientRecvStatus(::grpc_impl::ClientContext *context, Status *status)
Definition: call_op_set.h:749
grpc_status_code
Definition: status.h:26
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:145
void FinishOp(bool *status)
Definition: call_op_set.h:661
void SetHijackingState() override
Definition: call_op_set.h:914
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:706
void ClientSendClose()
Definition: call_op_set.h:598
void RecvInitialMetadata(::grpc_impl::ClientContext *context)
Definition: call_op_set.h:700
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:620
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:801
CallOpRecvMessage()
Definition: call_op_set.h:426
std::string string
Definition: config.h:35
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:272
bool get_buffer_hint() const
Get value for the flag indicating that the write may be buffered and need not go out on the wire imme...
Definition: call_op_set.h:143
WriteOptions & clear_no_compression()
Clears flag for the disabling of compression for the next message write.
Definition: call_op_set.h:108
struct grpc_byte_buffer ** recv_message
Definition: grpc_types.h:627
void FinishOp(bool *status)
Definition: call_op_set.h:715
struct grpc_byte_buffer * send_message
This op takes ownership of the slices in send_message.
Definition: grpc_types.h:602
Send a close from the client: one and only one instance MUST be sent from the client, unless the call was cancelled - in which case this can be skipped.
Definition: grpc_types.h:541
CallOpSet(const CallOpSet &other)
Definition: call_op_set.h:842
Send status from the server: one and only one instance MUST be sent from the server unless the call w...
Definition: grpc_types.h:546
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:314
void AllowNoMessage()
Definition: call_op_set.h:531
Definition: metadata_map.h:33
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:164
#define GRPC_WRITE_NO_COMPRESS
Force compression to be disabled for a particular write (start_write/add_metadata).
Definition: grpc_types.h:440
grpc_slice key
the key, value values are expected to line up with grpc_mdelem: if changing them, update metadata...
Definition: grpc_types.h:472
Status Deserialize(ByteBuffer *buf) override
Definition: call_op_set.h:506
CallOpRecvInitialMetadata()
Definition: call_op_set.h:698
void SetRecvMessage(void *message, bool *got_message)
Definition: interceptor_common.h:169
struct grpc_op::grpc_op_data::grpc_op_recv_message recv_message
void SetRecvStatus(Status *status)
Definition: interceptor_common.h:178
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:821
#define GRPC_SLICE_IS_EMPTY(slice)
Definition: slice.h:107
grpc_slice value
Definition: grpc_types.h:473
#define GRPC_WRITE_THROUGH
Force this message to be written to the socket before completing it.
Definition: grpc_types.h:442
const char ** error_string
If this is not nullptr, it will be populated with the full fidelity error string for debugging purpos...
Definition: grpc_types.h:641
grpc_slice * status_details
Definition: grpc_types.h:637
void Clear()
Clear all flags.
Definition: call_op_set.h:92
WriteOptions & set_write_through()
Guarantee that all bytes have been written to the socket before completing this write (usually writes...
Definition: call_op_set.h:178
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:564
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1]. ...
Definition: slice.h:60
bool send_
Definition: call_op_set.h:277
#define GRPC_WRITE_BUFFER_HINT
Write Flags:
Definition: grpc_types.h:437
Send a message: 0 or more of these operations can occur for each call.
Definition: grpc_types.h:536
bool got_message
Definition: call_op_set.h:436
The first three in this list are for clients and servers.
bool is_write_through() const
Definition: call_op_set.h:183
WriteOptions()
Definition: call_op_set.h:87
WriteOptions & clear_last_message()
Clears flag indicating that this is the last message in a stream, disabling coalescing.
Definition: call_op_set.h:171
grpc_slice SliceReferencingString(const grpc::string &str)
Definition: slice.h:131
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata::grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level
grpc_compression_level
Compression levels allow a party with knowledge of its peer's accepted encodings to request compressi...
Definition: compression_types.h:71
grpc_call * call() const
Definition: call.h:72
#define GRPC_SLICE_START_PTR(slice)
Definition: slice.h:96
void ContinueFinalizeResultAfterInterception() override
Definition: call_op_set.h:941
bool is_set
Definition: call_op_set.h:283
grpc_metadata_array * trailing_metadata
ownership of the array is with the caller, but ownership of the elements stays with the call object (...
Definition: grpc_types.h:635
WriteOptions & clear_corked()
Definition: call_op_set.h:152
WriteOptions & set_no_compression()
Sets flag for the disabling of compression for the next message write.
Definition: call_op_set.h:100
#define GRPC_SLICE_END_PTR(slice)
Definition: slice.h:105
CallOpClientSendClose()
Definition: call_op_set.h:596
void FinishOp(bool *status)
Definition: call_op_set.h:255
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:757
Definition: byte_buffer.h:63
::google::protobuf::util::Status Status
Definition: config_protobuf.h:96
uint32_t flags_
Definition: call_op_set.h:278
Default argument for CallOpSet.
Definition: call_op_set.h:207
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:570
void FinishOp(bool *status)
Definition: call_op_set.h:769
grpc_compression_level level
Definition: call_op_set.h:284
bool got_message
Definition: call_op_set.h:533
grpc::string error_message() const
Return the instance's error message.
Definition: status.h:112
Definition: call_op_set.h:496
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
The following three are for hijacked clients only.
StatusCode error_code() const
Return the instance's error code.
Definition: status.h:110
CallOpServerSendStatus()
Definition: call_op_set.h:631
bool FinalizeResult(void **tag, bool *status) override
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
Definition: call_op_set.h:872
Definition: call_op_set.h:629
void set_compression_level(grpc_compression_level level)
Definition: call_op_set.h:232
Definition: call_op_set.h:218
bool is_corked() const
Definition: call_op_set.h:157
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:536
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:732
Definition: call_op_set.h:696
Status SendMessage(const M &message, WriteOptions options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call_op_set.h:380
std::multimap< grpc::string, grpc::string > * metadata_map_
Definition: call_op_set.h:280
Status SendMessagePtr(const M *message, WriteOptions options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call_op_set.h:411
uint32_t flags() const
Returns raw flags bitset.
Definition: call_op_set.h:95
grpc_metadata * FillMetadataArray(const std::multimap< grpc::string, grpc::string > &metadata, size_t *metadata_count, const grpc::string &optional_error_details)
Definition: call_op_set.h:59
struct grpc_op::grpc_op_data::grpc_op_recv_initial_metadata recv_initial_metadata
A single metadata element.
Definition: grpc_types.h:469
Definition: call_op_set.h:288
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata send_initial_metadata
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:238
void FinishOp(bool *status)
Definition: call_op_set.h:545
Operation data: one field for each op type (except SEND_CLOSE_FROM_CLIENT which has no arguments) ...
Definition: grpc_types.h:574
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:610
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
Receive initial metadata: one and only one MUST be made on the client, must not be made on the server...
Definition: grpc_types.h:551
void RecvMessage(R *message)
Definition: call_op_set.h:522
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:213
void FillOps(Call *call) override
Fills in grpc_op, starting from ops[*nops] and moving upwards.
Definition: call_op_set.h:858
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:352
struct grpc_op::grpc_op_data::grpc_op_send_message send_message
void SendInitialMetadata(std::multimap< grpc::string, grpc::string > *metadata, uint32_t flags)
Definition: call_op_set.h:224
CoreCodegenInterface * g_core_codegen_interface
Definition: completion_queue_impl.h:91
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:644
void * core_cq_tag() override
Get the tag to be used at the core completion queue.
Definition: call_op_set.h:904
#define GRPC_MUST_USE_RESULT
Definition: port_platform.h:526
Send initial metadata: one and only one instance MUST be sent for each call, unless the call was canc...
Definition: grpc_types.h:532
WriteOptions(const WriteOptions &other)
Definition: call_op_set.h:88
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:601
CallOpClientRecvStatus()
Definition: call_op_set.h:746
Definition: interceptor_common.h:36
Definition: byte_buffer.h:52
void ServerSendStatus(std::multimap< grpc::string, grpc::string > *trailing_metadata, const Status &status)
Definition: call_op_set.h:633
void FinishOp(bool *status)
Definition: call_op_set.h:608
Per-message write options.
Definition: call_op_set.h:85
grpc_slice * status_details
optional: set to NULL if no details need sending, non-NULL if they do; pointer will not be retained pa...
Definition: grpc_types.h:611
CallOpSet & operator=(const CallOpSet &other)
Definition: call_op_set.h:849
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:269
void AllowNoMessage()
Definition: call_op_set.h:434
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:677
size_t trailing_metadata_count
Definition: grpc_types.h:605
CallOpSendMessage()
Definition: call_op_set.h:290
grpc_metadata * metadata
Definition: grpc_types.h:588
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:343
void FinishOp(bool *status)
Definition: call_op_set.h:210
void FinishOp(bool *status)
Definition: call_op_set.h:448
Definition: call_op_set.h:594
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:474
An abstract collection of call ops, used to generate the grpc_call_op structure to pass down to the l...
Definition: call_op_set_interface.h:34
void SetSendStatus(grpc_status_code *code, grpc::string *error_details, grpc::string *error_message)
Definition: interceptor_common.h:157
void SetSendTrailingMetadata(std::multimap< grpc::string, grpc::string > *metadata)
Definition: interceptor_common.h:164
virtual ~DeserializeFunc()
Definition: call_op_set.h:499
void SetRecvInitialMetadata(MetadataMap *map)
Definition: interceptor_common.h:174
void RecvMessage(R *message)
Definition: call_op_set.h:431
struct grpc_op::grpc_op_data::grpc_op_send_status_from_server send_status_from_server
void set_output_tag(void *return_tag)
Definition: call_op_set.h:902
CallOpSendInitialMetadata()
Definition: call_op_set.h:220
void SetSendInitialMetadata(std::multimap< grpc::string, grpc::string > *metadata)
Definition: interceptor_common.h:152
WriteOptions & set_corked()
corked bit: aliases set_buffer_hint currently, with the intent that set_buffer_hint will be removed i...
Definition: call_op_set.h:147
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:215
A ClientContext allows the person implementing a service client to:
Definition: client_context_impl.h:180
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:209
Did it work? If it didn't, why?
Definition: status.h:31
void FinishOp(bool *status)
Definition: call_op_set.h:332
Receive status on the client: one and only one must be made on the client.
Definition: grpc_types.h:561
grpc_metadata * initial_metadata_
Definition: call_op_set.h:281
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:680
void AddInterceptionHookPoint(experimental::InterceptionHookPoints type)
Definition: interceptor_common.h:78
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:578
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:724
DeserializeFuncType(R *message)
Definition: call_op_set.h:505
size_t initial_metadata_count_
Definition: call_op_set.h:279
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:793
void SetRecvTrailingMetadata(MetadataMap *map)
Definition: interceptor_common.h:180
grpc_status_code status
Definition: grpc_types.h:607
grpc_status_code * status
Definition: grpc_types.h:636
size_t count
Definition: grpc_types.h:587
Definition: call_op_set.h:516
uint32_t flags
Write flags bitset for grpc_begin_messages.
Definition: grpc_types.h:578
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:189
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:719
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:787
Definition: call_op_set.h:744
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:667
void SetSendMessage(ByteBuffer *buf, const void **msg, bool *fail_send_message, std::function< Status(const void *)> serializer)
Definition: interceptor_common.h:143
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:481
Receive a message: 0 or more of these operations can occur for each call.
Definition: grpc_types.h:555
A sequence of bytes.
Definition: byte_buffer.h:72
CallOpGenericRecvMessage()
Definition: call_op_set.h:518
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:439
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:468
CallOpSet()
Definition: call_op_set.h:838
const char kBinaryErrorDetailsKey[]
Definition: metadata_map.h:31
The following two are for all clients and servers.
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:261
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:910
grpc::string error_details() const
Return the (binary) error details.
Definition: status.h:115
Straightforward wrapping of the C call object.
Definition: call.h:38
grpc_metadata * trailing_metadata
Definition: grpc_types.h:606
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:366
struct grpc_op::grpc_op_data::grpc_op_recv_status_on_client recv_status_on_client
~DeserializeFuncType() override
Definition: call_op_set.h:510