server_context.cc 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. /*
  2. *
  3. * Copyright 2015 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include <grpc++/server_context.h>
  19. #include <algorithm>
  20. #include <mutex>
  21. #include <utility>
  22. #include <grpc++/completion_queue.h>
  23. #include <grpc++/impl/call.h>
  24. #include <grpc++/support/time.h>
  25. #include <grpc/compression.h>
  26. #include <grpc/grpc.h>
  27. #include <grpc/load_reporting.h>
  28. #include <grpc/support/alloc.h>
  29. #include <grpc/support/log.h>
  30. #include "src/core/lib/surface/call.h"
  31. namespace grpc {
  32. // CompletionOp
  33. class ServerContext::CompletionOp final : public CallOpSetInterface {
  34. public:
  35. // initial refs: one in the server context, one in the cq
  36. CompletionOp()
  37. : has_tag_(false),
  38. tag_(nullptr),
  39. refs_(2),
  40. finalized_(false),
  41. cancelled_(0) {}
  42. void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override;
  43. bool FinalizeResult(void** tag, bool* status) override;
  44. bool CheckCancelled(CompletionQueue* cq) {
  45. cq->TryPluck(this);
  46. return CheckCancelledNoPluck();
  47. }
  48. bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
  49. void set_tag(void* tag) {
  50. has_tag_ = true;
  51. tag_ = tag;
  52. }
  53. void Unref();
  54. private:
  55. bool CheckCancelledNoPluck() {
  56. std::lock_guard<std::mutex> g(mu_);
  57. return finalized_ ? (cancelled_ != 0) : false;
  58. }
  59. bool has_tag_;
  60. void* tag_;
  61. std::mutex mu_;
  62. int refs_;
  63. bool finalized_;
  64. int cancelled_;
  65. };
  66. void ServerContext::CompletionOp::Unref() {
  67. std::unique_lock<std::mutex> lock(mu_);
  68. if (--refs_ == 0) {
  69. lock.unlock();
  70. delete this;
  71. }
  72. }
  73. void ServerContext::CompletionOp::FillOps(grpc_call* call, grpc_op* ops,
  74. size_t* nops) {
  75. ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
  76. ops->data.recv_close_on_server.cancelled = &cancelled_;
  77. ops->flags = 0;
  78. ops->reserved = NULL;
  79. *nops = 1;
  80. }
  81. bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
  82. std::unique_lock<std::mutex> lock(mu_);
  83. finalized_ = true;
  84. bool ret = false;
  85. if (has_tag_) {
  86. *tag = tag_;
  87. ret = true;
  88. }
  89. if (!*status) cancelled_ = 1;
  90. if (--refs_ == 0) {
  91. lock.unlock();
  92. delete this;
  93. }
  94. return ret;
  95. }
  96. // ServerContext body
  97. ServerContext::ServerContext()
  98. : completion_op_(nullptr),
  99. has_notify_when_done_tag_(false),
  100. async_notify_when_done_tag_(nullptr),
  101. deadline_(gpr_inf_future(GPR_CLOCK_REALTIME)),
  102. call_(nullptr),
  103. cq_(nullptr),
  104. sent_initial_metadata_(false),
  105. compression_level_set_(false) {}
  106. ServerContext::ServerContext(gpr_timespec deadline, grpc_metadata_array* arr)
  107. : completion_op_(nullptr),
  108. has_notify_when_done_tag_(false),
  109. async_notify_when_done_tag_(nullptr),
  110. deadline_(deadline),
  111. call_(nullptr),
  112. cq_(nullptr),
  113. sent_initial_metadata_(false),
  114. compression_level_set_(false) {
  115. std::swap(*client_metadata_.arr(), *arr);
  116. client_metadata_.FillMap();
  117. }
  118. ServerContext::~ServerContext() {
  119. if (call_) {
  120. grpc_call_unref(call_);
  121. }
  122. if (completion_op_) {
  123. completion_op_->Unref();
  124. }
  125. }
  126. void ServerContext::BeginCompletionOp(Call* call) {
  127. GPR_ASSERT(!completion_op_);
  128. completion_op_ = new CompletionOp();
  129. if (has_notify_when_done_tag_) {
  130. completion_op_->set_tag(async_notify_when_done_tag_);
  131. }
  132. call->PerformOps(completion_op_);
  133. }
  134. CompletionQueueTag* ServerContext::GetCompletionOpTag() {
  135. return static_cast<CompletionQueueTag*>(completion_op_);
  136. }
  137. void ServerContext::AddInitialMetadata(const grpc::string& key,
  138. const grpc::string& value) {
  139. initial_metadata_.insert(std::make_pair(key, value));
  140. }
  141. void ServerContext::AddTrailingMetadata(const grpc::string& key,
  142. const grpc::string& value) {
  143. trailing_metadata_.insert(std::make_pair(key, value));
  144. }
  145. void ServerContext::TryCancel() const {
  146. grpc_call_error err = grpc_call_cancel_with_status(
  147. call_, GRPC_STATUS_CANCELLED, "Cancelled on the server side", NULL);
  148. if (err != GRPC_CALL_OK) {
  149. gpr_log(GPR_ERROR, "TryCancel failed with: %d", err);
  150. }
  151. }
  152. bool ServerContext::IsCancelled() const {
  153. if (has_notify_when_done_tag_) {
  154. // when using async API, but the result is only valid
  155. // if the tag has already been delivered at the completion queue
  156. return completion_op_ && completion_op_->CheckCancelledAsync();
  157. } else {
  158. // when using sync API
  159. return completion_op_ && completion_op_->CheckCancelled(cq_);
  160. }
  161. }
  162. void ServerContext::set_compression_algorithm(
  163. grpc_compression_algorithm algorithm) {
  164. char* algorithm_name = NULL;
  165. if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
  166. gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
  167. algorithm);
  168. abort();
  169. }
  170. GPR_ASSERT(algorithm_name != NULL);
  171. AddInitialMetadata(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, algorithm_name);
  172. }
  173. grpc::string ServerContext::peer() const {
  174. grpc::string peer;
  175. if (call_) {
  176. char* c_peer = grpc_call_get_peer(call_);
  177. peer = c_peer;
  178. gpr_free(c_peer);
  179. }
  180. return peer;
  181. }
  182. const struct census_context* ServerContext::census_context() const {
  183. return grpc_census_call_get_context(call_);
  184. }
  185. void ServerContext::SetLoadReportingCosts(
  186. const std::vector<grpc::string>& cost_data) {
  187. if (call_ == nullptr) return;
  188. for (const auto& cost_datum : cost_data) {
  189. AddTrailingMetadata(GRPC_LB_COST_MD_KEY, cost_datum);
  190. }
  191. }
  192. } // namespace grpc