client.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. // Copyright (c) 2016 Arista Networks, Inc.
  2. // Use of this source code is governed by the Apache License 2.0
  3. // that can be found in the COPYING file.
  4. // Package lanz implements a LANZ client that listens to notifications from a LANZ streaming
  5. // server, decodes them, and sends them as protobufs over a channel to a receiver.
  6. package lanz
import (
	"bufio"
	"encoding/binary"
	"io"
	"net"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
	"notabug.org/themusicgod1/glog"
	pb "notabug.org/themusicgod1/goarista/lanz/proto"
)
  17. const (
  18. defaultConnectTimeout = 10 * time.Second
  19. defaultConnectBackoff = 30 * time.Second
  20. )
  21. // Client is the LANZ client interface.
  22. type Client interface {
  23. // Run is the main loop of the client.
  24. // It connects to the LANZ server and reads the notifications, decodes them
  25. // and sends them to the channel.
  26. // In case of disconnect, it will reconnect automatically.
  27. Run(ch chan<- *pb.LanzRecord)
  28. // Stops the client.
  29. Stop()
  30. }
  31. // ConnectReadCloser extends the io.ReadCloser interface with a Connect method.
  32. type ConnectReadCloser interface {
  33. io.ReadCloser
  34. // Connect connects to the address, returning an error if it fails.
  35. Connect() error
  36. }
  37. type client struct {
  38. addr string
  39. stopping bool
  40. timeout time.Duration
  41. backoff time.Duration
  42. conn ConnectReadCloser
  43. }
  44. // New creates a new client with default TCP connection to the LANZ server.
  45. func New(opts ...Option) Client {
  46. c := &client{
  47. stopping: false,
  48. timeout: defaultConnectTimeout,
  49. backoff: defaultConnectBackoff,
  50. }
  51. for _, opt := range opts {
  52. opt(c)
  53. }
  54. if c.conn == nil {
  55. if c.addr == "" {
  56. panic("Neither address, nor connector specified")
  57. }
  58. c.conn = &netConnector{
  59. addr: c.addr,
  60. timeout: c.timeout,
  61. backoff: c.backoff,
  62. }
  63. }
  64. return c
  65. }
  66. func (c *client) Run(ch chan<- *pb.LanzRecord) {
  67. for !c.stopping {
  68. if err := c.conn.Connect(); err != nil && !c.stopping {
  69. glog.V(1).Infof("Can't connect to LANZ server: %v", err)
  70. time.Sleep(c.backoff)
  71. continue
  72. }
  73. glog.V(1).Infof("Connected successfully to LANZ server: %v", c.addr)
  74. if err := c.read(bufio.NewReader(c.conn), ch); err != nil && !c.stopping {
  75. if err != io.EOF && err != io.ErrUnexpectedEOF {
  76. glog.Errorf("Error receiving LANZ events: %v", err)
  77. }
  78. c.conn.Close()
  79. time.Sleep(c.backoff)
  80. }
  81. }
  82. close(ch)
  83. }
  84. func (c *client) read(r *bufio.Reader, ch chan<- *pb.LanzRecord) error {
  85. for !c.stopping {
  86. len, err := binary.ReadUvarint(r)
  87. if err != nil {
  88. return err
  89. }
  90. buf := make([]byte, len)
  91. if _, err = io.ReadFull(r, buf); err != nil {
  92. return err
  93. }
  94. rec := &pb.LanzRecord{}
  95. if err = proto.Unmarshal(buf, rec); err != nil {
  96. return err
  97. }
  98. ch <- rec
  99. }
  100. return nil
  101. }
  102. func (c *client) Stop() {
  103. if c.stopping {
  104. return
  105. }
  106. c.stopping = true
  107. c.conn.Close()
  108. }
  109. type netConnector struct {
  110. net.Conn
  111. addr string
  112. timeout time.Duration
  113. backoff time.Duration
  114. }
  115. func (c *netConnector) Connect() (err error) {
  116. c.Conn, err = net.DialTimeout("tcp", c.addr, c.timeout)
  117. if err != nil {
  118. }
  119. return
  120. }