|  | @@ -30,29 +30,159 @@
 | 
	
		
			
				|  |  |  #include "src/cpp/server/health/health.pb.h"
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  namespace grpc {
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +// DefaultHealthCheckService
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +DefaultHealthCheckService::DefaultHealthCheckService() {
 | 
	
		
			
				|  |  | +  services_map_[""].SetServingStatus(SERVING);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::SetServingStatus(
 | 
	
		
			
				|  |  | +    const grpc::string& service_name, bool serving) {
 | 
	
		
			
				|  |  | +  std::unique_lock<std::mutex> lock(mu_);
 | 
	
		
			
				|  |  | +  services_map_[service_name].SetServingStatus(serving ? SERVING : NOT_SERVING);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::SetServingStatus(bool serving) {
 | 
	
		
			
				|  |  | +  const ServingStatus status = serving ? SERVING : NOT_SERVING;
 | 
	
		
			
				|  |  | +  std::unique_lock<std::mutex> lock(mu_);
 | 
	
		
			
				|  |  | +  for (auto& p : services_map_) {
 | 
	
		
			
				|  |  | +    ServiceData& service_data = p.second;
 | 
	
		
			
				|  |  | +    service_data.SetServingStatus(status);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +DefaultHealthCheckService::ServingStatus
 | 
	
		
			
				|  |  | +DefaultHealthCheckService::GetServingStatus(
 | 
	
		
			
				|  |  | +    const grpc::string& service_name) const {
 | 
	
		
			
				|  |  | +  std::lock_guard<std::mutex> lock(mu_);
 | 
	
		
			
				|  |  | +  auto it = services_map_.find(service_name);
 | 
	
		
			
				|  |  | +  if (it == services_map_.end()) {
 | 
	
		
			
				|  |  | +    return NOT_FOUND;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  const ServiceData& service_data = it->second;
 | 
	
		
			
				|  |  | +  return service_data.GetServingStatus();
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::RegisterCallHandler(
 | 
	
		
			
				|  |  | +    const grpc::string& service_name,
 | 
	
		
			
				|  |  | +    std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
 | 
	
		
			
				|  |  | +  std::unique_lock<std::mutex> lock(mu_);
 | 
	
		
			
				|  |  | +  ServiceData& service_data = services_map_[service_name];
 | 
	
		
			
				|  |  | +  service_data.AddCallHandler(handler /* copies ref */);
 | 
	
		
			
				|  |  | +  HealthCheckServiceImpl::CallHandler* h = handler.get();
 | 
	
		
			
				|  |  | +  h->SendHealth(std::move(handler), service_data.GetServingStatus());
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::UnregisterCallHandler(
 | 
	
		
			
				|  |  | +    const grpc::string& service_name,
 | 
	
		
			
				|  |  | +    std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
 | 
	
		
			
				|  |  | +  std::unique_lock<std::mutex> lock(mu_);
 | 
	
		
			
				|  |  | +  auto it = services_map_.find(service_name);
 | 
	
		
			
				|  |  | +  if (it == services_map_.end()) return;
 | 
	
		
			
				|  |  | +  ServiceData& service_data = it->second;
 | 
	
		
			
				|  |  | +  service_data.RemoveCallHandler(std::move(handler));
 | 
	
		
			
				|  |  | +  if (service_data.Unused()) {
 | 
	
		
			
				|  |  | +    services_map_.erase(it);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +DefaultHealthCheckService::HealthCheckServiceImpl*
 | 
	
		
			
				|  |  | +DefaultHealthCheckService::GetHealthCheckService(
 | 
	
		
			
				|  |  | +    std::unique_ptr<ServerCompletionQueue> cq) {
 | 
	
		
			
				|  |  | +  GPR_ASSERT(impl_ == nullptr);
 | 
	
		
			
				|  |  | +  impl_.reset(new HealthCheckServiceImpl(this, std::move(cq)));
 | 
	
		
			
				|  |  | +  return impl_.get();
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +// DefaultHealthCheckService::ServiceData
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::ServiceData::SetServingStatus(
 | 
	
		
			
				|  |  | +    ServingStatus status) {
 | 
	
		
			
				|  |  | +  status_ = status;
 | 
	
		
			
				|  |  | +  for (auto& call_handler : call_handlers_) {
 | 
	
		
			
				|  |  | +    call_handler->SendHealth(call_handler /* copies ref */, status);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::ServiceData::AddCallHandler(
 | 
	
		
			
				|  |  | +    std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
 | 
	
		
			
				|  |  | +  call_handlers_.insert(std::move(handler));
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::ServiceData::RemoveCallHandler(
 | 
	
		
			
				|  |  | +    std::shared_ptr<HealthCheckServiceImpl::CallHandler> handler) {
 | 
	
		
			
				|  |  | +  call_handlers_.erase(handler);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +// DefaultHealthCheckService::HealthCheckServiceImpl
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  namespace {
 | 
	
		
			
				|  |  |  const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
 | 
	
		
			
				|  |  | +const char kHealthWatchMethodName[] = "/grpc.health.v1.Health/Watch";
 | 
	
		
			
				|  |  |  }  // namespace
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
 | 
	
		
			
				|  |  | -    DefaultHealthCheckService* service)
 | 
	
		
			
				|  |  | -    : service_(service), method_(nullptr) {
 | 
	
		
			
				|  |  | -  internal::MethodHandler* handler =
 | 
	
		
			
				|  |  | -      new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer,
 | 
	
		
			
				|  |  | -                                     ByteBuffer>(
 | 
	
		
			
				|  |  | -          std::mem_fn(&HealthCheckServiceImpl::Check), this);
 | 
	
		
			
				|  |  | -  method_ = new internal::RpcServiceMethod(
 | 
	
		
			
				|  |  | -      kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler);
 | 
	
		
			
				|  |  | -  AddMethod(method_);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -Status DefaultHealthCheckService::HealthCheckServiceImpl::Check(
 | 
	
		
			
				|  |  | -    ServerContext* context, const ByteBuffer* request, ByteBuffer* response) {
 | 
	
		
			
				|  |  | -  // Decode request.
 | 
	
		
			
				|  |  | -  std::vector<Slice> slices;
 | 
	
		
			
				|  |  | -  if (!request->Dump(&slices).ok()) {
 | 
	
		
			
				|  |  | -    return Status(StatusCode::INVALID_ARGUMENT, "");
 | 
	
		
			
				|  |  | +    DefaultHealthCheckService* database,
 | 
	
		
			
				|  |  | +    std::unique_ptr<ServerCompletionQueue> cq)
 | 
	
		
			
				|  |  | +    : database_(database), cq_(std::move(cq)) {
 | 
	
		
			
				|  |  | +  // Add Check() method.
 | 
	
		
			
				|  |  | +  AddMethod(new internal::RpcServiceMethod(
 | 
	
		
			
				|  |  | +      kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, nullptr));
 | 
	
		
			
				|  |  | +  // Add Watch() method.
 | 
	
		
			
				|  |  | +  AddMethod(new internal::RpcServiceMethod(
 | 
	
		
			
				|  |  | +      kHealthWatchMethodName, internal::RpcMethod::SERVER_STREAMING, nullptr));
 | 
	
		
			
				|  |  | +  // Create serving thread.
 | 
	
		
			
				|  |  | +  thread_ = std::unique_ptr<::grpc_core::Thread>(
 | 
	
		
			
				|  |  | +      new ::grpc_core::Thread("grpc_health_check_service", Serve, this));
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +DefaultHealthCheckService::HealthCheckServiceImpl::~HealthCheckServiceImpl() {
 | 
	
		
			
				|  |  | +  // We will reach here after the server starts shutting down.
 | 
	
		
			
				|  |  | +  shutdown_ = true;
 | 
	
		
			
				|  |  | +  {
 | 
	
		
			
				|  |  | +    std::unique_lock<std::mutex> lock(cq_shutdown_mu_);
 | 
	
		
			
				|  |  | +    cq_->Shutdown();
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +  thread_->Join();
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::HealthCheckServiceImpl::StartServingThread() {
 | 
	
		
			
				|  |  | +  // Request the calls we're interested in.
 | 
	
		
			
				|  |  | +  // We do this before starting the serving thread, so that we know it's
 | 
	
		
			
				|  |  | +  // done before server startup is complete.
 | 
	
		
			
				|  |  | +  CheckCallHandler::CreateAndStart(cq_.get(), database_, this);
 | 
	
		
			
				|  |  | +  WatchCallHandler::CreateAndStart(cq_.get(), database_, this);
 | 
	
		
			
				|  |  | +  // Start serving thread.
 | 
	
		
			
				|  |  | +  thread_->Start();
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
 | 
	
		
			
				|  |  | +  HealthCheckServiceImpl* service =
 | 
	
		
			
				|  |  | +      reinterpret_cast<HealthCheckServiceImpl*>(arg);
 | 
	
		
			
				|  |  | +  void* tag;
 | 
	
		
			
				|  |  | +  bool ok;
 | 
	
		
			
				|  |  | +  while (true) {
 | 
	
		
			
				|  |  | +    if (!service->cq_->Next(&tag, &ok)) {
 | 
	
		
			
				|  |  | +      // The completion queue is shutting down.
 | 
	
		
			
				|  |  | +      GPR_ASSERT(service->shutdown_);
 | 
	
		
			
				|  |  | +      break;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    auto* next_step = static_cast<CallableTag*>(tag);
 | 
	
		
			
				|  |  | +    next_step->Run(ok);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +bool DefaultHealthCheckService::HealthCheckServiceImpl::DecodeRequest(
 | 
	
		
			
				|  |  | +    const ByteBuffer& request, grpc::string* service_name) {
 | 
	
		
			
				|  |  | +  std::vector<Slice> slices;
 | 
	
		
			
				|  |  | +  if (!request.Dump(&slices).ok()) return false;
 | 
	
		
			
				|  |  |    uint8_t* request_bytes = nullptr;
 | 
	
		
			
				|  |  |    bool request_bytes_owned = false;
 | 
	
		
			
				|  |  |    size_t request_size = 0;
 | 
	
	
		
			
				|  | @@ -64,14 +194,13 @@ Status DefaultHealthCheckService::HealthCheckServiceImpl::Check(
 | 
	
		
			
				|  |  |      request_size = slices[0].size();
 | 
	
		
			
				|  |  |    } else {
 | 
	
		
			
				|  |  |      request_bytes_owned = true;
 | 
	
		
			
				|  |  | -    request_bytes = static_cast<uint8_t*>(gpr_malloc(request->Length()));
 | 
	
		
			
				|  |  | +    request_bytes = static_cast<uint8_t*>(gpr_malloc(request.Length()));
 | 
	
		
			
				|  |  |      uint8_t* copy_to = request_bytes;
 | 
	
		
			
				|  |  |      for (size_t i = 0; i < slices.size(); i++) {
 | 
	
		
			
				|  |  |        memcpy(copy_to, slices[i].begin(), slices[i].size());
 | 
	
		
			
				|  |  |        copy_to += slices[i].size();
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |    if (request_bytes != nullptr) {
 | 
	
		
			
				|  |  |      pb_istream_t istream = pb_istream_from_buffer(request_bytes, request_size);
 | 
	
		
			
				|  |  |      bool decode_status = pb_decode(
 | 
	
	
		
			
				|  | @@ -79,26 +208,22 @@ Status DefaultHealthCheckService::HealthCheckServiceImpl::Check(
 | 
	
		
			
				|  |  |      if (request_bytes_owned) {
 | 
	
		
			
				|  |  |        gpr_free(request_bytes);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    if (!decode_status) {
 | 
	
		
			
				|  |  | -      return Status(StatusCode::INVALID_ARGUMENT, "");
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  // Check status from the associated default health checking service.
 | 
	
		
			
				|  |  | -  DefaultHealthCheckService::ServingStatus serving_status =
 | 
	
		
			
				|  |  | -      service_->GetServingStatus(
 | 
	
		
			
				|  |  | -          request_struct.has_service ? request_struct.service : "");
 | 
	
		
			
				|  |  | -  if (serving_status == DefaultHealthCheckService::NOT_FOUND) {
 | 
	
		
			
				|  |  | -    return Status(StatusCode::NOT_FOUND, "");
 | 
	
		
			
				|  |  | +    if (!decode_status) return false;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | +  *service_name = request_struct.has_service ? request_struct.service : "";
 | 
	
		
			
				|  |  | +  return true;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  // Encode response
 | 
	
		
			
				|  |  | +bool DefaultHealthCheckService::HealthCheckServiceImpl::EncodeResponse(
 | 
	
		
			
				|  |  | +    ServingStatus status, ByteBuffer* response) {
 | 
	
		
			
				|  |  |    grpc_health_v1_HealthCheckResponse response_struct;
 | 
	
		
			
				|  |  |    response_struct.has_status = true;
 | 
	
		
			
				|  |  |    response_struct.status =
 | 
	
		
			
				|  |  | -      serving_status == DefaultHealthCheckService::SERVING
 | 
	
		
			
				|  |  | -          ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
 | 
	
		
			
				|  |  | -          : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
 | 
	
		
			
				|  |  | +      status == NOT_FOUND
 | 
	
		
			
				|  |  | +          ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVICE_UNKNOWN
 | 
	
		
			
				|  |  | +          : status == SERVING
 | 
	
		
			
				|  |  | +                ? grpc_health_v1_HealthCheckResponse_ServingStatus_SERVING
 | 
	
		
			
				|  |  | +                : grpc_health_v1_HealthCheckResponse_ServingStatus_NOT_SERVING;
 | 
	
		
			
				|  |  |    pb_ostream_t ostream;
 | 
	
		
			
				|  |  |    memset(&ostream, 0, sizeof(ostream));
 | 
	
		
			
				|  |  |    pb_encode(&ostream, grpc_health_v1_HealthCheckResponse_fields,
 | 
	
	
		
			
				|  | @@ -108,48 +233,250 @@ Status DefaultHealthCheckService::HealthCheckServiceImpl::Check(
 | 
	
		
			
				|  |  |                                     GRPC_SLICE_LENGTH(response_slice));
 | 
	
		
			
				|  |  |    bool encode_status = pb_encode(
 | 
	
		
			
				|  |  |        &ostream, grpc_health_v1_HealthCheckResponse_fields, &response_struct);
 | 
	
		
			
				|  |  | -  if (!encode_status) {
 | 
	
		
			
				|  |  | -    return Status(StatusCode::INTERNAL, "Failed to encode response.");
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | +  if (!encode_status) return false;
 | 
	
		
			
				|  |  |    Slice encoded_response(response_slice, Slice::STEAL_REF);
 | 
	
		
			
				|  |  |    ByteBuffer response_buffer(&encoded_response, 1);
 | 
	
		
			
				|  |  |    response->Swap(&response_buffer);
 | 
	
		
			
				|  |  | -  return Status::OK;
 | 
	
		
			
				|  |  | +  return true;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -DefaultHealthCheckService::DefaultHealthCheckService() {
 | 
	
		
			
				|  |  | -  services_map_.emplace("", true);
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +// DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
 | 
	
		
			
				|  |  | +    CreateAndStart(ServerCompletionQueue* cq,
 | 
	
		
			
				|  |  | +                   DefaultHealthCheckService* database,
 | 
	
		
			
				|  |  | +                   HealthCheckServiceImpl* service) {
 | 
	
		
			
				|  |  | +  std::shared_ptr<CallHandler> self =
 | 
	
		
			
				|  |  | +      std::make_shared<CheckCallHandler>(cq, database, service);
 | 
	
		
			
				|  |  | +  CheckCallHandler* handler = static_cast<CheckCallHandler*>(self.get());
 | 
	
		
			
				|  |  | +  {
 | 
	
		
			
				|  |  | +    std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
 | 
	
		
			
				|  |  | +    if (service->shutdown_) return;
 | 
	
		
			
				|  |  | +    // Request a Check() call.
 | 
	
		
			
				|  |  | +    handler->next_ =
 | 
	
		
			
				|  |  | +        CallableTag(std::bind(&CheckCallHandler::OnCallReceived, handler,
 | 
	
		
			
				|  |  | +                              std::placeholders::_1, std::placeholders::_2),
 | 
	
		
			
				|  |  | +                    std::move(self));
 | 
	
		
			
				|  |  | +    service->RequestAsyncUnary(0, &handler->ctx_, &handler->request_,
 | 
	
		
			
				|  |  | +                               &handler->writer_, cq, cq, &handler->next_);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void DefaultHealthCheckService::SetServingStatus(
 | 
	
		
			
				|  |  | -    const grpc::string& service_name, bool serving) {
 | 
	
		
			
				|  |  | -  std::lock_guard<std::mutex> lock(mu_);
 | 
	
		
			
				|  |  | -  services_map_[service_name] = serving;
 | 
	
		
			
				|  |  | +DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
 | 
	
		
			
				|  |  | +    CheckCallHandler(ServerCompletionQueue* cq,
 | 
	
		
			
				|  |  | +                     DefaultHealthCheckService* database,
 | 
	
		
			
				|  |  | +                     HealthCheckServiceImpl* service)
 | 
	
		
			
				|  |  | +    : cq_(cq), database_(database), service_(service), writer_(&ctx_) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
 | 
	
		
			
				|  |  | +    OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
 | 
	
		
			
				|  |  | +  if (!ok) {
 | 
	
		
			
				|  |  | +    // The value of ok being false means that the server is shutting down.
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  // Spawn a new handler instance to serve the next new client. Every handler
 | 
	
		
			
				|  |  | +  // instance will deallocate itself when it's done.
 | 
	
		
			
				|  |  | +  CreateAndStart(cq_, database_, service_);
 | 
	
		
			
				|  |  | +  // Process request.
 | 
	
		
			
				|  |  | +  gpr_log(GPR_DEBUG, "[HCS %p] Health check started for handler %p", service_,
 | 
	
		
			
				|  |  | +          this);
 | 
	
		
			
				|  |  | +  grpc::string service_name;
 | 
	
		
			
				|  |  | +  grpc::Status status = Status::OK;
 | 
	
		
			
				|  |  | +  ByteBuffer response;
 | 
	
		
			
				|  |  | +  if (!service_->DecodeRequest(request_, &service_name)) {
 | 
	
		
			
				|  |  | +    status = Status(StatusCode::INVALID_ARGUMENT, "could not parse request");
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    ServingStatus serving_status = database_->GetServingStatus(service_name);
 | 
	
		
			
				|  |  | +    if (serving_status == NOT_FOUND) {
 | 
	
		
			
				|  |  | +      status = Status(StatusCode::NOT_FOUND, "service name unknown");
 | 
	
		
			
				|  |  | +    } else if (!service_->EncodeResponse(serving_status, &response)) {
 | 
	
		
			
				|  |  | +      status = Status(StatusCode::INTERNAL, "could not encode response");
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  // Send response.
 | 
	
		
			
				|  |  | +  {
 | 
	
		
			
				|  |  | +    std::unique_lock<std::mutex> lock(service_->cq_shutdown_mu_);
 | 
	
		
			
				|  |  | +    if (!service_->shutdown_) {
 | 
	
		
			
				|  |  | +      next_ =
 | 
	
		
			
				|  |  | +          CallableTag(std::bind(&CheckCallHandler::OnFinishDone, this,
 | 
	
		
			
				|  |  | +                                std::placeholders::_1, std::placeholders::_2),
 | 
	
		
			
				|  |  | +                      std::move(self));
 | 
	
		
			
				|  |  | +      if (status.ok()) {
 | 
	
		
			
				|  |  | +        writer_.Finish(response, status, &next_);
 | 
	
		
			
				|  |  | +      } else {
 | 
	
		
			
				|  |  | +        writer_.FinishWithError(status, &next_);
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -void DefaultHealthCheckService::SetServingStatus(bool serving) {
 | 
	
		
			
				|  |  | -  std::lock_guard<std::mutex> lock(mu_);
 | 
	
		
			
				|  |  | -  for (auto iter = services_map_.begin(); iter != services_map_.end(); ++iter) {
 | 
	
		
			
				|  |  | -    iter->second = serving;
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
 | 
	
		
			
				|  |  | +    OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
 | 
	
		
			
				|  |  | +  if (ok) {
 | 
	
		
			
				|  |  | +    gpr_log(GPR_DEBUG, "[HCS %p] Health check call finished for handler %p",
 | 
	
		
			
				|  |  | +            service_, this);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -DefaultHealthCheckService::ServingStatus
 | 
	
		
			
				|  |  | -DefaultHealthCheckService::GetServingStatus(
 | 
	
		
			
				|  |  | -    const grpc::string& service_name) const {
 | 
	
		
			
				|  |  | -  std::lock_guard<std::mutex> lock(mu_);
 | 
	
		
			
				|  |  | -  const auto& iter = services_map_.find(service_name);
 | 
	
		
			
				|  |  | -  if (iter == services_map_.end()) {
 | 
	
		
			
				|  |  | -    return NOT_FOUND;
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +// DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler
 | 
	
		
			
				|  |  | +//
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
 | 
	
		
			
				|  |  | +    CreateAndStart(ServerCompletionQueue* cq,
 | 
	
		
			
				|  |  | +                   DefaultHealthCheckService* database,
 | 
	
		
			
				|  |  | +                   HealthCheckServiceImpl* service) {
 | 
	
		
			
				|  |  | +  std::shared_ptr<CallHandler> self =
 | 
	
		
			
				|  |  | +      std::make_shared<WatchCallHandler>(cq, database, service);
 | 
	
		
			
				|  |  | +  WatchCallHandler* handler = static_cast<WatchCallHandler*>(self.get());
 | 
	
		
			
				|  |  | +  {
 | 
	
		
			
				|  |  | +    std::unique_lock<std::mutex> lock(service->cq_shutdown_mu_);
 | 
	
		
			
				|  |  | +    if (service->shutdown_) return;
 | 
	
		
			
				|  |  | +    // Request AsyncNotifyWhenDone().
 | 
	
		
			
				|  |  | +    handler->on_done_notified_ =
 | 
	
		
			
				|  |  | +        CallableTag(std::bind(&WatchCallHandler::OnDoneNotified, handler,
 | 
	
		
			
				|  |  | +                              std::placeholders::_1, std::placeholders::_2),
 | 
	
		
			
				|  |  | +                    self /* copies ref */);
 | 
	
		
			
				|  |  | +    handler->ctx_.AsyncNotifyWhenDone(&handler->on_done_notified_);
 | 
	
		
			
				|  |  | +    // Request a Watch() call.
 | 
	
		
			
				|  |  | +    handler->next_ =
 | 
	
		
			
				|  |  | +        CallableTag(std::bind(&WatchCallHandler::OnCallReceived, handler,
 | 
	
		
			
				|  |  | +                              std::placeholders::_1, std::placeholders::_2),
 | 
	
		
			
				|  |  | +                    std::move(self));
 | 
	
		
			
				|  |  | +    service->RequestAsyncServerStreaming(1, &handler->ctx_, &handler->request_,
 | 
	
		
			
				|  |  | +                                         &handler->stream_, cq, cq,
 | 
	
		
			
				|  |  | +                                         &handler->next_);
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  | -  return iter->second ? SERVING : NOT_SERVING;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -DefaultHealthCheckService::HealthCheckServiceImpl*
 | 
	
		
			
				|  |  | -DefaultHealthCheckService::GetHealthCheckService() {
 | 
	
		
			
				|  |  | -  GPR_ASSERT(impl_ == nullptr);
 | 
	
		
			
				|  |  | -  impl_.reset(new HealthCheckServiceImpl(this));
 | 
	
		
			
				|  |  | -  return impl_.get();
 | 
	
		
			
				|  |  | +DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
 | 
	
		
			
				|  |  | +    WatchCallHandler(ServerCompletionQueue* cq,
 | 
	
		
			
				|  |  | +                     DefaultHealthCheckService* database,
 | 
	
		
			
				|  |  | +                     HealthCheckServiceImpl* service)
 | 
	
		
			
				|  |  | +    : cq_(cq), database_(database), service_(service), stream_(&ctx_) {}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
 | 
	
		
			
				|  |  | +    OnCallReceived(std::shared_ptr<CallHandler> self, bool ok) {
 | 
	
		
			
				|  |  | +  if (!ok) {
 | 
	
		
			
				|  |  | +    // Server shutting down.
 | 
	
		
			
				|  |  | +    //
 | 
	
		
			
				|  |  | +    // AsyncNotifyWhenDone() needs to be called before the call starts, but the
 | 
	
		
			
				|  |  | +    // tag will not pop out if the call never starts (
 | 
	
		
			
				|  |  | +    // https://github.com/grpc/grpc/issues/10136). So we need to manually
 | 
	
		
			
				|  |  | +    // release the ownership of the handler in this case.
 | 
	
		
			
				|  |  | +    GPR_ASSERT(on_done_notified_.ReleaseHandler() != nullptr);
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  // Spawn a new handler instance to serve the next new client. Every handler
 | 
	
		
			
				|  |  | +  // instance will deallocate itself when it's done.
 | 
	
		
			
				|  |  | +  CreateAndStart(cq_, database_, service_);
 | 
	
		
			
				|  |  | +  // Parse request.
 | 
	
		
			
				|  |  | +  if (!service_->DecodeRequest(request_, &service_name_)) {
 | 
	
		
			
				|  |  | +    SendFinish(std::move(self),
 | 
	
		
			
				|  |  | +               Status(StatusCode::INVALID_ARGUMENT, "could not parse request"));
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  // Register the call for updates to the service.
 | 
	
		
			
				|  |  | +  gpr_log(GPR_DEBUG,
 | 
	
		
			
				|  |  | +          "[HCS %p] Health watch started for service \"%s\" (handler: %p)",
 | 
	
		
			
				|  |  | +          service_, service_name_.c_str(), this);
 | 
	
		
			
				|  |  | +  database_->RegisterCallHandler(service_name_, std::move(self));
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
 | 
	
		
			
				|  |  | +    SendHealth(std::shared_ptr<CallHandler> self, ServingStatus status) {
 | 
	
		
			
				|  |  | +  std::unique_lock<std::mutex> lock(send_mu_);
 | 
	
		
			
				|  |  | +  // If there's already a send in flight, cache the new status, and
 | 
	
		
			
				|  |  | +  // we'll start a new send for it when the one in flight completes.
 | 
	
		
			
				|  |  | +  if (send_in_flight_) {
 | 
	
		
			
				|  |  | +    pending_status_ = status;
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  // Start a send.
 | 
	
		
			
				|  |  | +  SendHealthLocked(std::move(self), status);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
 | 
	
		
			
				|  |  | +    SendHealthLocked(std::shared_ptr<CallHandler> self, ServingStatus status) {
 | 
	
		
			
				|  |  | +  send_in_flight_ = true;
 | 
	
		
			
				|  |  | +  // Construct response.
 | 
	
		
			
				|  |  | +  ByteBuffer response;
 | 
	
		
			
				|  |  | +  bool success = service_->EncodeResponse(status, &response);
 | 
	
		
			
				|  |  | +  // Grab shutdown lock and send response.
 | 
	
		
			
				|  |  | +  std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
 | 
	
		
			
				|  |  | +  if (service_->shutdown_) {
 | 
	
		
			
				|  |  | +    SendFinishLocked(std::move(self), Status::CANCELLED);
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (!success) {
 | 
	
		
			
				|  |  | +    SendFinishLocked(std::move(self),
 | 
	
		
			
				|  |  | +                     Status(StatusCode::INTERNAL, "could not encode response"));
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  next_ = CallableTag(std::bind(&WatchCallHandler::OnSendHealthDone, this,
 | 
	
		
			
				|  |  | +                                std::placeholders::_1, std::placeholders::_2),
 | 
	
		
			
				|  |  | +                      std::move(self));
 | 
	
		
			
				|  |  | +  stream_.Write(response, &next_);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
 | 
	
		
			
				|  |  | +    OnSendHealthDone(std::shared_ptr<CallHandler> self, bool ok) {
 | 
	
		
			
				|  |  | +  if (!ok) {
 | 
	
		
			
				|  |  | +    SendFinish(std::move(self), Status::CANCELLED);
 | 
	
		
			
				|  |  | +    return;
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  std::unique_lock<std::mutex> lock(send_mu_);
 | 
	
		
			
				|  |  | +  send_in_flight_ = false;
 | 
	
		
			
				|  |  | +  // If we got a new status since we started the last send, start a
 | 
	
		
			
				|  |  | +  // new send for it.
 | 
	
		
			
				|  |  | +  if (pending_status_ != NOT_FOUND) {
 | 
	
		
			
				|  |  | +    auto status = pending_status_;
 | 
	
		
			
				|  |  | +    pending_status_ = NOT_FOUND;
 | 
	
		
			
				|  |  | +    SendHealthLocked(std::move(self), status);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
 | 
	
		
			
				|  |  | +    SendFinish(std::shared_ptr<CallHandler> self, const Status& status) {
 | 
	
		
			
				|  |  | +  if (finish_called_) return;
 | 
	
		
			
				|  |  | +  std::unique_lock<std::mutex> cq_lock(service_->cq_shutdown_mu_);
 | 
	
		
			
				|  |  | +  if (!service_->shutdown_) return;
 | 
	
		
			
				|  |  | +  SendFinishLocked(std::move(self), status);
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
 | 
	
		
			
				|  |  | +    SendFinishLocked(std::shared_ptr<CallHandler> self, const Status& status) {
 | 
	
		
			
				|  |  | +  on_finish_done_ =
 | 
	
		
			
				|  |  | +      CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
 | 
	
		
			
				|  |  | +                            std::placeholders::_1, std::placeholders::_2),
 | 
	
		
			
				|  |  | +                  std::move(self));
 | 
	
		
			
				|  |  | +  stream_.Finish(status, &on_finish_done_);
 | 
	
		
			
				|  |  | +  finish_called_ = true;
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
 | 
	
		
			
				|  |  | +    OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
 | 
	
		
			
				|  |  | +  if (ok) {
 | 
	
		
			
				|  |  | +    gpr_log(GPR_DEBUG,
 | 
	
		
			
				|  |  | +            "[HCS %p] Health watch call finished (service_name: \"%s\", "
 | 
	
		
			
				|  |  | +            "handler: %p).",
 | 
	
		
			
				|  |  | +            service_, service_name_.c_str(), this);
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +}
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +// TODO(roth): This method currently assumes that there will be only one
 | 
	
		
			
				|  |  | +// thread polling the cq and invoking the corresponding callbacks.  If
 | 
	
		
			
				|  |  | +// that changes, we will need to add synchronization here.
 | 
	
		
			
				|  |  | +void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
 | 
	
		
			
				|  |  | +    OnDoneNotified(std::shared_ptr<CallHandler> self, bool ok) {
 | 
	
		
			
				|  |  | +  GPR_ASSERT(ok);
 | 
	
		
			
				|  |  | +  gpr_log(GPR_DEBUG,
 | 
	
		
			
				|  |  | +          "[HCS %p] Healt watch call is notified done (handler: %p, "
 | 
	
		
			
				|  |  | +          "is_cancelled: %d).",
 | 
	
		
			
				|  |  | +          service_, this, static_cast<int>(ctx_.IsCancelled()));
 | 
	
		
			
				|  |  | +  SendFinish(std::move(self), Status::CANCELLED);
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  }  // namespace grpc
 |