meter.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. package metrics
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. // Meters count events to produce exponentially-weighted moving average rates
  7. // at one-, five-, and fifteen-minutes and a mean rate.
  8. type Meter interface {
  9. Count() int64
  10. Mark(int64)
  11. Rate1() float64
  12. Rate5() float64
  13. Rate15() float64
  14. RateMean() float64
  15. Snapshot() Meter
  16. Stop()
  17. }
  18. // GetOrRegisterMeter returns an existing Meter or constructs and registers a
  19. // new StandardMeter.
  20. // Be sure to unregister the meter from the registry once it is of no use to
  21. // allow for garbage collection.
  22. func GetOrRegisterMeter(name string, r Registry) Meter {
  23. if nil == r {
  24. r = DefaultRegistry
  25. }
  26. return r.GetOrRegister(name, NewMeter).(Meter)
  27. }
  28. // NewMeter constructs a new StandardMeter and launches a goroutine.
  29. // Be sure to call Stop() once the meter is of no use to allow for garbage collection.
  30. func NewMeter() Meter {
  31. if !Enabled {
  32. return NilMeter{}
  33. }
  34. m := newStandardMeter()
  35. arbiter.Lock()
  36. defer arbiter.Unlock()
  37. arbiter.meters[m] = struct{}{}
  38. if !arbiter.started {
  39. arbiter.started = true
  40. go arbiter.tick()
  41. }
  42. return m
  43. }
  44. // NewMeter constructs and registers a new StandardMeter and launches a
  45. // goroutine.
  46. // Be sure to unregister the meter from the registry once it is of no use to
  47. // allow for garbage collection.
  48. func NewRegisteredMeter(name string, r Registry) Meter {
  49. c := NewMeter()
  50. if nil == r {
  51. r = DefaultRegistry
  52. }
  53. r.Register(name, c)
  54. return c
  55. }
  56. // MeterSnapshot is a read-only copy of another Meter.
  57. type MeterSnapshot struct {
  58. count int64
  59. rate1, rate5, rate15, rateMean float64
  60. }
  61. // Count returns the count of events at the time the snapshot was taken.
  62. func (m *MeterSnapshot) Count() int64 { return m.count }
  63. // Mark panics.
  64. func (*MeterSnapshot) Mark(n int64) {
  65. panic("Mark called on a MeterSnapshot")
  66. }
  67. // Rate1 returns the one-minute moving average rate of events per second at the
  68. // time the snapshot was taken.
  69. func (m *MeterSnapshot) Rate1() float64 { return m.rate1 }
  70. // Rate5 returns the five-minute moving average rate of events per second at
  71. // the time the snapshot was taken.
  72. func (m *MeterSnapshot) Rate5() float64 { return m.rate5 }
  73. // Rate15 returns the fifteen-minute moving average rate of events per second
  74. // at the time the snapshot was taken.
  75. func (m *MeterSnapshot) Rate15() float64 { return m.rate15 }
  76. // RateMean returns the meter's mean rate of events per second at the time the
  77. // snapshot was taken.
  78. func (m *MeterSnapshot) RateMean() float64 { return m.rateMean }
  79. // Snapshot returns the snapshot.
  80. func (m *MeterSnapshot) Snapshot() Meter { return m }
  81. // Stop is a no-op.
  82. func (m *MeterSnapshot) Stop() {}
  83. // NilMeter is a no-op Meter.
  84. type NilMeter struct{}
  85. // Count is a no-op.
  86. func (NilMeter) Count() int64 { return 0 }
  87. // Mark is a no-op.
  88. func (NilMeter) Mark(n int64) {}
  89. // Rate1 is a no-op.
  90. func (NilMeter) Rate1() float64 { return 0.0 }
  91. // Rate5 is a no-op.
  92. func (NilMeter) Rate5() float64 { return 0.0 }
  93. // Rate15is a no-op.
  94. func (NilMeter) Rate15() float64 { return 0.0 }
  95. // RateMean is a no-op.
  96. func (NilMeter) RateMean() float64 { return 0.0 }
  97. // Snapshot is a no-op.
  98. func (NilMeter) Snapshot() Meter { return NilMeter{} }
  99. // Stop is a no-op.
  100. func (NilMeter) Stop() {}
  101. // StandardMeter is the standard implementation of a Meter.
  102. type StandardMeter struct {
  103. lock sync.RWMutex
  104. snapshot *MeterSnapshot
  105. a1, a5, a15 EWMA
  106. startTime time.Time
  107. stopped bool
  108. }
  109. func newStandardMeter() *StandardMeter {
  110. return &StandardMeter{
  111. snapshot: &MeterSnapshot{},
  112. a1: NewEWMA1(),
  113. a5: NewEWMA5(),
  114. a15: NewEWMA15(),
  115. startTime: time.Now(),
  116. }
  117. }
  118. // Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
  119. func (m *StandardMeter) Stop() {
  120. m.lock.Lock()
  121. stopped := m.stopped
  122. m.stopped = true
  123. m.lock.Unlock()
  124. if !stopped {
  125. arbiter.Lock()
  126. delete(arbiter.meters, m)
  127. arbiter.Unlock()
  128. }
  129. }
  130. // Count returns the number of events recorded.
  131. func (m *StandardMeter) Count() int64 {
  132. m.lock.RLock()
  133. count := m.snapshot.count
  134. m.lock.RUnlock()
  135. return count
  136. }
  137. // Mark records the occurrence of n events.
  138. func (m *StandardMeter) Mark(n int64) {
  139. m.lock.Lock()
  140. defer m.lock.Unlock()
  141. if m.stopped {
  142. return
  143. }
  144. m.snapshot.count += n
  145. m.a1.Update(n)
  146. m.a5.Update(n)
  147. m.a15.Update(n)
  148. m.updateSnapshot()
  149. }
  150. // Rate1 returns the one-minute moving average rate of events per second.
  151. func (m *StandardMeter) Rate1() float64 {
  152. m.lock.RLock()
  153. rate1 := m.snapshot.rate1
  154. m.lock.RUnlock()
  155. return rate1
  156. }
  157. // Rate5 returns the five-minute moving average rate of events per second.
  158. func (m *StandardMeter) Rate5() float64 {
  159. m.lock.RLock()
  160. rate5 := m.snapshot.rate5
  161. m.lock.RUnlock()
  162. return rate5
  163. }
  164. // Rate15 returns the fifteen-minute moving average rate of events per second.
  165. func (m *StandardMeter) Rate15() float64 {
  166. m.lock.RLock()
  167. rate15 := m.snapshot.rate15
  168. m.lock.RUnlock()
  169. return rate15
  170. }
  171. // RateMean returns the meter's mean rate of events per second.
  172. func (m *StandardMeter) RateMean() float64 {
  173. m.lock.RLock()
  174. rateMean := m.snapshot.rateMean
  175. m.lock.RUnlock()
  176. return rateMean
  177. }
  178. // Snapshot returns a read-only copy of the meter.
  179. func (m *StandardMeter) Snapshot() Meter {
  180. m.lock.RLock()
  181. snapshot := *m.snapshot
  182. m.lock.RUnlock()
  183. return &snapshot
  184. }
  185. func (m *StandardMeter) updateSnapshot() {
  186. // should run with write lock held on m.lock
  187. snapshot := m.snapshot
  188. snapshot.rate1 = m.a1.Rate()
  189. snapshot.rate5 = m.a5.Rate()
  190. snapshot.rate15 = m.a15.Rate()
  191. snapshot.rateMean = float64(snapshot.count) / time.Since(m.startTime).Seconds()
  192. }
  193. func (m *StandardMeter) tick() {
  194. m.lock.Lock()
  195. defer m.lock.Unlock()
  196. m.a1.Tick()
  197. m.a5.Tick()
  198. m.a15.Tick()
  199. m.updateSnapshot()
  200. }
  201. // meterArbiter ticks meters every 5s from a single goroutine.
  202. // meters are references in a set for future stopping.
  203. type meterArbiter struct {
  204. sync.RWMutex
  205. started bool
  206. meters map[*StandardMeter]struct{}
  207. ticker *time.Ticker
  208. }
  209. var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[*StandardMeter]struct{})}
  210. // Ticks meters on the scheduled interval
  211. func (ma *meterArbiter) tick() {
  212. for range ma.ticker.C {
  213. ma.tickMeters()
  214. }
  215. }
  216. func (ma *meterArbiter) tickMeters() {
  217. ma.RLock()
  218. defer ma.RUnlock()
  219. for meter := range ma.meters {
  220. meter.tick()
  221. }
  222. }