GRPC C++  1.22.0-dev
call_op_set.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
20 #define GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
21 
22 #include <assert.h>
23 #include <array>
24 #include <cstring>
25 #include <functional>
26 #include <map>
27 #include <memory>
28 #include <vector>
29 
44 
45 #include <grpc/impl/codegen/atm.h>
48 
49 namespace grpc {
50 
52 
53 namespace internal {
54 class Call;
55 class CallHook;
56 
57 // TODO(yangg) if the map is changed before we send, the pointers will be a
58 // mess. Make sure it does not happen.
60  const std::multimap<grpc::string, grpc::string>& metadata,
61  size_t* metadata_count, const grpc::string& optional_error_details) {
62  *metadata_count = metadata.size() + (optional_error_details.empty() ? 0 : 1);
63  if (*metadata_count == 0) {
64  return nullptr;
65  }
66  grpc_metadata* metadata_array =
67  (grpc_metadata*)(g_core_codegen_interface->gpr_malloc(
68  (*metadata_count) * sizeof(grpc_metadata)));
69  size_t i = 0;
70  for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
71  metadata_array[i].key = SliceReferencingString(iter->first);
72  metadata_array[i].value = SliceReferencingString(iter->second);
73  }
74  if (!optional_error_details.empty()) {
75  metadata_array[i].key =
76  g_core_codegen_interface->grpc_slice_from_static_buffer(
78  metadata_array[i].value = SliceReferencingString(optional_error_details);
79  }
80  return metadata_array;
81 }
82 } // namespace internal
83 
85 class WriteOptions {
86  public:
87  WriteOptions() : flags_(0), last_message_(false) {}
88  WriteOptions(const WriteOptions& other)
89  : flags_(other.flags_), last_message_(other.last_message_) {}
90 
92  inline void Clear() { flags_ = 0; }
93 
95  inline uint32_t flags() const { return flags_; }
96 
101  SetBit(GRPC_WRITE_NO_COMPRESS);
102  return *this;
103  }
104 
109  ClearBit(GRPC_WRITE_NO_COMPRESS);
110  return *this;
111  }
112 
117  inline bool get_no_compression() const {
118  return GetBit(GRPC_WRITE_NO_COMPRESS);
119  }
120 
126  SetBit(GRPC_WRITE_BUFFER_HINT);
127  return *this;
128  }
129 
135  ClearBit(GRPC_WRITE_BUFFER_HINT);
136  return *this;
137  }
138 
143  inline bool get_buffer_hint() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
144 
148  SetBit(GRPC_WRITE_BUFFER_HINT);
149  return *this;
150  }
151 
153  ClearBit(GRPC_WRITE_BUFFER_HINT);
154  return *this;
155  }
156 
157  inline bool is_corked() const { return GetBit(GRPC_WRITE_BUFFER_HINT); }
158 
165  last_message_ = true;
166  return *this;
167  }
168 
172  last_message_ = false;
173  return *this;
174  }
175 
179  SetBit(GRPC_WRITE_THROUGH);
180  return *this;
181  }
182 
183  inline bool is_write_through() const { return GetBit(GRPC_WRITE_THROUGH); }
184 
189  bool is_last_message() const { return last_message_; }
190 
192  flags_ = rhs.flags_;
193  return *this;
194  }
195 
196  private:
197  void SetBit(const uint32_t mask) { flags_ |= mask; }
198 
199  void ClearBit(const uint32_t mask) { flags_ &= ~mask; }
200 
201  bool GetBit(const uint32_t mask) const { return (flags_ & mask) != 0; }
202 
203  uint32_t flags_;
204  bool last_message_;
205 };
206 
207 namespace internal {
208 
211 template <int I>
212 class CallNoOp {
213  protected:
214  void AddOp(grpc_op* ops, size_t* nops) {}
215  void FinishOp(bool* status) {}
217  InterceptorBatchMethodsImpl* interceptor_methods) {}
219  InterceptorBatchMethodsImpl* interceptor_methods) {}
220  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {}
221 };
222 
224  public:
225  CallOpSendInitialMetadata() : send_(false) {
226  maybe_compression_level_.is_set = false;
227  }
228 
229  void SendInitialMetadata(std::multimap<grpc::string, grpc::string>* metadata,
230  uint32_t flags) {
231  maybe_compression_level_.is_set = false;
232  send_ = true;
233  flags_ = flags;
234  metadata_map_ = metadata;
235  }
236 
238  maybe_compression_level_.is_set = true;
239  maybe_compression_level_.level = level;
240  }
241 
242  protected:
243  void AddOp(grpc_op* ops, size_t* nops) {
244  if (!send_ || hijacked_) return;
245  grpc_op* op = &ops[(*nops)++];
247  op->flags = flags_;
248  op->reserved = NULL;
249  initial_metadata_ =
250  FillMetadataArray(*metadata_map_, &initial_metadata_count_, "");
251  op->data.send_initial_metadata.count = initial_metadata_count_;
252  op->data.send_initial_metadata.metadata = initial_metadata_;
254  maybe_compression_level_.is_set;
255  if (maybe_compression_level_.is_set) {
257  maybe_compression_level_.level;
258  }
259  }
260  void FinishOp(bool* status) {
261  if (!send_ || hijacked_) return;
262  g_core_codegen_interface->gpr_free(initial_metadata_);
263  send_ = false;
264  }
265 
267  InterceptorBatchMethodsImpl* interceptor_methods) {
268  if (!send_) return;
269  interceptor_methods->AddInterceptionHookPoint(
271  interceptor_methods->SetSendInitialMetadata(metadata_map_);
272  }
273 
275  InterceptorBatchMethodsImpl* interceptor_methods) {}
276 
277  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
278  hijacked_ = true;
279  }
280 
281  bool hijacked_ = false;
282  bool send_;
283  uint32_t flags_;
285  std::multimap<grpc::string, grpc::string>* metadata_map_;
287  struct {
288  bool is_set;
290  } maybe_compression_level_;
291 };
292 
294  public:
295  CallOpSendMessage() : send_buf_() {}
296 
299  template <class M>
300  Status SendMessage(const M& message,
302 
303  template <class M>
304  Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
305 
309  template <class M>
310  Status SendMessagePtr(const M* message,
312 
315  template <class M>
316  Status SendMessagePtr(const M* message) GRPC_MUST_USE_RESULT;
317 
318  protected:
319  void AddOp(grpc_op* ops, size_t* nops) {
320  if (msg_ == nullptr && !send_buf_.Valid()) return;
321  if (hijacked_) {
322  serializer_ = nullptr;
323  return;
324  }
325  if (msg_ != nullptr) {
326  GPR_CODEGEN_ASSERT(serializer_(msg_).ok());
327  }
328  serializer_ = nullptr;
329  grpc_op* op = &ops[(*nops)++];
330  op->op = GRPC_OP_SEND_MESSAGE;
331  op->flags = write_options_.flags();
332  op->reserved = NULL;
333  op->data.send_message.send_message = send_buf_.c_buffer();
334  // Flags are per-message: clear them after use.
335  write_options_.Clear();
336  }
337  void FinishOp(bool* status) {
338  if (msg_ == nullptr && !send_buf_.Valid()) return;
339  if (hijacked_ && failed_send_) {
340  // Hijacking interceptor failed this Op
341  *status = false;
342  } else if (!*status) {
343  // This Op was passed down to core and the Op failed
344  failed_send_ = true;
345  }
346  }
347 
349  InterceptorBatchMethodsImpl* interceptor_methods) {
350  if (msg_ == nullptr && !send_buf_.Valid()) return;
351  interceptor_methods->AddInterceptionHookPoint(
353  interceptor_methods->SetSendMessage(&send_buf_, &msg_, &failed_send_,
354  serializer_);
355  }
356 
358  InterceptorBatchMethodsImpl* interceptor_methods) {
359  if (msg_ != nullptr || send_buf_.Valid()) {
360  interceptor_methods->AddInterceptionHookPoint(
362  }
363  send_buf_.Clear();
364  msg_ = nullptr;
365  // The contents of the SendMessage value that was previously set
366  // has had its references stolen by core's operations
367  interceptor_methods->SetSendMessage(nullptr, nullptr, &failed_send_,
368  nullptr);
369  }
370 
371  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
372  hijacked_ = true;
373  }
374 
375  private:
376  const void* msg_ = nullptr; // The original non-serialized message
377  bool hijacked_ = false;
378  bool failed_send_ = false;
379  ByteBuffer send_buf_;
380  WriteOptions write_options_;
381  std::function<Status(const void*)> serializer_;
382 };
383 
384 template <class M>
386  write_options_ = options;
387  serializer_ = [this](const void* message) {
388  bool own_buf;
389  send_buf_.Clear();
390  // TODO(vjpai): Remove the void below when possible
391  // The void in the template parameter below should not be needed
392  // (since it should be implicit) but is needed due to an observed
393  // difference in behavior between clang and gcc for certain internal users
395  *static_cast<const M*>(message), send_buf_.bbuf_ptr(), &own_buf);
396  if (!own_buf) {
397  send_buf_.Duplicate();
398  }
399  return result;
400  };
401  // Serialize immediately only if we do not have access to the message pointer
402  if (msg_ == nullptr) {
403  Status result = serializer_(&message);
404  serializer_ = nullptr;
405  return result;
406  }
407  return Status();
408 }
409 
410 template <class M>
412  return SendMessage(message, WriteOptions());
413 }
414 
415 template <class M>
417  WriteOptions options) {
418  msg_ = message;
419  return SendMessage(*message, options);
420 }
421 
422 template <class M>
424  msg_ = message;
425  return SendMessage(*message, WriteOptions());
426 }
427 
428 template <class R>
429 class CallOpRecvMessage {
430  public:
432  : got_message(false),
433  message_(nullptr),
434  allow_not_getting_message_(false) {}
435 
436  void RecvMessage(R* message) { message_ = message; }
437 
438  // Do not change status if no message is received.
439  void AllowNoMessage() { allow_not_getting_message_ = true; }
440 
442 
443  protected:
444  void AddOp(grpc_op* ops, size_t* nops) {
445  if (message_ == nullptr || hijacked_) return;
446  grpc_op* op = &ops[(*nops)++];
447  op->op = GRPC_OP_RECV_MESSAGE;
448  op->flags = 0;
449  op->reserved = NULL;
450  op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
451  }
452 
453  void FinishOp(bool* status) {
454  if (message_ == nullptr || hijacked_) return;
455  if (recv_buf_.Valid()) {
456  if (*status) {
457  got_message = *status =
458  SerializationTraits<R>::Deserialize(recv_buf_.bbuf_ptr(), message_)
459  .ok();
460  recv_buf_.Release();
461  } else {
462  got_message = false;
463  recv_buf_.Clear();
464  }
465  } else {
466  got_message = false;
467  if (!allow_not_getting_message_) {
468  *status = false;
469  }
470  }
471  }
472 
474  InterceptorBatchMethodsImpl* interceptor_methods) {
475  if (message_ == nullptr) return;
476  interceptor_methods->SetRecvMessage(message_, &got_message);
477  }
478 
480  InterceptorBatchMethodsImpl* interceptor_methods) {
481  if (message_ == nullptr) return;
482  interceptor_methods->AddInterceptionHookPoint(
484  if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
485  }
486  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
487  hijacked_ = true;
488  if (message_ == nullptr) return;
489  interceptor_methods->AddInterceptionHookPoint(
491  got_message = true;
492  }
493 
494  private:
495  R* message_;
496  ByteBuffer recv_buf_;
497  bool allow_not_getting_message_;
498  bool hijacked_ = false;
499 };
500 
502  public:
503  virtual Status Deserialize(ByteBuffer* buf) = 0;
504  virtual ~DeserializeFunc() {}
505 };
506 
507 template <class R>
508 class DeserializeFuncType final : public DeserializeFunc {
509  public:
510  DeserializeFuncType(R* message) : message_(message) {}
511  Status Deserialize(ByteBuffer* buf) override {
512  return SerializationTraits<R>::Deserialize(buf->bbuf_ptr(), message_);
513  }
514 
515  ~DeserializeFuncType() override {}
516 
517  private:
518  R* message_; // Not a managed pointer because management is external to this
519 };
520 
522  public:
524  : got_message(false), allow_not_getting_message_(false) {}
525 
526  template <class R>
527  void RecvMessage(R* message) {
528  // Use an explicit base class pointer to avoid resolution error in the
529  // following unique_ptr::reset for some old implementations.
530  DeserializeFunc* func = new DeserializeFuncType<R>(message);
531  deserialize_.reset(func);
532  message_ = message;
533  }
534 
535  // Do not change status if no message is received.
536  void AllowNoMessage() { allow_not_getting_message_ = true; }
537 
539 
540  protected:
541  void AddOp(grpc_op* ops, size_t* nops) {
542  if (!deserialize_ || hijacked_) return;
543  grpc_op* op = &ops[(*nops)++];
544  op->op = GRPC_OP_RECV_MESSAGE;
545  op->flags = 0;
546  op->reserved = NULL;
547  op->data.recv_message.recv_message = recv_buf_.c_buffer_ptr();
548  }
549 
550  void FinishOp(bool* status) {
551  if (!deserialize_ || hijacked_) return;
552  if (recv_buf_.Valid()) {
553  if (*status) {
554  got_message = true;
555  *status = deserialize_->Deserialize(&recv_buf_).ok();
556  recv_buf_.Release();
557  } else {
558  got_message = false;
559  recv_buf_.Clear();
560  }
561  } else {
562  got_message = false;
563  if (!allow_not_getting_message_) {
564  *status = false;
565  }
566  }
567  }
568 
570  InterceptorBatchMethodsImpl* interceptor_methods) {
571  if (!deserialize_) return;
572  interceptor_methods->SetRecvMessage(message_, &got_message);
573  }
574 
576  InterceptorBatchMethodsImpl* interceptor_methods) {
577  if (!deserialize_) return;
578  interceptor_methods->AddInterceptionHookPoint(
580  if (!got_message) interceptor_methods->SetRecvMessage(nullptr, nullptr);
581  deserialize_.reset();
582  }
583  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
584  hijacked_ = true;
585  if (!deserialize_) return;
586  interceptor_methods->AddInterceptionHookPoint(
588  got_message = true;
589  }
590 
591  private:
592  void* message_;
593  bool hijacked_ = false;
594  std::unique_ptr<DeserializeFunc> deserialize_;
595  ByteBuffer recv_buf_;
596  bool allow_not_getting_message_;
597 };
598 
600  public:
601  CallOpClientSendClose() : send_(false) {}
602 
603  void ClientSendClose() { send_ = true; }
604 
605  protected:
606  void AddOp(grpc_op* ops, size_t* nops) {
607  if (!send_ || hijacked_) return;
608  grpc_op* op = &ops[(*nops)++];
610  op->flags = 0;
611  op->reserved = NULL;
612  }
613  void FinishOp(bool* status) { send_ = false; }
614 
616  InterceptorBatchMethodsImpl* interceptor_methods) {
617  if (!send_) return;
618  interceptor_methods->AddInterceptionHookPoint(
620  }
621 
623  InterceptorBatchMethodsImpl* interceptor_methods) {}
624 
625  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
626  hijacked_ = true;
627  }
628 
629  private:
630  bool hijacked_ = false;
631  bool send_;
632 };
633 
635  public:
636  CallOpServerSendStatus() : send_status_available_(false) {}
637 
639  std::multimap<grpc::string, grpc::string>* trailing_metadata,
640  const Status& status) {
641  send_error_details_ = status.error_details();
642  metadata_map_ = trailing_metadata;
643  send_status_available_ = true;
644  send_status_code_ = static_cast<grpc_status_code>(status.error_code());
645  send_error_message_ = status.error_message();
646  }
647 
648  protected:
649  void AddOp(grpc_op* ops, size_t* nops) {
650  if (!send_status_available_ || hijacked_) return;
651  trailing_metadata_ = FillMetadataArray(
652  *metadata_map_, &trailing_metadata_count_, send_error_details_);
653  grpc_op* op = &ops[(*nops)++];
656  trailing_metadata_count_;
657  op->data.send_status_from_server.trailing_metadata = trailing_metadata_;
658  op->data.send_status_from_server.status = send_status_code_;
659  error_message_slice_ = SliceReferencingString(send_error_message_);
661  send_error_message_.empty() ? nullptr : &error_message_slice_;
662  op->flags = 0;
663  op->reserved = NULL;
664  }
665 
666  void FinishOp(bool* status) {
667  if (!send_status_available_ || hijacked_) return;
668  g_core_codegen_interface->gpr_free(trailing_metadata_);
669  send_status_available_ = false;
670  }
671 
673  InterceptorBatchMethodsImpl* interceptor_methods) {
674  if (!send_status_available_) return;
675  interceptor_methods->AddInterceptionHookPoint(
677  interceptor_methods->SetSendTrailingMetadata(metadata_map_);
678  interceptor_methods->SetSendStatus(&send_status_code_, &send_error_details_,
679  &send_error_message_);
680  }
681 
683  InterceptorBatchMethodsImpl* interceptor_methods) {}
684 
685  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
686  hijacked_ = true;
687  }
688 
689  private:
690  bool hijacked_ = false;
691  bool send_status_available_;
692  grpc_status_code send_status_code_;
693  grpc::string send_error_details_;
694  grpc::string send_error_message_;
695  size_t trailing_metadata_count_;
696  std::multimap<grpc::string, grpc::string>* metadata_map_;
697  grpc_metadata* trailing_metadata_;
698  grpc_slice error_message_slice_;
699 };
700 
702  public:
703  CallOpRecvInitialMetadata() : metadata_map_(nullptr) {}
704 
706  context->initial_metadata_received_ = true;
707  metadata_map_ = &context->recv_initial_metadata_;
708  }
709 
710  protected:
711  void AddOp(grpc_op* ops, size_t* nops) {
712  if (metadata_map_ == nullptr || hijacked_) return;
713  grpc_op* op = &ops[(*nops)++];
715  op->data.recv_initial_metadata.recv_initial_metadata = metadata_map_->arr();
716  op->flags = 0;
717  op->reserved = NULL;
718  }
719 
720  void FinishOp(bool* status) {
721  if (metadata_map_ == nullptr || hijacked_) return;
722  }
723 
725  InterceptorBatchMethodsImpl* interceptor_methods) {
726  interceptor_methods->SetRecvInitialMetadata(metadata_map_);
727  }
728 
730  InterceptorBatchMethodsImpl* interceptor_methods) {
731  if (metadata_map_ == nullptr) return;
732  interceptor_methods->AddInterceptionHookPoint(
734  metadata_map_ = nullptr;
735  }
736 
737  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
738  hijacked_ = true;
739  if (metadata_map_ == nullptr) return;
740  interceptor_methods->AddInterceptionHookPoint(
742  }
743 
744  private:
745  bool hijacked_ = false;
746  MetadataMap* metadata_map_;
747 };
748 
750  public:
752  : recv_status_(nullptr), debug_error_string_(nullptr) {}
753 
754  void ClientRecvStatus(ClientContext* context, Status* status) {
755  client_context_ = context;
756  metadata_map_ = &client_context_->trailing_metadata_;
757  recv_status_ = status;
758  error_message_ = g_core_codegen_interface->grpc_empty_slice();
759  }
760 
761  protected:
762  void AddOp(grpc_op* ops, size_t* nops) {
763  if (recv_status_ == nullptr || hijacked_) return;
764  grpc_op* op = &ops[(*nops)++];
766  op->data.recv_status_on_client.trailing_metadata = metadata_map_->arr();
767  op->data.recv_status_on_client.status = &status_code_;
768  op->data.recv_status_on_client.status_details = &error_message_;
769  op->data.recv_status_on_client.error_string = &debug_error_string_;
770  op->flags = 0;
771  op->reserved = NULL;
772  }
773 
774  void FinishOp(bool* status) {
775  if (recv_status_ == nullptr || hijacked_) return;
776  grpc::string binary_error_details = metadata_map_->GetBinaryErrorDetails();
777  *recv_status_ =
778  Status(static_cast<StatusCode>(status_code_),
779  GRPC_SLICE_IS_EMPTY(error_message_)
780  ? grpc::string()
781  : grpc::string(GRPC_SLICE_START_PTR(error_message_),
782  GRPC_SLICE_END_PTR(error_message_)),
783  binary_error_details);
784  client_context_->set_debug_error_string(
785  debug_error_string_ != nullptr ? debug_error_string_ : "");
786  g_core_codegen_interface->grpc_slice_unref(error_message_);
787  if (debug_error_string_ != nullptr) {
788  g_core_codegen_interface->gpr_free((void*)debug_error_string_);
789  }
790  }
791 
793  InterceptorBatchMethodsImpl* interceptor_methods) {
794  interceptor_methods->SetRecvStatus(recv_status_);
795  interceptor_methods->SetRecvTrailingMetadata(metadata_map_);
796  }
797 
799  InterceptorBatchMethodsImpl* interceptor_methods) {
800  if (recv_status_ == nullptr) return;
801  interceptor_methods->AddInterceptionHookPoint(
803  recv_status_ = nullptr;
804  }
805 
806  void SetHijackingState(InterceptorBatchMethodsImpl* interceptor_methods) {
807  hijacked_ = true;
808  if (recv_status_ == nullptr) return;
809  interceptor_methods->AddInterceptionHookPoint(
811  }
812 
813  private:
814  bool hijacked_ = false;
815  ClientContext* client_context_;
816  MetadataMap* metadata_map_;
817  Status* recv_status_;
818  const char* debug_error_string_;
819  grpc_status_code status_code_;
820  grpc_slice error_message_;
821 };
822 
823 template <class Op1 = CallNoOp<1>, class Op2 = CallNoOp<2>,
824  class Op3 = CallNoOp<3>, class Op4 = CallNoOp<4>,
825  class Op5 = CallNoOp<5>, class Op6 = CallNoOp<6>>
826 class CallOpSet;
827 
834 template <class Op1, class Op2, class Op3, class Op4, class Op5, class Op6>
835 class CallOpSet : public CallOpSetInterface,
836  public Op1,
837  public Op2,
838  public Op3,
839  public Op4,
840  public Op5,
841  public Op6 {
842  public:
843  CallOpSet() : core_cq_tag_(this), return_tag_(this) {}
844  // The copy constructor and assignment operator reset the value of
845  // core_cq_tag_, return_tag_, done_intercepting_ and interceptor_methods_
846  // since those are only meaningful on a specific object, not across objects.
847  CallOpSet(const CallOpSet& other)
848  : core_cq_tag_(this),
849  return_tag_(this),
850  call_(other.call_),
851  done_intercepting_(false),
852  interceptor_methods_(InterceptorBatchMethodsImpl()) {}
853 
854  CallOpSet& operator=(const CallOpSet& other) {
855  core_cq_tag_ = this;
856  return_tag_ = this;
857  call_ = other.call_;
858  done_intercepting_ = false;
859  interceptor_methods_ = InterceptorBatchMethodsImpl();
860  return *this;
861  }
862 
863  void FillOps(Call* call) override {
864  done_intercepting_ = false;
865  g_core_codegen_interface->grpc_call_ref(call->call());
866  call_ =
867  *call; // It's fine to create a copy of call since it's just pointers
868 
869  if (RunInterceptors()) {
870  ContinueFillOpsAfterInterception();
871  } else {
872  // After the interceptors are run, ContinueFillOpsAfterInterception will
873  // be run
874  }
875  }
876 
877  bool FinalizeResult(void** tag, bool* status) override {
878  if (done_intercepting_) {
879  // Complete the avalanching since we are done with this batch of ops
880  call_.cq()->CompleteAvalanching();
881  // We have already finished intercepting and filling in the results. This
882  // round trip from the core needed to be made because interceptors were
883  // run
884  *tag = return_tag_;
885  *status = saved_status_;
886  g_core_codegen_interface->grpc_call_unref(call_.call());
887  return true;
888  }
889 
890  this->Op1::FinishOp(status);
891  this->Op2::FinishOp(status);
892  this->Op3::FinishOp(status);
893  this->Op4::FinishOp(status);
894  this->Op5::FinishOp(status);
895  this->Op6::FinishOp(status);
896  saved_status_ = *status;
897  if (RunInterceptorsPostRecv()) {
898  *tag = return_tag_;
899  g_core_codegen_interface->grpc_call_unref(call_.call());
900  return true;
901  }
902  // Interceptors are going to be run, so we can't return the tag just yet.
903  // After the interceptors are run, ContinueFinalizeResultAfterInterception
904  return false;
905  }
906 
907  void set_output_tag(void* return_tag) { return_tag_ = return_tag; }
908 
909  void* core_cq_tag() override { return core_cq_tag_; }
910 
915  void set_core_cq_tag(void* core_cq_tag) { core_cq_tag_ = core_cq_tag; }
916 
917  // This will be called while interceptors are run if the RPC is a hijacked
918  // RPC. This should set hijacking state for each of the ops.
919  void SetHijackingState() override {
920  this->Op1::SetHijackingState(&interceptor_methods_);
921  this->Op2::SetHijackingState(&interceptor_methods_);
922  this->Op3::SetHijackingState(&interceptor_methods_);
923  this->Op4::SetHijackingState(&interceptor_methods_);
924  this->Op5::SetHijackingState(&interceptor_methods_);
925  this->Op6::SetHijackingState(&interceptor_methods_);
926  }
927 
928  // Should be called after interceptors are done running
930  static const size_t MAX_OPS = 6;
931  grpc_op ops[MAX_OPS];
932  size_t nops = 0;
933  this->Op1::AddOp(ops, &nops);
934  this->Op2::AddOp(ops, &nops);
935  this->Op3::AddOp(ops, &nops);
936  this->Op4::AddOp(ops, &nops);
937  this->Op5::AddOp(ops, &nops);
938  this->Op6::AddOp(ops, &nops);
940  g_core_codegen_interface->grpc_call_start_batch(
941  call_.call(), ops, nops, core_cq_tag(), nullptr));
942  }
943 
944  // Should be called after interceptors are done running on the finalize result
945  // path
947  done_intercepting_ = true;
949  g_core_codegen_interface->grpc_call_start_batch(
950  call_.call(), nullptr, 0, core_cq_tag(), nullptr));
951  }
952 
953  private:
954  // Returns true if no interceptors need to be run
955  bool RunInterceptors() {
956  interceptor_methods_.ClearState();
957  interceptor_methods_.SetCallOpSetInterface(this);
958  interceptor_methods_.SetCall(&call_);
959  this->Op1::SetInterceptionHookPoint(&interceptor_methods_);
960  this->Op2::SetInterceptionHookPoint(&interceptor_methods_);
961  this->Op3::SetInterceptionHookPoint(&interceptor_methods_);
962  this->Op4::SetInterceptionHookPoint(&interceptor_methods_);
963  this->Op5::SetInterceptionHookPoint(&interceptor_methods_);
964  this->Op6::SetInterceptionHookPoint(&interceptor_methods_);
965  if (interceptor_methods_.InterceptorsListEmpty()) {
966  return true;
967  }
968  // This call will go through interceptors and would need to
969  // schedule new batches, so delay completion queue shutdown
970  call_.cq()->RegisterAvalanching();
971  return interceptor_methods_.RunInterceptors();
972  }
973  // Returns true if no interceptors need to be run
974  bool RunInterceptorsPostRecv() {
975  // Call and OpSet had already been set on the set state.
976  // SetReverse also clears previously set hook points
977  interceptor_methods_.SetReverse();
978  this->Op1::SetFinishInterceptionHookPoint(&interceptor_methods_);
979  this->Op2::SetFinishInterceptionHookPoint(&interceptor_methods_);
980  this->Op3::SetFinishInterceptionHookPoint(&interceptor_methods_);
981  this->Op4::SetFinishInterceptionHookPoint(&interceptor_methods_);
982  this->Op5::SetFinishInterceptionHookPoint(&interceptor_methods_);
983  this->Op6::SetFinishInterceptionHookPoint(&interceptor_methods_);
984  return interceptor_methods_.RunInterceptors();
985  }
986 
987  void* core_cq_tag_;
988  void* return_tag_;
989  Call call_;
990  bool done_intercepting_ = false;
991  InterceptorBatchMethodsImpl interceptor_methods_;
992  bool saved_status_;
993 };
994 
995 } // namespace internal
996 } // namespace grpc
997 
998 #endif // GRPCPP_IMPL_CODEGEN_CALL_OP_SET_H
void ContinueFillOpsAfterInterception() override
Definition: call_op_set.h:929
everything went ok
Definition: grpc_types.h:384
grpc_op_type op
Operation type, as defined by grpc_op_type.
Definition: grpc_types.h:567
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:622
grpc_metadata_array * recv_initial_metadata
Definition: grpc_types.h:610
union grpc_op::grpc_op_data data
bool get_no_compression() const
Get value for the flag indicating whether compression for the next message write is forcefully disabl...
Definition: call_op_set.h:117
void * reserved
Reserved for future usage.
Definition: grpc_types.h:571
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:216
WriteOptions & clear_buffer_hint()
Clears flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:134
grpc_status_code
Definition: status.h:26
WriteOptions & set_buffer_hint()
Sets flag indicating that the write may be buffered and need not go out on the wire immediately...
Definition: call_op_set.h:125
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:145
void FinishOp(bool *status)
Definition: call_op_set.h:666
void SetHijackingState() override
Definition: call_op_set.h:919
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:711
void ClientSendClose()
Definition: call_op_set.h:603
virtual void grpc_call_ref(grpc_call *call)=0
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:625
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:806
CallOpRecvMessage()
Definition: call_op_set.h:431
std::string string
Definition: config.h:35
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:277
bool get_buffer_hint() const
Get value for the flag indicating that the write may be buffered and need not go out on the wire imme...
Definition: call_op_set.h:143
WriteOptions & clear_no_compression()
Clears flag for the disabling of compression for the next message write.
Definition: call_op_set.h:108
struct grpc_byte_buffer ** recv_message
Definition: grpc_types.h:618
void FinishOp(bool *status)
Definition: call_op_set.h:720
struct grpc_byte_buffer * send_message
This op takes ownership of the slices in send_message.
Definition: grpc_types.h:593
Send a close from the client: one and only one instance MUST be sent from the client, unless the call was cancelled - in which case this can be skipped.
Definition: grpc_types.h:532
CallOpSet(const CallOpSet &other)
Definition: call_op_set.h:847
Send status from the server: one and only one instance MUST be sent from the server unless the call w...
Definition: grpc_types.h:537
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:319
void AllowNoMessage()
Definition: call_op_set.h:536
Definition: metadata_map.h:33
WriteOptions & set_last_message()
last-message bit: indicates this is the last message in a stream client-side: makes Write the equival...
Definition: call_op_set.h:164
#define GRPC_WRITE_NO_COMPRESS
Force compression to be disabled for a particular write (start_write/add_metadata).
Definition: grpc_types.h:431
grpc_slice key
the key, value values are expected to line up with grpc_mdelem: if changing them, update metadata...
Definition: grpc_types.h:463
Status Deserialize(ByteBuffer *buf) override
Definition: call_op_set.h:511
CallOpRecvInitialMetadata()
Definition: call_op_set.h:703
void SetRecvMessage(void *message, bool *got_message)
Definition: interceptor_common.h:169
struct grpc_op::grpc_op_data::grpc_op_recv_message recv_message
void SetRecvStatus(Status *status)
Definition: interceptor_common.h:178
Primary implementation of CallOpSetInterface.
Definition: call_op_set.h:826
#define GRPC_SLICE_IS_EMPTY(slice)
Definition: slice.h:107
grpc_slice value
Definition: grpc_types.h:464
#define GRPC_WRITE_THROUGH
Force this message to be written to the socket before completing it.
Definition: grpc_types.h:433
const char ** error_string
If this is not nullptr, it will be populated with the full fidelity error string for debugging purpos...
Definition: grpc_types.h:632
grpc_slice * status_details
Definition: grpc_types.h:628
void Clear()
Clear all flags.
Definition: call_op_set.h:92
virtual grpc_slice grpc_empty_slice()=0
WriteOptions & set_write_through()
Guarantee that all bytes have been written to the socket before completing this write (usually writes...
Definition: call_op_set.h:178
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:569
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1]. ...
Definition: slice.h:60
bool send_
Definition: call_op_set.h:282
#define GRPC_WRITE_BUFFER_HINT
Write Flags:
Definition: grpc_types.h:428
void RecvInitialMetadata(ClientContext *context)
Definition: call_op_set.h:705
Send a message: 0 or more of these operations can occur for each call.
Definition: grpc_types.h:527
bool got_message
Definition: call_op_set.h:441
virtual void grpc_call_unref(grpc_call *call)=0
The first three in this list are for clients and servers.
bool is_write_through() const
Definition: call_op_set.h:183
WriteOptions()
Definition: call_op_set.h:87
WriteOptions & clear_last_message()
Clears flag indicating that this is the last message in a stream, disabling coalescing.
Definition: call_op_set.h:171
grpc_slice SliceReferencingString(const grpc::string &str)
Definition: slice.h:131
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata::grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level
void ClientRecvStatus(ClientContext *context, Status *status)
Definition: call_op_set.h:754
grpc_compression_level
Compression levels allow a party with knowledge of its peer&#39;s accepted encodings to request compressi...
Definition: compression_types.h:71
grpc_call * call() const
Definition: call.h:72
#define GRPC_SLICE_START_PTR(slice)
Definition: slice.h:96
void ContinueFinalizeResultAfterInterception() override
Definition: call_op_set.h:946
bool is_set
Definition: call_op_set.h:288
grpc_metadata_array * trailing_metadata
ownership of the array is with the caller, but ownership of the elements stays with the call object (...
Definition: grpc_types.h:626
WriteOptions & clear_corked()
Definition: call_op_set.h:152
WriteOptions & set_no_compression()
Sets flag for the disabling of compression for the next message write.
Definition: call_op_set.h:100
#define GRPC_SLICE_END_PTR(slice)
Definition: slice.h:105
CallOpClientSendClose()
Definition: call_op_set.h:601
void FinishOp(bool *status)
Definition: call_op_set.h:260
A ClientContext allows the person implementing a service client to:
Definition: client_context.h:179
WriteOptions & operator=(const WriteOptions &rhs)
Definition: call_op_set.h:191
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:762
Definition: byte_buffer.h:56
::google::protobuf::util::Status Status
Definition: config_protobuf.h:96
uint32_t flags_
Definition: call_op_set.h:283
Default argument for CallOpSet.
Definition: call_op_set.h:212
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:575
void FinishOp(bool *status)
Definition: call_op_set.h:774
grpc_compression_level level
Definition: call_op_set.h:289
bool got_message
Definition: call_op_set.h:538
grpc::string error_message() const
Return the instance&#39;s error message.
Definition: status.h:112
Definition: call_op_set.h:501
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
The following three are for hijacked clients only.
StatusCode error_code() const
Return the instance&#39;s error code.
Definition: status.h:110
CallOpServerSendStatus()
Definition: call_op_set.h:636
bool FinalizeResult(void **tag, bool *status) override
FinalizeResult must be called before informing user code that the operation bound to the underlying c...
Definition: call_op_set.h:877
Definition: call_op_set.h:634
void set_compression_level(grpc_compression_level level)
Definition: call_op_set.h:237
Definition: call_op_set.h:223
bool is_corked() const
Definition: call_op_set.h:157
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:541
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:737
Definition: call_op_set.h:701
Status SendMessage(const M &message, WriteOptions options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call_op_set.h:385
std::multimap< grpc::string, grpc::string > * metadata_map_
Definition: call_op_set.h:285
Status SendMessagePtr(const M *message, WriteOptions options) GRPC_MUST_USE_RESULT
Send message using options for the write.
Definition: call_op_set.h:416
uint32_t flags() const
Returns raw flags bitset.
Definition: call_op_set.h:95
grpc_metadata * FillMetadataArray(const std::multimap< grpc::string, grpc::string > &metadata, size_t *metadata_count, const grpc::string &optional_error_details)
Definition: call_op_set.h:59
struct grpc_op::grpc_op_data::grpc_op_recv_initial_metadata recv_initial_metadata
A single metadata element.
Definition: grpc_types.h:460
Definition: call_op_set.h:293
struct grpc_op::grpc_op_data::grpc_op_send_initial_metadata send_initial_metadata
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:243
void FinishOp(bool *status)
Definition: call_op_set.h:550
Operation data: one field for each op type (except SEND_CLOSE_FROM_CLIENT which has no arguments) ...
Definition: grpc_types.h:565
virtual grpc_slice grpc_slice_from_static_buffer(const void *buffer, size_t length)=0
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:615
This header provides an object that reads bytes directly from a grpc::ByteBuffer, via the ZeroCopyInp...
Definition: alarm.h:24
Receive initial metadata: one and only one MUST be made on the client, must not be made on the server...
Definition: grpc_types.h:542
void RecvMessage(R *message)
Definition: call_op_set.h:527
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:218
void FillOps(Call *call) override
Fills in grpc_op, starting from ops[*nops] and moving upwards.
Definition: call_op_set.h:863
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:357
struct grpc_op::grpc_op_data::grpc_op_send_message send_message
void SendInitialMetadata(std::multimap< grpc::string, grpc::string > *metadata, uint32_t flags)
Definition: call_op_set.h:229
CoreCodegenInterface * g_core_codegen_interface
Definition: call_op_set.h:51
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:649
void * core_cq_tag() override
Get the tag to be used at the core completion queue.
Definition: call_op_set.h:909
#define GRPC_MUST_USE_RESULT
Definition: port_platform.h:514
Send initial metadata: one and only one instance MUST be sent for each call, unless the call was canc...
Definition: grpc_types.h:523
WriteOptions(const WriteOptions &other)
Definition: call_op_set.h:88
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:606
CallOpClientRecvStatus()
Definition: call_op_set.h:751
Definition: interceptor_common.h:36
Definition: byte_buffer.h:41
void ServerSendStatus(std::multimap< grpc::string, grpc::string > *trailing_metadata, const Status &status)
Definition: call_op_set.h:638
void FinishOp(bool *status)
Definition: call_op_set.h:613
Per-message write options.
Definition: call_op_set.h:85
grpc_slice * status_details
optional: set to NULL if no details need sending, non-NULL if they do pointer will not be retained pa...
Definition: grpc_types.h:602
CallOpSet & operator=(const CallOpSet &other)
Definition: call_op_set.h:854
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:274
void AllowNoMessage()
Definition: call_op_set.h:439
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:682
size_t trailing_metadata_count
Definition: grpc_types.h:596
CallOpSendMessage()
Definition: call_op_set.h:295
grpc_metadata * metadata
Definition: grpc_types.h:579
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:348
void FinishOp(bool *status)
Definition: call_op_set.h:215
virtual grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t nops, void *tag, void *reserved)=0
void FinishOp(bool *status)
Definition: call_op_set.h:453
Definition: call_op_set.h:599
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:479
An abstract collection of call ops, used to generate the grpc_call_op structure to pass down to the l...
Definition: call_op_set_interface.h:34
virtual void grpc_slice_unref(grpc_slice slice)=0
void SetSendStatus(grpc_status_code *code, grpc::string *error_details, grpc::string *error_message)
Definition: interceptor_common.h:157
void SetSendTrailingMetadata(std::multimap< grpc::string, grpc::string > *metadata)
Definition: interceptor_common.h:164
virtual ~DeserializeFunc()
Definition: call_op_set.h:504
void SetRecvInitialMetadata(MetadataMap *map)
Definition: interceptor_common.h:174
void RecvMessage(R *message)
Definition: call_op_set.h:436
struct grpc_op::grpc_op_data::grpc_op_send_status_from_server send_status_from_server
Interface between the codegen library and the minimal subset of core features required by the generat...
Definition: core_codegen_interface.h:38
void set_output_tag(void *return_tag)
Definition: call_op_set.h:907
CallOpSendInitialMetadata()
Definition: call_op_set.h:225
void SetSendInitialMetadata(std::multimap< grpc::string, grpc::string > *metadata)
Definition: interceptor_common.h:152
WriteOptions & set_corked()
corked bit: aliases set_buffer_hint currently, with the intent that set_buffer_hint will be removed i...
Definition: call_op_set.h:147
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:220
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:214
Did it work? If it didn&#39;t, why?
Definition: status.h:31
void FinishOp(bool *status)
Definition: call_op_set.h:337
Receive status on the client: one and only one must be made on the client.
Definition: grpc_types.h:552
grpc_metadata * initial_metadata_
Definition: call_op_set.h:286
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:685
void AddInterceptionHookPoint(experimental::InterceptionHookPoints type)
Definition: interceptor_common.h:78
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:583
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:729
DeserializeFuncType(R *message)
Definition: call_op_set.h:510
size_t initial_metadata_count_
Definition: call_op_set.h:284
void SetFinishInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:798
void SetRecvTrailingMetadata(MetadataMap *map)
Definition: interceptor_common.h:180
grpc_status_code status
Definition: grpc_types.h:598
grpc_status_code * status
Definition: grpc_types.h:627
size_t count
Definition: grpc_types.h:578
Definition: call_op_set.h:521
uint32_t flags
Write flags bitset for grpc_begin_messages.
Definition: grpc_types.h:569
bool is_last_message() const
Get value for the flag indicating that this is the last message, and should be coalesced with trailin...
Definition: call_op_set.h:189
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:724
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:792
Definition: call_op_set.h:749
virtual void gpr_free(void *p)=0
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:672
void SetSendMessage(ByteBuffer *buf, const void **msg, bool *fail_send_message, std::function< Status(const void *)> serializer)
Definition: interceptor_common.h:143
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:486
Receive a message: 0 or more of these operations can occur for each call.
Definition: grpc_types.h:546
A sequence of bytes.
Definition: byte_buffer.h:65
CallOpGenericRecvMessage()
Definition: call_op_set.h:523
void AddOp(grpc_op *ops, size_t *nops)
Definition: call_op_set.h:444
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:473
CallOpSet()
Definition: call_op_set.h:843
const char kBinaryErrorDetailsKey[]
Definition: metadata_map.h:31
The following two are for all clients and servers.
void SetInterceptionHookPoint(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:266
void set_core_cq_tag(void *core_cq_tag)
set_core_cq_tag is used to provide a different core CQ tag than "this".
Definition: call_op_set.h:915
grpc::string error_details() const
Return the (binary) error details.
Definition: status.h:115
Straightforward wrapping of the C call object.
Definition: call.h:38
grpc_metadata * trailing_metadata
Definition: grpc_types.h:597
void SetHijackingState(InterceptorBatchMethodsImpl *interceptor_methods)
Definition: call_op_set.h:371
struct grpc_op::grpc_op_data::grpc_op_recv_status_on_client recv_status_on_client
virtual void * gpr_malloc(size_t size)=0
~DeserializeFuncType() override
Definition: call_op_set.h:515