GRPC C++  1.19.0-dev
call_op_set.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
20 #define GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
21 
22 #include <assert.h>
23 #include <array>
24 #include <cstring>
25 #include <functional>
26 #include <map>
27 #include <memory>
28 #include <vector>
29 
43 
44 #include <grpc/impl/codegen/atm.h>
47 
48 namespace grpc {
49 
52 
53 namespace internal {
54 class Call;
55 class CallHook;
56 
57 // TODO(yangg) if the map is changed before we send, the pointers will be a
58 // mess. Make sure it does not happen.
60  const std::multimap<grpc::string, grpc::string>& metadata,
61  size_t* metadata_count, const grpc::string& optional_error_details) {
62  *metadata_count = metadata.size() + (optional_error_details.empty() ? 0 : 1);
63  if (*metadata_count == 0) {
64  return nullptr;
65  }
66  grpc_metadata* metadata_array =
67  (grpc_metadata*)(g_core_codegen_interface->gpr_malloc(
68  (*metadata_count) * sizeof(grpc_metadata)));
69  size_t i = 0;
70  for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
71  metadata_array[i].key = SliceReferencingString(iter->first);
72  metadata_array[i].value = SliceReferencingString(iter->second);
73  }
74  if (!optional_error_details.empty()) {
75  metadata_array[i].key =
76  g_core_codegen_interface->grpc_slice_from_static_buffer(
78  metadata_array[i].value = SliceReferencingString(optional_error_details);
79  }
80  return metadata_array;
81 }
82 } // namespace internal
83 
85 class WriteOptions {
86  public:
87  WriteOptions() : flags_(0), last_message_(false) {}
88  WriteOptions(const WriteOptions& other)
89  : flags_(other.flags_), last_message_(other.last_message_) {}
90 
92  inline void Clear() { flags_ = 0; }
93 
95  inline uint32_t flags() const { return flags_; }
96 
101  SetBit(GRPC_WRITE_NO_COMPRESS);
102  return *this;
103  }
104 
109  ClearBit(GRPC_WRITE_NO_COMPRESS);
110  return *this;
111  }
112 
117  inline bool get_no_compression() const {
118  return GetBit(GRPC_WRITE_NO_COMPRESS);
119  }
120 
126  SetBit(GRPC_WRITE_BUFFER_HINT);
127  return *this;
128  }
129 
135  ClearBit(GRPC_WRITE_BUFFER_HINT);
136  return *this;
137  }
138 
143  inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
144 
148  SetBit(GRPC_WRITE_BUFFER_HINT);
149  return *this;
150  }
151 
153  ClearBit(GRPC_WRITE_BUFFER_HINT);
154  return *this;
155  }
156 
157  inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
158 
165  last_message_ = true;
166  return *this;
167  }
168 
172  last_message_ = false;
173  return *this;
174  }
175 
179  SetBit(GRPC_WRITE_THROUGH);
180  return *this;
181  }
182 
183  inline bool is_write_through() const { return GetBit(GRPC_WRITE_THROUGH); }
184 
189  bool is_last_message() const { return last_message_; }
190 
192  flags_ = rhs.flags_;
193  return *this;
194  }
195 
196  private:
197  void SetBit(const uint32_t mask) { flags_ |= mask; }
198 
199  void ClearBit(const uint32_t mask) { flags_ &= ~mask; }
200 
201  bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
202 
203  uint32_t flags_;
204  bool last_message_;
205 };
206 
207 namespace internal {
208 
211 template <int I>
212 class CallNoOp {
213  protected:
214  void AddOp(grpc_op* ops, size_t* nops) {}
215  void FinishOp(bool* status) {}
217  InterceptorBatchMethodsImpl* interceptor_methods) {}
219  InterceptorBatchMethodsImpl* interceptor_methods) {}
220  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {}
221 };
222 
224  public:
225  CallOpSendInitialMetadata() : send_(false) {
226  maybe_compression_level_.is_set = false;
227  }
228 
229  void SendInitialMetadata(std::multimap<grpc::string, grpc::string>* metadata,
230  uint32_t flags) {
231  maybe_compression_level_.is_set = false;
232  send_ = true;
233  flags_ = flags;
234  metadata_map_ = metadata;
235  }
236 
238  maybe_compression_level_.is_set = true;
239  maybe_compression_level_.level = level;
240  }
241 
242  protected:
243  void AddOp(grpc_op* ops, size_t* nops) {
244  if (!send_ || hijacked_) return;
245  grpc_op* op = &ops[(*nops)++];
247  op->flags = flags_;
248  op->reserved = NULL;
249  initial_metadata_ =
250  FillMetadataArray(*metadata_map_, &initial_metadata_count_, "");
251  op->data.send_initial_metadata.count = initial_metadata_count_;
252  op->data.send_initial_metadata.metadata = initial_metadata_;
254  maybe_compression_level_.is_set;
255  if (maybe_compression_level_.is_set) {
257  maybe_compression_level_.level;
258  }
259  }
260  void FinishOp(bool* status) {
261  if (!send_ || hijacked_) return;
262  g_core_codegen_interface->gpr_free(initial_metadata_);
263  send_ = false;
264  }
265 
267  InterceptorBatchMethodsImpl* interceptor_methods) {
268  if (!send_) return;
269  interceptor_methods->AddInterceptionHookPoint(
271  interceptor_methods->SetSendInitialMetadata(metadata_map_);
272  }
273 
275  InterceptorBatchMethodsImpl* interceptor_methods) {}
276 
277  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
278  hijacked_ = true;
279  }
280 
281  bool hijacked_ = false;
282  bool send_;
283  uint32_t flags_;
285  std::multimap<grpc::string, grpc::string>* metadata_map_;
287  struct {
288  bool is_set;
290  } maybe_compression_level_;
291 };
292 
294  public:
295  CallOpSendMessage() : send_buf_() {}
296 
299  template <class M>
300  Status SendMessage(const M& message,
302 
303  template <class M>
304  Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
305 
309  template <class M>
310  Status SendMessagePtr(const M* message,
312 
315  template <class M>
316  Status SendMessagePtr(const M* message) GRPC_MUST_USE_RESULT;
317 
318  protected:
319  void AddOp(grpc_op* ops, size_t* nops) {
320  if (msg_ == nullptr && !send_buf_.Valid()) return;
321  if (hijacked_) {
322  serializer_ = nullptr;
323  return;
324  }
325  if (msg_ != nullptr) {
326  GPR_CODEGEN_ASSERT(serializer_(msg_).ok());
327  }
328  serializer_ = nullptr;
329  grpc_op* op = &ops[(*nops)++];
330  op->op = GRPC_OP_SEND_MESSAGE;
331  op->flags = write_options_.flags();
332  op->reserved = NULL;
333  op->data.send_message.send_message = send_buf_.c_buffer();
334  // Flags are per-message: clear them after use.
335  write_options_.Clear();
336  }
337  void FinishOp(bool* status) {
338  if (msg_ == nullptr && !send_buf_.Valid()) return;
339  if (hijacked_ && failed_send_) {
340  // Hijacking interceptor failed this Op
341  *status = false;
342  } else if (!*status) {
343  // This Op was passed down to core and the Op failed
344  failed_send_ = true;
345  }
346  }
347 
349  InterceptorBatchMethodsImpl* interceptor_methods) {
350  if (msg_ == nullptr && !send_buf_.Valid()) return;
351  interceptor_methods->AddInterceptionHookPoint(
353  interceptor_methods->SetSendMessage(&send_buf_, &msg_, &failed_send_,
354  serializer_);
355  }
356 
358  InterceptorBatchMethodsImpl* interceptor_methods) {
359  if (msg_ != nullptr || send_buf_.Valid()) {
360  interceptor_methods->AddInterceptionHookPoint(
362  }
363  send_buf_.Clear();
364  msg_ = nullptr;
365  // The contents of the SendMessage value that was previously set
366  // has had its references stolen by core's operations
367  interceptor_methods->SetSendMessage(nullptr, nullptr, &failed_send_,
368  nullptr);
369  }
370 
371  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
372  hijacked_ = true;
373  }
374 
375  private:
376  const void* msg_ = nullptr; // The original non-serialized message
377  bool hijacked_ = false;
378  bool failed_send_ = false;
379  ByteBuffer send_buf_;
380  WriteOptions write_options_;
381  std::function<Status(const void*)> serializer_;
382 };
383 
384 template <class M>
386  write_options_ = options;
387  serializer_ = [this](const void* message) {
388  bool own_buf;
389  send_buf_.Clear();
390  // TODO(vjpai): Remove the void below when possible
391  // The void in the template parameter below should not be needed
392  // (since it should be implicit) but is needed due to an observed
393  // difference in behavior between clang and gcc for certain internal users
395  *static_cast<const M*>(message), send_buf_.bbuf_ptr(), &own_buf);
396  if (!own_buf) {
397  send_buf_.Duplicate();
398  }
399  return result;
400  };
401  // Serialize immediately only if we do not have access to the message pointer
402  if (msg_ == nullptr) {
403  Status result = serializer_(&message);
404  serializer_ = nullptr;
405  return result;
406  }
407  return Status();
408 }
409 
410 template <class M>
412  return SendMessage(message, WriteOptions());
413 }
414 
415 template <class M>
417  WriteOptions options) {
418  msg_ = message;
419  return SendMessage(*message, options);
420 }
421 
422 template <class M>
424  msg_ = message;
425  return SendMessage(*message, WriteOptions());
426 }
427 
428 template <class R>
429 class CallOpRecvMessage {
430  public:
432  : got_message(false),
433  message_(nullptr),
434  allow_not_getting_message_(false) {}
435 
436  void RecvMessage(R* message) { message_ = message; }
437 
438  // Do not change status if no message is received.
439  void AllowNoMessage() { allow_not_getting_message_ = true; }
440 
442 
443  protected:
444  void AddOp(grpc_op* ops, size_t* nops) {
445  if (message_ == nullptr || hijacked_) return;
446  grpc_op* op = &ops[(*nops)++];
447  op->op = GRPC_OP_RECV_MESSAGE;
448  op->flags = 0;
449  op->reserved = NULL;
450  op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
451  }
452 
453  void FinishOp(bool* status) {
454  if (message_ == nullptr || hijacked_) return;
455  if (recv_buf_.Valid()) {
456  if (*status) {
457  got_message = *status =
458  SerializationTraits<R>::Deserialize(recv_buf_.bbuf_ptr(), message_)
459  .ok();
460  recv_buf_.Release();
461  } else {
462  got_message = false;
463  recv_buf_.Clear();
464  }
465  } else {
466  got_message = false;
467  if (!allow_not_getting_message_) {
468  *status = false;
469  }
470  }
471  message_ = nullptr;
472  }
473 
475  InterceptorBatchMethodsImpl* interceptor_methods) {
476  if (message_ == nullptr) return;
477  interceptor_methods->SetRecvMessage(message_, &got_message);
478  }
479 
481  InterceptorBatchMethodsImpl* interceptor_methods) {
482  if (message_ == nullptr) return;
483  interceptor_methods->AddInterceptionHookPoint(
485  if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
486  }
487  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
488  hijacked_ = true;
489  if (message_ == nullptr) return;
490  interceptor_methods->AddInterceptionHookPoint(
492  got_message = true;
493  }
494 
495  private:
496  R* message_;
497  ByteBuffer recv_buf_;
498  bool allow_not_getting_message_;
499  bool hijacked_ = false;
500 };
501 
503  public:
504  virtual Status Deserialize(ByteBuffer* buf) = 0;
505  virtual ~DeserializeFunc() {}
506 };
507 
508 template <class R>
509 class DeserializeFuncType final : public DeserializeFunc {
510  public:
511  DeserializeFuncType(R* message) : message_(message) {}
512  Status Deserialize(ByteBuffer* buf) override {
513  return SerializationTraits<R>::Deserialize(buf->bbuf_ptr(), message_);
514  }
515 
516  ~DeserializeFuncType() override {}
517 
518  private:
519  R* message_; // Not a managed pointer because management is external to this
520 };
521 
523  public:
525  : got_message(false), allow_not_getting_message_(false) {}
526 
527  template <class R>
528  void RecvMessage(R* message) {
529  // Use an explicit base class pointer to avoid resolution error in the
530  // following unique_ptr::reset for some old implementations.
531  DeserializeFunc* func = new DeserializeFuncType<R>(message);
532  deserialize_.reset(func);
533  message_ = message;
534  }
535 
536  // Do not change status if no message is received.
537  void AllowNoMessage() { allow_not_getting_message_ = true; }
538 
540 
541  protected:
542  void AddOp(grpc_op* ops, size_t* nops) {
543  if (!deserialize_ || hijacked_) return;
544  grpc_op* op = &ops[(*nops)++];
545  op->op = GRPC_OP_RECV_MESSAGE;
546  op->flags = 0;
547  op->reserved = NULL;
548  op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
549  }
550 
551  void FinishOp(bool* status) {
552  if (!deserialize_ || hijacked_) return;
553  if (recv_buf_.Valid()) {
554  if (*status) {
555  got_message = true;
556  *status = deserialize_->Deserialize(&recv_buf_).ok();
557  recv_buf_.Release();
558  } else {
559  got_message = false;
560  recv_buf_.Clear();
561  }
562  } else {
563  got_message = false;
564  if (!allow_not_getting_message_) {
565  *status = false;
566  }
567  }
568  deserialize_.reset();
569  }
570 
572  InterceptorBatchMethodsImpl* interceptor_methods) {
573  if (!deserialize_) return;
574  interceptor_methods->SetRecvMessage(message_, &got_message);
575  }
576 
578  InterceptorBatchMethodsImpl* interceptor_methods) {
579  if (!deserialize_) return;
580  interceptor_methods->AddInterceptionHookPoint(
582  if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
583  }
584  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
585  hijacked_ = true;
586  if (!deserialize_) return;
587  interceptor_methods->AddInterceptionHookPoint(
589  got_message = true;
590  }
591 
592  private:
593  void* message_;
594  bool hijacked_ = false;
595  std::unique_ptr<DeserializeFunc> deserialize_;
596  ByteBuffer recv_buf_;
597  bool allow_not_getting_message_;
598 };
599 
601  public:
602  CallOpClientSendClose() : send_(false) {}
603 
604  void ClientSendClose() { send_ = true; }
605 
606  protected:
607  void AddOp(grpc_op* ops, size_t* nops) {
608  if (!send_ || hijacked_) return;
609  grpc_op* op = &ops[(*nops)++];
611  op->flags = 0;
612  op->reserved = NULL;
613  }
614  void FinishOp(bool* status) { send_ = false; }
615 
617  InterceptorBatchMethodsImpl* interceptor_methods) {
618  if (!send_) return;
619  interceptor_methods->AddInterceptionHookPoint(
621  }
622 
624  InterceptorBatchMethodsImpl* interceptor_methods) {}
625 
626  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
627  hijacked_ = true;
628  }
629 
630  private:
631  bool hijacked_ = false;
632  bool send_;
633 };
634 
636  public:
637  CallOpServerSendStatus() : send_status_available_(false) {}
638 
640  std::multimap<grpc::string, grpc::string>* trailing_metadata,
641  const Status& status) {
642  send_error_details_ = status.error_details();
643  metadata_map_ = trailing_metadata;
644  send_status_available_ = true;
645  send_status_code_ = static_cast<grpc_status_code>(status.error_code());
646  send_error_message_ = status.error_message();
647  }
648 
649  protected:
650  void AddOp(grpc_op* ops, size_t* nops) {
651  if (!send_status_available_ || hijacked_) return;
652  trailing_metadata_ = FillMetadataArray(
653  *metadata_map_, &trailing_metadata_count_, send_error_details_);
654  grpc_op* op = &ops[(*nops)++];
657  trailing_metadata_count_;
658  op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
659  op->data.send_status_from_server.status = send_status_code_;
660  error_message_slice_ = SliceReferencingString(send_error_message_);
662  send_error_message_.empty() ? nullptr : &error_message_slice_;
663  op->flags = 0;
664  op->reserved = NULL;
665  }
666 
667  void FinishOp(bool* status) {
668  if (!send_status_available_ || hijacked_) return;
669  g_core_codegen_interface->gpr_free(trailing_metadata_);
670  send_status_available_ = false;
671  }
672 
674  InterceptorBatchMethodsImpl* interceptor_methods) {
675  if (!send_status_available_) return;
676  interceptor_methods->AddInterceptionHookPoint(
678  interceptor_methods->SetSendTrailingMetadata(metadata_map_);
679  interceptor_methods->SetSendStatus(&send_status_code_, &send_error_details_,
680  &send_error_message_);
681  }
682 
684  InterceptorBatchMethodsImpl* interceptor_methods) {}
685 
686  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
687  hijacked_ = true;
688  }
689 
690  private:
691  bool hijacked_ = false;
692  bool send_status_available_;
693  grpc_status_code send_status_code_;
694  grpc::string send_error_details_;
695  grpc::string send_error_message_;
696  size_t trailing_metadata_count_;
697  std::multimap<grpc::string, grpc::string>* metadata_map_;
698  grpc_metadata* trailing_metadata_;
699  grpc_slice error_message_slice_;
700 };
701 
703  public:
704  CallOpRecvInitialMetadata() : metadata_map_(nullptr) {}
705 
707  context->initial_metadata_received_ = true;
708  metadata_map_ = &context->recv_initial_metadata_;
709  }
710 
711  protected:
712  void AddOp(grpc_op* ops, size_t* nops) {
713  if (metadata_map_ == nullptr || hijacked_) return;
714  grpc_op* op = &ops[(*nops)++];
716  op->data.recv_initial_metadata.recv_initial_metadata = metadata_map_->arr();
717  op->flags = 0;
718  op->reserved = NULL;
719  }
720 
721  void FinishOp(bool* status) {
722  if (metadata_map_ == nullptr || hijacked_) return;
723  }
724 
726  InterceptorBatchMethodsImpl* interceptor_methods) {
727  interceptor_methods->SetRecvInitialMetadata(metadata_map_);
728  }
729 
731  InterceptorBatchMethodsImpl* interceptor_methods) {
732  if (metadata_map_ == nullptr) return;
733  interceptor_methods->AddInterceptionHookPoint(
735  metadata_map_ = nullptr;
736  }
737 
738  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
739  hijacked_ = true;
740  if (metadata_map_ == nullptr) return;
741  interceptor_methods->AddInterceptionHookPoint(
743  }
744 
745  private:
746  bool hijacked_ = false;
747  MetadataMap* metadata_map_;
748 };
749 
751  public:
753  : recv_status_(nullptr), debug_error_string_(nullptr) {}
754 
755  void ClientRecvStatus(ClientContext* context, Status* status) {
756  client_context_ = context;
757  metadata_map_ = &client_context_->trailing_metadata_;
758  recv_status_ = status;
759  error_message_ = g_core_codegen_interface->grpc_empty_slice();
760  }
761 
762  protected:
763  void AddOp(grpc_op* ops, size_t* nops) {
764  if (recv_status_ == nullptr || hijacked_) return;
765  grpc_op* op = &ops[(*nops)++];
767  op->data.recv_status_on_client.trailing_metadata = metadata_map_->arr();
768  op->data.recv_status_on_client.status = &status_code_;
769  op->data.recv_status_on_client.status_details = &error_message_;
770  op->data.recv_status_on_client.error_string = &debug_error_string_;
771  op->flags = 0;
772  op->reserved = NULL;
773  }
774 
775  void FinishOp(bool* status) {
776  if (recv_status_ == nullptr || hijacked_) return;
777  grpc::string binary_error_details = metadata_map_->GetBinaryErrorDetails();
778  *recv_status_ =
779  Status(static_cast<StatusCode>(status_code_),
780  GRPC_SLICE_IS_EMPTY(error_message_)
781  ? grpc::string()
782  : grpc::string(GRPC_SLICE_START_PTR(error_message_),
783  GRPC_SLICE_END_PTR(error_message_)),
784  binary_error_details);
785  client_context_->set_debug_error_string(
786  debug_error_string_ != nullptr ? debug_error_string_ : "");
787  g_core_codegen_interface->grpc_slice_unref(error_message_);
788  if (debug_error_string_ != nullptr) {
789  g_core_codegen_interface->gpr_free((void*)debug_error_string_);
790  }
791  }
792 
794  InterceptorBatchMethodsImpl* interceptor_methods) {
795  interceptor_methods->SetRecvStatus(recv_status_);
796  interceptor_methods->SetRecvTrailingMetadata(metadata_map_);
797  }
798 
800  InterceptorBatchMethodsImpl* interceptor_methods) {
801  if (recv_status_ == nullptr) return;
802  interceptor_methods->AddInterceptionHookPoint(
804  recv_status_ = nullptr;
805  }
806 
807  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
808  hijacked_ = true;
809  if (recv_status_ == nullptr) return;
810  interceptor_methods->AddInterceptionHookPoint(
812  }
813 
814  private:
815  bool hijacked_ = false;
816  ClientContext* client_context_;
817  MetadataMap* metadata_map_;
818  Status* recv_status_;
819  const char* debug_error_string_;
820  grpc_status_code status_code_;
821  grpc_slice error_message_;
822 };
823 
824 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
825  class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
826  class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
827 class CallOpSet;
828 
835 template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
836 class CallOpSet : public CallOpSetInterface,
837  public Op1,
838  public Op2,
839  public Op3,
840  public Op4,
841  public Op5,
842  public Op6 {
843  public:
844  CallOpSet() : core_cq_tag_(this), return_tag_(this) {}
845  // The copy constructor and assignment operator reset the value of
846  // core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_
847  // since those are only meaningful on a specific object, not across objects.
848  CallOpSet(const CallOpSet& other)
849  : core_cq_tag_(this),
850  return_tag_(this),
851  call_(other.call_),
852  done_intercepting_(false),
853  interceptor_methods_(InterceptorBatchMethodsImpl()) {}
854 
855  CallOpSet& operator=(const CallOpSet& other) {
856  core_cq_tag_ = this;
857  return_tag_ = this;
858  call_ = other.call_;
859  done_intercepting_ = false;
860  interceptor_methods_ = InterceptorBatchMethodsImpl();
861  return *this;
862  }
863 
864  void FillOps(Call* call) override {
865  done_intercepting_ = false;
866  g_core_codegen_interface->grpc_call_ref(call->call());
867  call_ =
868  *call; // It's fine to create a copy of call since it's just pointers
869 
870  if (RunInterceptors()) {
871  ContinueFillOpsAfterInterception();
872  } else {
873  // After the interceptors are run, ContinueFillOpsAfterInterception will
874  // be run
875  }
876  }
877 
878  bool FinalizeResult(void** tag, bool* status) override {
879  if (done_intercepting_) {
880  // We have already finished intercepting and filling in the results. This
881  // round trip from the core needed to be made because interceptors were
882  // run
883  *tag = return_tag_;
884  *status = saved_status_;
885  g_core_codegen_interface->grpc_call_unref(call_.call());
886  return true;
887  }
888 
889  this->Op1::FinishOp(status);
890  this->Op2::FinishOp(status);
891  this->Op3::FinishOp(status);
892  this->Op4::FinishOp(status);
893  this->Op5::FinishOp(status);
894  this->Op6::FinishOp(status);
895  saved_status_ = *status;
896  if (RunInterceptorsPostRecv()) {
897  *tag = return_tag_;
898  g_core_codegen_interface->grpc_call_unref(call_.call());
899  return true;
900  }
901  // Interceptors are going to be run, so we can't return the tag just yet.
902  // After the interceptors are run, ContinueFinalizeResultAfterInterception
903  return false;
904  }
905 
906  void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
907 
908  void* core_cq_tag() override { return core_cq_tag_; }
909 
914  void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
915 
916  // This will be called while interceptors are run if the RPC is a hijacked
917  // RPC. This should set hijacking state for each of the ops.
918  void SetHijackingState() override {
919  this->Op1::SetHijackingState(&interceptor_methods_);
920  this->Op2::SetHijackingState(&interceptor_methods_);
921  this->Op3::SetHijackingState(&interceptor_methods_);
922  this->Op4::SetHijackingState(&interceptor_methods_);
923  this->Op5::SetHijackingState(&interceptor_methods_);
924  this->Op6::SetHijackingState(&interceptor_methods_);
925  }
926 
927  // Should be called after interceptors are done running
929  static const size_t MAX_OPS = 6;
930  grpc_op ops[MAX_OPS];
931  size_t nops = 0;
932  this->Op1::AddOp(ops, &nops);
933  this->Op2::AddOp(ops, &nops);
934  this->Op3::AddOp(ops, &nops);
935  this->Op4::AddOp(ops, &nops);
936  this->Op5::AddOp(ops, &nops);
937  this->Op6::AddOp(ops, &nops);
939  g_core_codegen_interface->grpc_call_start_batch(
940  call_.call(), ops, nops, core_cq_tag(), nullptr));
941  }
942 
943  // Should be called after interceptors are done running on the finalize result
944  // path
946  done_intercepting_ = true;
948  g_core_codegen_interface->grpc_call_start_batch(
949  call_.call(), nullptr, 0, core_cq_tag(), nullptr));
950  }
951 
952  private:
953  // Returns true if no interceptors need to be run
954  bool RunInterceptors() {
955  interceptor_methods_.ClearState();
956  interceptor_methods_.SetCallOpSetInterface(this);
957  interceptor_methods_.SetCall(&call_);
958  this->Op1::SetInterceptionHookPoint(&interceptor_methods_);
959  this->Op2::SetInterceptionHookPoint(&interceptor_methods_);
960  this->Op3::SetInterceptionHookPoint(&interceptor_methods_);
961  this->Op4::SetInterceptionHookPoint(&interceptor_methods_);
962  this->Op5::SetInterceptionHookPoint(&interceptor_methods_);
963  this->Op6::SetInterceptionHookPoint(&interceptor_methods_);
964  return interceptor_methods_.RunInterceptors();
965  }
966  // Returns true if no interceptors need to be run
967  bool RunInterceptorsPostRecv() {
968  // Call and OpSet had already been set on the set state.
969  // SetReverse also clears previously set hook points
970  interceptor_methods_.SetReverse();
971  this->Op1::SetFinishInterceptionHookPoint(&interceptor_methods_);
972  this->Op2::SetFinishInterceptionHookPoint(&interceptor_methods_);
973  this->Op3::SetFinishInterceptionHookPoint(&interceptor_methods_);
974  this->Op4::SetFinishInterceptionHookPoint(&interceptor_methods_);
975  this->Op5::SetFinishInterceptionHookPoint(&interceptor_methods_);
976  this->Op6::SetFinishInterceptionHookPoint(&interceptor_methods_);
977  return interceptor_methods_.RunInterceptors();
978  }
979 
980  void* core_cq_tag_;
981  void* return_tag_;
982  Call call_;
983  bool done_intercepting_ = false;
984  InterceptorBatchMethodsImpl interceptor_methods_;
985  bool saved_status_;
986 };
987 
988 } // namespace internal
989 } // namespace grpc
990 
991 #endif // GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
void ContinueFillOpsAfterInterception() override
Definition: call_op_set.h:928
everything went ok
Definition: grpc_types.h:373
grpc_op_type op
Operation type, as defined by grpc_op_type.
Definition: grpc_types.h:555
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:623
grpc_metadata_array * recv_initial_metadata
Definition: grpc_types.h:598
union grpc_op::grpc_op_data data
bool get_no_compression() const
Get value for the flag indicating whether compression for the next message write is forcefully disabl...
Definition: call_op_set.h:117
void * reserved
Reserved for future usage.
Definition: grpc_types.h:559
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:216
WriteOptions & clear_buffer_hint()
Clears flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:134
grpc_status_code
Definition: status.h:26
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:141
void FinishOp(bool *status)
Definition: call_op_set.h:667
void SetHijackingState() override
Definition: call_op_set.h:918
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:712
void ClientSendClose()
Definition: call_op_set.h:604
virtual void grpc_call_ref(grpc_call *call)=0
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:626
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:807
CallOpRecvMessage()
Definition: call_op_set.h:431
std::string string
Definition: config.h:35
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:277
bool get_buffer_hint() const
Get value for the flag indicating that the write may be buffered and need not go out on the wire imme...
Definition: call_op_set.h:143
WriteOptions & clear_no_compression()
Clears flag for the disabling of compression for the next message write.
Definition: call_op_set.h:108
struct grpc_byte_buffer ** recv_message
Definition: grpc_types.h:606
void FinishOp(bool *status)
Definition: call_op_set.h:721
struct grpc_byte_buffer * send_message
This op takes ownership of the slices in send_message.
Definition: grpc_types.h:581
Send a close from the client: one and only one instance MUST be sent from the client, unless the call was cancelled - in which case this can be skipped.
Definition: grpc_types.h:520
CallOpSet(const CallOpSet &other)
Definition: call_op_set.h:848
Send status from the server: one and only one instance MUST be sent from the server unless the call w...
Definition: grpc_types.h:525
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:319
void AllowNoMessage()
Definition: call_op_set.h:537
Definition: metadata_map.h:33
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:164
#define GRPC_WRITE_NO_COMPRESS
Force compression to be disabled for a particular write (start_write/add_metadata).
Definition: grpc_types.h:420
grpc_slice key
the key, value values are expected to line up with grpc_mdelem: if changing them, update metadata...
Definition: grpc_types.h:452
Status Deserialize(ByteBuffer *buf) override
Definition: call_op_set.h:512
CallOpRecvInitialMetadata()
Definition: call_op_set.h:704
void SetRecvMessage(void *message, bool *got_message)
Definition: interceptor_common.h:169
struct grpc_op::grpc_op_data::grpc_op_recv_message recv_message
void SetRecvStatus(Status *status)
Definition: interceptor_common.h:178
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:827
#define GRPC_SLICE_IS_EMPTY(slice)
Definition: slice.h:127
grpc_slice value
Definition: grpc_types.h:453
#define GRPC_WRITE_THROUGH
Force this message to be written to the socket before completing it.
Definition: grpc_types.h:422
const char ** error_string
If this is not nullptr, it will be populated with the full fidelity error string for debugging purpos...
Definition: grpc_types.h:620
grpc_slice * status_details
Definition: grpc_types.h:616
void Clear()
Clear all flags.
Definition: call_op_set.h:92
virtual grpc_slice grpc_empty_slice()=0
WriteOptions & set_write_through()
Guarantee that all bytes have been written to the socket before completing this write (usually writes...
Definition: call_op_set.h:178
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:571
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1]. ...
Definition: slice.h:80
bool send_
Definition: call_op_set.h:282
#define GRPC_WRITE_BUFFER_HINT
Write Flags:
Definition: grpc_types.h:417
void RecvInitialMetadata(ClientContext *context)
Definition: call_op_set.h:706
Send a message: 0 or more of these operations can occur for each call.
Definition: grpc_types.h:515
bool got_message
Definition: call_op_set.h:441
virtual void grpc_call_unref(grpc_call *call)=0
The first three in this list are for clients and servers.
bool is_write_through() const
Definition: call_op_set.h:183
WriteOptions()
Definition: call_op_set.h:87
WriteOptions & clear_last_message()
Clears flag indicating that this is the last message in a stream, disabling coalescing.
Definition: call_op_set.h:171
grpc_slice SliceReferencingString(const grpc::string &str)
Definition: slice.h:131
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata::grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level
void ClientRecvStatus(ClientContext *context, Status *status)
Definition: call_op_set.h:755
grpc_compression_level
Compression levels allow a party with knowledge of its peer&#39;s accepted encodings to request compressi...
Definition: compression_types.h:71
grpc_call * call() const
Definition: call.h:70
#define GRPC_SLICE_START_PTR(slice)
Definition: slice.h:116
void ContinueFinalizeResultAfterInterception() override
Definition: call_op_set.h:945
bool is_set
Definition: call_op_set.h:288
grpc_metadata_array * trailing_metadata
ownership of the array is with the caller, but ownership of the elements stays with the call object (...
Definition: grpc_types.h:614
WriteOptions & clear_corked()
Definition: call_op_set.h:152
WriteOptions & set_no_compression()
Sets flag for the disabling of compression for the next message write.
Definition: call_op_set.h:100
#define GRPC_SLICE_END_PTR(slice)
Definition: slice.h:125
CallOpClientSendClose()
Definition: call_op_set.h:602
void FinishOp(bool *status)
Definition: call_op_set.h:260
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:174
WriteOptions & operator=(const WriteOptions &rhs)
Definition: call_op_set.h:191
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:763
Definition: byte_buffer.h:55
::google::protobuf::util::Status Status
Definition: config_protobuf.h:93
uint32_t flags_
Definition: call_op_set.h:283
Default argument for CallOpSet.
Definition: call_op_set.h:212
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:577
void FinishOp(bool *status)
Definition: call_op_set.h:775
grpc_compression_level level
Definition: call_op_set.h:289
bool got_message
Definition: call_op_set.h:539
grpc::string error_message() const
Return the instance&#39;s error message.
Definition: status.h:112
Definition: call_op_set.h:502
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
The following three are for hijacked clients only and can only be registered by the global intercepto...
StatusCode error_code() const
Return the instance&#39;s error code.
Definition: status.h:110
CallOpServerSendStatus()
Definition: call_op_set.h:637
bool FinalizeResult(void **tag, bool *status) override
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
Definition: call_op_set.h:878
Definition: call_op_set.h:635
void set_compression_level(grpc_compression_level level)
Definition: call_op_set.h:237
Definition: call_op_set.h:223
bool is_corked() const
Definition: call_op_set.h:157
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:542
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:738
Definition: call_op_set.h:702
Status SendMessage(const M &message, WriteOptions options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call_op_set.h:385
std::multimap< grpc::string, grpc::string > * metadata_map_
Definition: call_op_set.h:285
Status SendMessagePtr(const M *message, WriteOptions options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call_op_set.h:416
uint32_t flags() const
Returns raw flags bitset.
Definition: call_op_set.h:95
grpc_metadata * FillMetadataArray(const std::multimap< grpc::string, grpc::string > &metadata, size_t *metadata_count, const grpc::string &optional_error_details)
Definition: call_op_set.h:59
struct grpc_op::grpc_op_data::grpc_op_recv_initial_metadata recv_initial_metadata
A single metadata element.
Definition: grpc_types.h:449
Definition: call_op_set.h:293
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata send_initial_metadata
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:243
void FinishOp(bool *status)
Definition: call_op_set.h:551
Operation data: one field for each op type (except SEND_CLOSE_FROM_CLIENT which has no arguments) ...
Definition: grpc_types.h:553
virtual grpc_slice grpc_slice_from_static_buffer(const void *buffer, size_t length)=0
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:616
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
Receive initial metadata: one and only one MUST be made on the client, must not be made on the server...
Definition: grpc_types.h:530
void RecvMessage(R *message)
Definition: call_op_set.h:528
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:218
void FillOps(Call *call) override
Fills in grpc_op, starting from ops[*nops] and moving upwards.
Definition: call_op_set.h:864
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:357
struct grpc_op::grpc_op_data::grpc_op_send_message send_message
void SendInitialMetadata(std::multimap< grpc::string, grpc::string > *metadata, uint32_t flags)
Definition: call_op_set.h:229
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:50
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:650
void * core_cq_tag() override
Get the tag to be used at the core completion queue.
Definition: call_op_set.h:908
#define GRPC_MUST_USE_RESULT
Definition: port_platform.h:478
Send initial metadata: one and only one instance MUST be sent for each call, unless the call was canc...
Definition: grpc_types.h:511
WriteOptions(const WriteOptions &other)
Definition: call_op_set.h:88
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:607
CallOpClientRecvStatus()
Definition: call_op_set.h:752
Definition: interceptor_common.h:36
Definition: byte_buffer.h:41
void ServerSendStatus(std::multimap< grpc::string, grpc::string > *trailing_metadata, const Status &status)
Definition: call_op_set.h:639
void FinishOp(bool *status)
Definition: call_op_set.h:614
Per-message write options.
Definition: call_op_set.h:85
grpc_slice * status_details
optional: set to NULL if no details need sending, non-NULL if they do pointer will not be retained pa...
Definition: grpc_types.h:590
CallOpSet & operator=(const CallOpSet &other)
Definition: call_op_set.h:855
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:274
void AllowNoMessage()
Definition: call_op_set.h:439
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:683
size_t trailing_metadata_count
Definition: grpc_types.h:584
CallOpSendMessage()
Definition: call_op_set.h:295
grpc_metadata * metadata
Definition: grpc_types.h:567
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:348
void FinishOp(bool *status)
Definition: call_op_set.h:215
virtual grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t nops, void *tag, void *reserved)=0
void FinishOp(bool *status)
Definition: call_op_set.h:453
A thin wrapper around grpc_completion_queue (see src/core/lib/surface/completion_queue.h).
Definition: completion_queue.h:95
Definition: call_op_set.h:600
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:480
An abstract collection of call ops, used to generate the grpc_call_op structure to pass down to the l...
Definition: call_op_set_interface.h:34
virtual void grpc_slice_unref(grpc_slice slice)=0
void SetSendStatus(grpc_status_code *code, grpc::string *error_details, grpc::string *error_message)
Definition: interceptor_common.h:157
void SetSendTrailingMetadata(std::multimap< grpc::string, grpc::string > *metadata)
Definition: interceptor_common.h:164
virtual ~DeserializeFunc()
Definition: call_op_set.h:505
void SetRecvInitialMetadata(MetadataMap *map)
Definition: interceptor_common.h:174
void RecvMessage(R *message)
Definition: call_op_set.h:436
struct grpc_op::grpc_op_data::grpc_op_send_status_from_server send_status_from_server
Interface between the codegen library and the minimal subset of core features required by the generat...
Definition: core_codegen_interface.h:37
void set_output_tag(void *return_tag)
Definition: call_op_set.h:906
CallOpSendInitialMetadata()
Definition: call_op_set.h:225
void SetSendInitialMetadata(std::multimap< grpc::string, grpc::string > *metadata)
Definition: interceptor_common.h:152
WriteOptions & set_corked()
corked bit: aliases set_buffer_hint currently, with the intent that set_buffer_hint will be removed i...
Definition: call_op_set.h:147
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:220
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:214
Did it work? If it didn&#39;t, why?
Definition: status.h:31
void FinishOp(bool *status)
Definition: call_op_set.h:337
Receive status on the client: one and only one must be made on the client.
Definition: grpc_types.h:540
grpc_metadata * initial_metadata_
Definition: call_op_set.h:286
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:686
void AddInterceptionHookPoint(experimental::InterceptionHookPoints type)
Definition: interceptor_common.h:78
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:584
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:730
DeserializeFuncType(R *message)
Definition: call_op_set.h:511
size_t initial_metadata_count_
Definition: call_op_set.h:284
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:799
void SetRecvTrailingMetadata(MetadataMap *map)
Definition: interceptor_common.h:180
grpc_status_code status
Definition: grpc_types.h:586
grpc_status_code * status
Definition: grpc_types.h:615
size_t count
Definition: grpc_types.h:566
Definition: call_op_set.h:522
uint32_t flags
Write flags bitset for grpc_begin_messages.
Definition: grpc_types.h:557
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:189
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:725
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:793
Definition: call_op_set.h:750
virtual void gpr_free(void *p)=0
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:673
void SetSendMessage(ByteBuffer *buf, const void **msg, bool *fail_send_message, std::function< Status(const void *)> serializer)
Definition: interceptor_common.h:143
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:487
Receive a message: 0 or more of these operations can occur for each call.
Definition: grpc_types.h:534
A sequence of bytes.
Definition: byte_buffer.h:64
CallOpGenericRecvMessage()
Definition: call_op_set.h:524
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:444
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:474
CallOpSet()
Definition: call_op_set.h:844
const char kBinaryErrorDetailsKey[]
Definition: metadata_map.h:31
The following two are for all clients and servers.
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:266
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:914
grpc::string error_details() const
Return the (binary) error details.
Definition: status.h:115
Straightforward wrapping of the C call object.
Definition: call.h:36
grpc_metadata * trailing_metadata
Definition: grpc_types.h:585
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:371
struct grpc_op::grpc_op_data::grpc_op_recv_status_on_client recv_status_on_client
virtual void * gpr_malloc(size_t size)=0
~DeserializeFuncType() override
Definition: call_op_set.h:516