producer_test.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. // Copyright (c) 2016 Arista Networks, Inc.
  2. // Use of this source code is governed by the Apache License 2.0
  3. // that can be found in the COPYING file.
  4. package producer
  5. import (
  6. "encoding/json"
  7. "strings"
  8. "sync"
  9. "testing"
  10. "notabug.org/themusicgod1/goarista/kafka/openconfig"
  11. "notabug.org/themusicgod1/goarista/test"
  12. "github.com/Shopify/sarama"
  13. "github.com/golang/protobuf/proto"
  14. pb "github.com/openconfig/reference/rpc/openconfig"
  15. )
  16. type mockAsyncProducer struct {
  17. input chan *sarama.ProducerMessage
  18. successes chan *sarama.ProducerMessage
  19. errors chan *sarama.ProducerError
  20. }
  21. func newMockAsyncProducer() *mockAsyncProducer {
  22. return &mockAsyncProducer{
  23. input: make(chan *sarama.ProducerMessage),
  24. successes: make(chan *sarama.ProducerMessage),
  25. errors: make(chan *sarama.ProducerError)}
  26. }
  27. func (p *mockAsyncProducer) AsyncClose() {
  28. panic("Not implemented")
  29. }
  30. func (p *mockAsyncProducer) Close() error {
  31. close(p.successes)
  32. close(p.errors)
  33. return nil
  34. }
  35. func (p *mockAsyncProducer) Input() chan<- *sarama.ProducerMessage {
  36. return p.input
  37. }
  38. func (p *mockAsyncProducer) Successes() <-chan *sarama.ProducerMessage {
  39. return p.successes
  40. }
  41. func (p *mockAsyncProducer) Errors() <-chan *sarama.ProducerError {
  42. return p.errors
  43. }
  44. func newPath(path string) *pb.Path {
  45. if path == "" {
  46. return nil
  47. }
  48. return &pb.Path{Element: strings.Split(path, "/")}
  49. }
  50. func TestKafkaProducer(t *testing.T) {
  51. mock := newMockAsyncProducer()
  52. toDB := make(chan proto.Message)
  53. topic := "occlient"
  54. systemID := "Foobar"
  55. toDBProducer := &producer{
  56. notifsChan: toDB,
  57. kafkaProducer: mock,
  58. encoder: openconfig.NewEncoder(topic, sarama.StringEncoder(systemID), ""),
  59. done: make(chan struct{}),
  60. wg: sync.WaitGroup{},
  61. }
  62. toDBProducer.Start()
  63. response := &pb.SubscribeResponse{
  64. Response: &pb.SubscribeResponse_Update{
  65. Update: &pb.Notification{
  66. Timestamp: 0,
  67. Prefix: newPath("/foo/bar"),
  68. Update: []*pb.Update{},
  69. },
  70. },
  71. }
  72. document := map[string]interface{}{
  73. "timestamp": int64(0),
  74. "update": map[string]interface{}{
  75. "": map[string]interface{}{
  76. "foo": map[string]interface{}{
  77. "bar": map[string]interface{}{},
  78. },
  79. },
  80. },
  81. }
  82. toDB <- response
  83. kafkaMessage := <-mock.input
  84. if kafkaMessage.Topic != topic {
  85. t.Errorf("Unexpected Topic: %s, expecting %s", kafkaMessage.Topic, topic)
  86. }
  87. key, err := kafkaMessage.Key.Encode()
  88. if err != nil {
  89. t.Fatalf("Error encoding key: %s", err)
  90. }
  91. if string(key) != systemID {
  92. t.Errorf("Kafka message didn't have expected key: %s, expecting %s", string(key), systemID)
  93. }
  94. valueBytes, err := kafkaMessage.Value.Encode()
  95. if err != nil {
  96. t.Fatalf("Error encoding value: %s", err)
  97. }
  98. var result interface{}
  99. err = json.Unmarshal(valueBytes, &result)
  100. if err != nil {
  101. t.Errorf("Error decoding into JSON: %s", err)
  102. }
  103. if !test.DeepEqual(document["update"], result.(map[string]interface{})["update"]) {
  104. t.Errorf("Protobuf sent from Kafka Producer does not match original.\nOriginal: %v\nNew:%v",
  105. document, result)
  106. }
  107. toDBProducer.Stop()
  108. }
  109. type mockMsg struct{}
  110. func (m mockMsg) Reset() {}
  111. func (m mockMsg) String() string { return "" }
  112. func (m mockMsg) ProtoMessage() {}
  113. func TestProducerStartStop(t *testing.T) {
  114. // this test checks that Start() followed by Stop() doesn't cause any race conditions.
  115. mock := newMockAsyncProducer()
  116. toDB := make(chan proto.Message)
  117. topic := "occlient"
  118. systemID := "Foobar"
  119. p := &producer{
  120. notifsChan: toDB,
  121. kafkaProducer: mock,
  122. encoder: openconfig.NewEncoder(topic, sarama.StringEncoder(systemID), ""),
  123. done: make(chan struct{}),
  124. }
  125. msg := &pb.SubscribeResponse{
  126. Response: &pb.SubscribeResponse_Update{
  127. Update: &pb.Notification{
  128. Timestamp: 0,
  129. Prefix: newPath("/foo/bar"),
  130. Update: []*pb.Update{},
  131. },
  132. },
  133. }
  134. done := make(chan struct{})
  135. var wg sync.WaitGroup
  136. wg.Add(1)
  137. go func() {
  138. defer wg.Done()
  139. for {
  140. select {
  141. case <-mock.input:
  142. case <-done:
  143. return
  144. }
  145. }
  146. }()
  147. wg.Add(1)
  148. go func() {
  149. defer wg.Done()
  150. for {
  151. select {
  152. case <-done:
  153. return
  154. default:
  155. }
  156. p.Write(msg)
  157. }
  158. }()
  159. p.Start()
  160. p.Write(msg)
  161. p.Stop()
  162. close(done)
  163. wg.Wait()
  164. }