gateway.cc 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. #include "prometheus/gateway.h"
  2. #include "prometheus/client_metric.h"
  3. #include "prometheus/serializer.h"
  4. #include "prometheus/text_serializer.h"
  5. #include <curl/curl.h>
  6. namespace prometheus {
  7. static const char CONTENT_TYPE[] =
  8. "Content-Type: text/plain; version=0.0.4; charset=utf-8";
  9. Gateway::Gateway(const std::string& uri, const std::string jobname,
  10. const Labels& labels, const std::string username,
  11. const std::string password) {
  12. /* In windows, this will init the winsock stuff */
  13. curl_global_init(CURL_GLOBAL_ALL);
  14. std::stringstream jobUriStream;
  15. jobUriStream << uri << "/metrics/job/" << jobname;
  16. jobUri_ = jobUriStream.str();
  17. if (!username.empty()) {
  18. auth_ = username + ":" + password;
  19. }
  20. std::stringstream labelStream;
  21. for (auto& label : labels) {
  22. labelStream << "/" << label.first << "/" << label.second;
  23. }
  24. labels_ = labelStream.str();
  25. }
  26. Gateway::~Gateway() { curl_global_cleanup(); }
  27. const Gateway::Labels Gateway::GetInstanceLabel(std::string hostname) {
  28. if (hostname.empty()) {
  29. return Gateway::Labels{};
  30. }
  31. return Gateway::Labels{{"instance", hostname}};
  32. }
  33. void Gateway::RegisterCollectable(const std::weak_ptr<Collectable>& collectable,
  34. const Labels* labels) {
  35. std::stringstream ss;
  36. if (labels) {
  37. for (auto& label : *labels) {
  38. ss << "/" << label.first << "/" << label.second;
  39. }
  40. }
  41. collectables_.push_back(std::make_pair(collectable, ss.str()));
  42. }
  43. int Gateway::performHttpRequest(HttpMethod method, const std::string& uri,
  44. const std::string& body) const {
  45. auto curl = curl_easy_init();
  46. if (!curl) {
  47. return -CURLE_FAILED_INIT;
  48. }
  49. curl_easy_setopt(curl, CURLOPT_URL, uri.c_str());
  50. curl_slist* header_chunk = nullptr;
  51. if (!body.empty()) {
  52. header_chunk = curl_slist_append(nullptr, CONTENT_TYPE);
  53. curl_easy_setopt(curl, CURLOPT_HTTPHEADER, header_chunk);
  54. curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, body.size());
  55. curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.data());
  56. }
  57. if (!auth_.empty()) {
  58. curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
  59. curl_easy_setopt(curl, CURLOPT_USERPWD, auth_.c_str());
  60. }
  61. switch (method) {
  62. case HttpMethod::Post:
  63. curl_easy_setopt(curl, CURLOPT_HTTPGET, 0L);
  64. curl_easy_setopt(curl, CURLOPT_NOBODY, 0L);
  65. break;
  66. case HttpMethod::Put:
  67. curl_easy_setopt(curl, CURLOPT_NOBODY, 0L);
  68. curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
  69. break;
  70. case HttpMethod::Delete:
  71. curl_easy_setopt(curl, CURLOPT_HTTPGET, 0L);
  72. curl_easy_setopt(curl, CURLOPT_NOBODY, 0L);
  73. curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
  74. break;
  75. }
  76. auto curl_error = curl_easy_perform(curl);
  77. long response_code;
  78. curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
  79. curl_easy_cleanup(curl);
  80. curl_slist_free_all(header_chunk);
  81. if (curl_error != CURLE_OK) {
  82. return -curl_error;
  83. }
  84. return response_code;
  85. }
  86. std::string Gateway::getUri(const CollectableEntry& collectable) const {
  87. std::stringstream uri;
  88. uri << jobUri_ << labels_ << collectable.second;
  89. return uri.str();
  90. }
  91. int Gateway::push(HttpMethod method) {
  92. const auto serializer = TextSerializer{};
  93. for (auto& wcollectable : collectables_) {
  94. auto collectable = wcollectable.first.lock();
  95. if (!collectable) {
  96. continue;
  97. }
  98. auto metrics = collectable->Collect();
  99. auto body = serializer.Serialize(metrics);
  100. auto uri = getUri(wcollectable);
  101. auto status_code = performHttpRequest(method, uri, body);
  102. if (status_code >= 400) {
  103. return status_code;
  104. }
  105. }
  106. return 200;
  107. }
  108. std::future<int> Gateway::async_push(HttpMethod method) {
  109. const auto serializer = TextSerializer{};
  110. std::vector<std::future<int>> futures;
  111. for (auto& wcollectable : collectables_) {
  112. auto collectable = wcollectable.first.lock();
  113. if (!collectable) {
  114. continue;
  115. }
  116. auto metrics = collectable->Collect();
  117. auto body = serializer.Serialize(metrics);
  118. auto uri = getUri(wcollectable);
  119. futures.push_back(std::async(std::launch::async, [&] {
  120. return performHttpRequest(method, uri, body);
  121. }));
  122. }
  123. return std::async(std::launch::async, [&] {
  124. auto final_status_code = 200;
  125. for (auto& future : futures) {
  126. auto status_code = future.get();
  127. if (status_code >= 400) {
  128. final_status_code = status_code;
  129. }
  130. }
  131. return final_status_code;
  132. });
  133. }
  134. int Gateway::Delete() {
  135. return performHttpRequest(HttpMethod::Delete, jobUri_, {});
  136. }
  137. std::future<int> Gateway::AsyncDelete() {
  138. return std::async(std::launch::async, [&] {
  139. return Delete();
  140. });
  141. }
  142. } // namespace prometheus