123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- #include <kopano/platform.h>
- #include <cerrno>
- #include <cstdio>
- #include <cstdlib>
- #include <cstring>
- #include <ctime>
- #include <unistd.h>
- #include <kopano/lockhelper.hpp>
- #include "StatsClient.h"
- #include "TmpPath.h"
- namespace KC {
- static void submitThreadDo(void *p)
- {
- auto psc = static_cast<StatsClient *>(p);
- psc -> getLogger() -> Log(EC_LOGLEVEL_DEBUG, "Push data");
- time_t now = time(NULL);
- scoped_lock l_map(psc->mapsLock);
- for (const auto &it : psc->countsMapDouble)
- psc->submit(it.first, now, it.second);
- psc->countsMapDouble.clear();
- for (const auto &it : psc->countsMapInt64)
- psc->submit(it.first, now, it.second);
- psc->countsMapInt64.clear();
- }
- static void *submitThread(void *p)
- {
- auto psc = static_cast<StatsClient *>(p);
- psc -> getLogger() -> Log(EC_LOGLEVEL_DEBUG, "Submit thread started");
- pthread_cleanup_push(submitThreadDo, p);
- while(!psc -> terminate) {
- sleep(300);
- submitThreadDo(p);
- }
- pthread_cleanup_pop(1);
- psc -> getLogger() -> Log(EC_LOGLEVEL_DEBUG, "Submit thread stopping");
- return NULL;
- }
- StatsClient::StatsClient(ECLogger *l) :
- logger(l)
- {
- memset(&addr, 0, sizeof(addr));
- memset(&countsSubmitThread, 0, sizeof(countsSubmitThread));
- }
- int StatsClient::startup(const std::string &collectorSocket)
- {
- int ret = -1;
- fd = socket(AF_UNIX, SOCK_DGRAM, 0);
- if (fd == -1) {
- logger -> Log(EC_LOGLEVEL_ERROR, "StatsClient cannot create socket: %s", strerror(errno));
- return -errno;
- }
- srand(time(NULL));
- logger -> Log(EC_LOGLEVEL_DEBUG, "StatsClient binding socket");
- for (unsigned int retry = 0; retry < 3; ++retry) {
- struct sockaddr_un laddr;
- memset(&laddr, 0, sizeof(laddr));
- laddr.sun_family = AF_UNIX;
- int ret = snprintf(laddr.sun_path, sizeof(laddr.sun_path), "%s/.%x%x.sock", TmpPath::getInstance() -> getTempPath().c_str(), rand(), rand());
- if (ret >= 0 &&
- static_cast<size_t>(ret) >= sizeof(laddr.sun_path)) {
- ec_log_err("%s: Random path too long (%s...) for AF_UNIX socket",
- __func__, laddr.sun_path);
- return -ENAMETOOLONG;
- }
- ret = bind(fd, reinterpret_cast<const struct sockaddr *>(&laddr),
- sizeof(laddr));
- if (ret == 0) {
- logger -> Log(EC_LOGLEVEL_DEBUG, "StatsClient bound socket to %s", laddr.sun_path);
- unlink(laddr.sun_path);
- break;
- }
- ret = -errno;
- ec_log_err("StatsClient bind %s: %s", laddr.sun_path, strerror(errno));
- if (ret == -EADDRINUSE)
- return ret;
- }
- if (ret != 0)
- return ret;
- memset(&addr, 0, sizeof(addr));
- addr.sun_family = AF_UNIX;
- ret = snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", collectorSocket.c_str());
- if (ret >= 0 && static_cast<size_t>(ret) >= sizeof(addr.sun_path)) {
- ec_log_err("%s: Path \"%s\" too long for AF_UNIX socket",
- __func__, collectorSocket.c_str());
- return -ENAMETOOLONG;
- }
- addr_len = sizeof(addr);
- if (pthread_create(&countsSubmitThread, NULL, submitThread, this) == 0)
- thread_running = true;
- logger -> Log(EC_LOGLEVEL_DEBUG, "StatsClient thread started");
- return 0;
- }
- StatsClient::~StatsClient() {
- logger -> Log(EC_LOGLEVEL_DEBUG, "StatsClient terminating");
- terminate = true;
- if (thread_running) {
-
- pthread_cancel(countsSubmitThread);
- void *dummy = NULL;
- pthread_join(countsSubmitThread, &dummy);
- }
- close(fd);
- logger -> Log(EC_LOGLEVEL_DEBUG, "StatsClient terminated");
- }
- void StatsClient::submit(const std::string & key, const time_t ts, const double value) {
- if (fd == -1)
- return;
- char msg[4096];
- int len = snprintf(msg, sizeof msg, "ADD float %s %ld %f", key.c_str(), ts, value);
-
- if (len > 0) {
- int rc = sendto(fd, msg, len, 0, (struct sockaddr *)&addr, addr_len);
- if (rc == -1)
- logger -> Log(EC_LOGLEVEL_DEBUG, "StatsClient submit float failed: %s", strerror(errno));
- }
- }
- void StatsClient::submit(const std::string & key, const time_t ts, const int64_t value) {
- if (fd == -1)
- return;
- char msg[4096];
- int len = snprintf(msg, sizeof msg, "ADD int %s %ld %zd",
- key.c_str(), static_cast<long>(ts),
- static_cast<size_t>(value));
-
- if (len > 0) {
- int rc = sendto(fd, msg, len, 0, (struct sockaddr *)&addr, addr_len);
- if (rc == -1)
- logger -> Log(EC_LOGLEVEL_DEBUG, "StatsClient submit int failed: %s", strerror(errno));
- }
- }
- void StatsClient::countInc(const std::string & key, const std::string & key_sub) {
- countAdd(key, key_sub, int64_t(1));
- }
- void StatsClient::countAdd(const std::string & key, const std::string & key_sub, const double n) {
- std::string kp = key + " " + key_sub;
- scoped_lock l_map(mapsLock);
- auto doubleIterator = countsMapDouble.find(kp);
- if (doubleIterator == countsMapDouble.cend())
- countsMapDouble.insert(std::pair<std::string, double>(kp, n));
- else
- doubleIterator -> second += n;
- }
- void StatsClient::countAdd(const std::string & key, const std::string & key_sub, const int64_t n) {
- std::string kp = key + " " + key_sub;
- scoped_lock l_map(mapsLock);
- auto int64Iterator = countsMapInt64.find(kp);
- if (int64Iterator == countsMapInt64.cend())
- countsMapInt64.insert(std::pair<std::string, int64_t>(kp, n));
- else
- int64Iterator -> second += n;
- }
- }
|