| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 | #include <chrono>#include <iostream>#include <sstream>#include <string>#include <thread>#include <google/protobuf/io/coded_stream.h>#include <google/protobuf/io/zero_copy_stream_impl.h>#include <google/protobuf/util/json_util.h>#include <google/protobuf/util/message_differencer.h>#include "exposer.h"#include "cpp/metrics.pb.h"namespace prometheus {MetricsHandler::MetricsHandler(    const std::vector<std::weak_ptr<Collectable>>& collectables,    Registry& registry)    : collectables_(collectables),      bytesTransferedFamily_(registry.add_counter(          "exposer_bytes_transfered", "bytesTransferred to metrics services",          {{"component", "exposer"}})),      bytesTransfered_(bytesTransferedFamily_->add({})),      numScrapesFamily_(registry.add_counter(          "exposer_total_scrapes", "Number of times metrics were scraped",          {{"component", "exposer"}})),      numScrapes_(numScrapesFamily_->add({})),      requestLatenciesFamily_(registry.add_histogram(          "exposer_request_latencies",          "Latencies of serving scrape requests, in milliseconds",          {{"component", "exposer"}})),      requestLatencies_(requestLatenciesFamily_->add(          {}, Histogram::BucketBoundaries{1, 5, 10, 20, 40, 80, 160, 320, 640,                                          1280, 2560})) {}static std::string serializeToDelimitedProtobuf(    const std::vector<io::prometheus::client::MetricFamily>& metrics) {  std::ostringstream ss;  for (auto&& metric : metrics) {    {      google::protobuf::io::OstreamOutputStream rawOutput{&ss};      google::protobuf::io::CodedOutputStream output(&rawOutput);      const int size = metric.ByteSize();      output.WriteVarint32(size);    }    auto buffer = std::string{};    metric.SerializeToString(&buffer);    ss << buffer;  }  return ss.str();}static std::string serializeToJson(    const std::vector<io::prometheus::client::MetricFamily>& metrics) {  using google::protobuf::util::MessageDifferencer;  std::stringstream ss;  ss << "[";  for (auto&& metric : metrics) {    std::string result;    google::protobuf::util::MessageToJsonString(        metric, &result, google::protobuf::util::JsonPrintOptions());    ss << result;    if (!MessageDifferencer::Equals(metric, metrics.back())) {      ss << ",";    }  }  ss << "]";  return ss.str();}static std::string serializeToHumanReadable(    const std::vector<io::prometheus::client::MetricFamily>& metrics) {  auto result = std::string{};  for (auto&& metric : metrics) {    result += metric.DebugString() + "\n";  }  return result;}static std::string getAcceptedEncoding(struct mg_connection* conn) {  auto request_info = mg_get_request_info(conn);  for (int i = 0; i < request_info->num_headers; i++) {    auto header = request_info->http_headers[i];    if (std::string{header.name} == "Accept") {      return {header.value};    }  }  return "";}bool MetricsHandler::handleGet(CivetServer* server,                               struct mg_connection* conn) {  using namespace io::prometheus::client;  auto startTimeOfRequest = std::chrono::steady_clock::now();  auto acceptedEncoding = getAcceptedEncoding(conn);  auto metrics = collectMetrics();  auto body = std::string{};  auto contentType = std::string{};  if (acceptedEncoding.find("application/vnd.google.protobuf") !=      std::string::npos) {    body = serializeToDelimitedProtobuf(metrics);    contentType =        "application/vnd.google.protobuf; "        "proto=io.prometheus.client.MetricFamily; "        "encoding=delimited";  } else if (acceptedEncoding.find("application/json") != std::string::npos) {    body = serializeToJson(metrics);    contentType = "application/json";  } else {    body = serializeToHumanReadable(metrics);    contentType = "text/plain";  }  mg_printf(conn,            "HTTP/1.1 200 OK\r\n"            "Content-Type: %s\r\n",            contentType.c_str());  mg_printf(conn, "Content-Length: %lu\r\n\r\n", body.size());  mg_write(conn, body.data(), body.size());  auto stopTimeOfRequest = std::chrono::steady_clock::now();  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(      stopTimeOfRequest - startTimeOfRequest);  requestLatencies_->observe(duration.count());  bytesTransfered_->inc(body.size());  numScrapes_->inc();  return true;}Exposer::Exposer(std::uint16_t port)    : server_({"listening_ports", std::to_string(port)}),      exposerRegistry_(          std::make_shared<Registry>(std::map<std::string, std::string>{})),      metricsHandler_(collectables_, *exposerRegistry_) {  registerCollectable(exposerRegistry_);  server_.addHandler("/metrics", &metricsHandler_);}void Exposer::registerCollectable(    const std::weak_ptr<Collectable>& collectable) {  collectables_.push_back(collectable);}std::vector<io::prometheus::client::MetricFamily>MetricsHandler::collectMetrics() const {  auto collectedMetrics = std::vector<io::prometheus::client::MetricFamily>{};  for (auto&& wcollectable : collectables_) {    auto collectable = wcollectable.lock();    if (!collectable) {      continue;    }    for (auto metric : collectable->collect()) {      collectedMetrics.push_back(metric);    }  }  return collectedMetrics;}}
 |