StatsClient.cpp 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. /*
  2. * Copyright 2005 - 2016 Zarafa and its licensors
  3. *
  4. * This program is free software: you can redistribute it and/or modify
  5. * it under the terms of the GNU Affero General Public License, version 3,
  6. * as published by the Free Software Foundation.
  7. *
  8. * This program is distributed in the hope that it will be useful,
  9. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. * GNU Affero General Public License for more details.
  12. *
  13. * You should have received a copy of the GNU Affero General Public License
  14. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  15. *
  16. */
  17. #include <kopano/platform.h>
  18. #include <cerrno>
  19. #include <cstdio>
  20. #include <cstdlib>
  21. #include <cstring>
  22. #include <ctime>
  23. #include <unistd.h>
  24. #include <kopano/lockhelper.hpp>
  25. #include "StatsClient.h"
  26. #include "TmpPath.h"
  27. namespace KC {
  28. static void submitThreadDo(void *p)
  29. {
  30. auto psc = static_cast<StatsClient *>(p);
  31. psc -> getLogger() -> Log(EC_LOGLEVEL_DEBUG, "Push data");
  32. time_t now = time(NULL);
  33. scoped_lock l_map(psc->mapsLock);
  34. for (const auto &it : psc->countsMapDouble)
  35. psc->submit(it.first, now, it.second);
  36. psc->countsMapDouble.clear();
  37. for (const auto &it : psc->countsMapInt64)
  38. psc->submit(it.first, now, it.second);
  39. psc->countsMapInt64.clear();
  40. }
  41. static void *submitThread(void *p)
  42. {
  43. auto psc = static_cast<StatsClient *>(p);
  44. psc -> getLogger() -> Log(EC_LOGLEVEL_DEBUG, "Submit thread started");
  45. pthread_cleanup_push(submitThreadDo, p);
  46. while(!psc -> terminate) {
  47. sleep(300);
  48. submitThreadDo(p);
  49. }
  50. pthread_cleanup_pop(1);
  51. psc -> getLogger() -> Log(EC_LOGLEVEL_DEBUG, "Submit thread stopping");
  52. return NULL;
  53. }
  54. StatsClient::StatsClient(ECLogger *l) :
  55. logger(l)
  56. {
  57. memset(&addr, 0, sizeof(addr));
  58. memset(&countsSubmitThread, 0, sizeof(countsSubmitThread));
  59. }
  60. int StatsClient::startup(const std::string &collectorSocket)
  61. {
  62. int ret = -1;
  63. fd = socket(AF_UNIX, SOCK_DGRAM, 0);
  64. if (fd == -1) {
  65. logger -> Log(EC_LOGLEVEL_ERROR, "StatsClient cannot create socket: %s", strerror(errno));
  66. return -errno; /* maybe log a bit */
  67. }
  68. srand(time(NULL));
  69. logger -> Log(EC_LOGLEVEL_DEBUG, "StatsClient binding socket");
  70. for (unsigned int retry = 0; retry < 3; ++retry) {
  71. struct sockaddr_un laddr;
  72. memset(&laddr, 0, sizeof(laddr));
  73. laddr.sun_family = AF_UNIX;
  74. int ret = snprintf(laddr.sun_path, sizeof(laddr.sun_path), "%s/.%x%x.sock", TmpPath::getInstance() -> getTempPath().c_str(), rand(), rand());
  75. if (ret >= 0 &&
  76. static_cast<size_t>(ret) >= sizeof(laddr.sun_path)) {
  77. ec_log_err("%s: Random path too long (%s...) for AF_UNIX socket",
  78. __func__, laddr.sun_path);
  79. return -ENAMETOOLONG;
  80. }
  81. ret = bind(fd, reinterpret_cast<const struct sockaddr *>(&laddr),
  82. sizeof(laddr));
  83. if (ret == 0) {
  84. logger -> Log(EC_LOGLEVEL_DEBUG, "StatsClient bound socket to %s", laddr.sun_path);
  85. unlink(laddr.sun_path);
  86. break;
  87. }
  88. ret = -errno;
  89. ec_log_err("StatsClient bind %s: %s", laddr.sun_path, strerror(errno));
  90. if (ret == -EADDRINUSE)
  91. return ret;
  92. }
  93. if (ret != 0)
  94. return ret;
  95. memset(&addr, 0, sizeof(addr));
  96. addr.sun_family = AF_UNIX;
  97. ret = snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", collectorSocket.c_str());
  98. if (ret >= 0 && static_cast<size_t>(ret) >= sizeof(addr.sun_path)) {
  99. ec_log_err("%s: Path \"%s\" too long for AF_UNIX socket",
  100. __func__, collectorSocket.c_str());
  101. return -ENAMETOOLONG;
  102. }
  103. addr_len = sizeof(addr);
  104. if (pthread_create(&countsSubmitThread, NULL, submitThread, this) == 0)
  105. thread_running = true;
  106. logger -> Log(EC_LOGLEVEL_DEBUG, "StatsClient thread started");
  107. return 0;
  108. }
  109. StatsClient::~StatsClient() {
  110. logger -> Log(EC_LOGLEVEL_DEBUG, "StatsClient terminating");
  111. terminate = true;
  112. if (thread_running) {
  113. // interrupt sleep()
  114. pthread_cancel(countsSubmitThread);
  115. void *dummy = NULL;
  116. pthread_join(countsSubmitThread, &dummy);
  117. }
  118. close(fd);
  119. logger -> Log(EC_LOGLEVEL_DEBUG, "StatsClient terminated");
  120. }
  121. void StatsClient::submit(const std::string & key, const time_t ts, const double value) {
  122. if (fd == -1)
  123. return;
  124. char msg[4096];
  125. int len = snprintf(msg, sizeof msg, "ADD float %s %ld %f", key.c_str(), ts, value);
  126. // in theory snprintf can return -1
  127. if (len > 0) {
  128. int rc = sendto(fd, msg, len, 0, (struct sockaddr *)&addr, addr_len);
  129. if (rc == -1)
  130. logger -> Log(EC_LOGLEVEL_DEBUG, "StatsClient submit float failed: %s", strerror(errno));
  131. }
  132. }
  133. void StatsClient::submit(const std::string & key, const time_t ts, const int64_t value) {
  134. if (fd == -1)
  135. return;
  136. char msg[4096];
  137. int len = snprintf(msg, sizeof msg, "ADD int %s %ld %zd",
  138. key.c_str(), static_cast<long>(ts),
  139. static_cast<size_t>(value));
  140. // in theory snprintf can return -1
  141. if (len > 0) {
  142. int rc = sendto(fd, msg, len, 0, (struct sockaddr *)&addr, addr_len);
  143. if (rc == -1)
  144. logger -> Log(EC_LOGLEVEL_DEBUG, "StatsClient submit int failed: %s", strerror(errno));
  145. }
  146. }
  147. void StatsClient::countInc(const std::string & key, const std::string & key_sub) {
  148. countAdd(key, key_sub, int64_t(1));
  149. }
  150. void StatsClient::countAdd(const std::string & key, const std::string & key_sub, const double n) {
  151. std::string kp = key + " " + key_sub;
  152. scoped_lock l_map(mapsLock);
  153. auto doubleIterator = countsMapDouble.find(kp);
  154. if (doubleIterator == countsMapDouble.cend())
  155. countsMapDouble.insert(std::pair<std::string, double>(kp, n));
  156. else
  157. doubleIterator -> second += n;
  158. }
  159. void StatsClient::countAdd(const std::string & key, const std::string & key_sub, const int64_t n) {
  160. std::string kp = key + " " + key_sub;
  161. scoped_lock l_map(mapsLock);
  162. auto int64Iterator = countsMapInt64.find(kp);
  163. if (int64Iterator == countsMapInt64.cend())
  164. countsMapInt64.insert(std::pair<std::string, int64_t>(kp, n));
  165. else
  166. int64Iterator -> second += n;
  167. }
  168. } /* namespace */