123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- // Copyright (c) 2016 Arista Networks, Inc.
- // Use of this source code is governed by the Apache License 2.0
- // that can be found in the COPYING file.
- package producer
- import (
- "encoding/json"
- "strings"
- "sync"
- "testing"
- "notabug.org/themusicgod1/goarista/kafka/openconfig"
- "notabug.org/themusicgod1/goarista/test"
- "github.com/Shopify/sarama"
- "github.com/golang/protobuf/proto"
- pb "github.com/openconfig/reference/rpc/openconfig"
- )
- type mockAsyncProducer struct {
- input chan *sarama.ProducerMessage
- successes chan *sarama.ProducerMessage
- errors chan *sarama.ProducerError
- }
- func newMockAsyncProducer() *mockAsyncProducer {
- return &mockAsyncProducer{
- input: make(chan *sarama.ProducerMessage),
- successes: make(chan *sarama.ProducerMessage),
- errors: make(chan *sarama.ProducerError)}
- }
- func (p *mockAsyncProducer) AsyncClose() {
- panic("Not implemented")
- }
- func (p *mockAsyncProducer) Close() error {
- close(p.successes)
- close(p.errors)
- return nil
- }
- func (p *mockAsyncProducer) Input() chan<- *sarama.ProducerMessage {
- return p.input
- }
- func (p *mockAsyncProducer) Successes() <-chan *sarama.ProducerMessage {
- return p.successes
- }
- func (p *mockAsyncProducer) Errors() <-chan *sarama.ProducerError {
- return p.errors
- }
- func newPath(path string) *pb.Path {
- if path == "" {
- return nil
- }
- return &pb.Path{Element: strings.Split(path, "/")}
- }
- func TestKafkaProducer(t *testing.T) {
- mock := newMockAsyncProducer()
- toDB := make(chan proto.Message)
- topic := "occlient"
- systemID := "Foobar"
- toDBProducer := &producer{
- notifsChan: toDB,
- kafkaProducer: mock,
- encoder: openconfig.NewEncoder(topic, sarama.StringEncoder(systemID), ""),
- done: make(chan struct{}),
- wg: sync.WaitGroup{},
- }
- toDBProducer.Start()
- response := &pb.SubscribeResponse{
- Response: &pb.SubscribeResponse_Update{
- Update: &pb.Notification{
- Timestamp: 0,
- Prefix: newPath("/foo/bar"),
- Update: []*pb.Update{},
- },
- },
- }
- document := map[string]interface{}{
- "timestamp": int64(0),
- "update": map[string]interface{}{
- "": map[string]interface{}{
- "foo": map[string]interface{}{
- "bar": map[string]interface{}{},
- },
- },
- },
- }
- toDB <- response
- kafkaMessage := <-mock.input
- if kafkaMessage.Topic != topic {
- t.Errorf("Unexpected Topic: %s, expecting %s", kafkaMessage.Topic, topic)
- }
- key, err := kafkaMessage.Key.Encode()
- if err != nil {
- t.Fatalf("Error encoding key: %s", err)
- }
- if string(key) != systemID {
- t.Errorf("Kafka message didn't have expected key: %s, expecting %s", string(key), systemID)
- }
- valueBytes, err := kafkaMessage.Value.Encode()
- if err != nil {
- t.Fatalf("Error encoding value: %s", err)
- }
- var result interface{}
- err = json.Unmarshal(valueBytes, &result)
- if err != nil {
- t.Errorf("Error decoding into JSON: %s", err)
- }
- if !test.DeepEqual(document["update"], result.(map[string]interface{})["update"]) {
- t.Errorf("Protobuf sent from Kafka Producer does not match original.\nOriginal: %v\nNew:%v",
- document, result)
- }
- toDBProducer.Stop()
- }
// mockMsg is an empty type that supplies Reset, String and ProtoMessage
// methods with trivial bodies.
type mockMsg struct{}

// Reset does nothing.
func (mockMsg) Reset() {}

// String always returns the empty string.
func (mockMsg) String() string {
	return ""
}

// ProtoMessage does nothing.
func (mockMsg) ProtoMessage() {}
- func TestProducerStartStop(t *testing.T) {
- // this test checks that Start() followed by Stop() doesn't cause any race conditions.
- mock := newMockAsyncProducer()
- toDB := make(chan proto.Message)
- topic := "occlient"
- systemID := "Foobar"
- p := &producer{
- notifsChan: toDB,
- kafkaProducer: mock,
- encoder: openconfig.NewEncoder(topic, sarama.StringEncoder(systemID), ""),
- done: make(chan struct{}),
- }
- msg := &pb.SubscribeResponse{
- Response: &pb.SubscribeResponse_Update{
- Update: &pb.Notification{
- Timestamp: 0,
- Prefix: newPath("/foo/bar"),
- Update: []*pb.Update{},
- },
- },
- }
- done := make(chan struct{})
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer wg.Done()
- for {
- select {
- case <-mock.input:
- case <-done:
- return
- }
- }
- }()
- wg.Add(1)
- go func() {
- defer wg.Done()
- for {
- select {
- case <-done:
- return
- default:
- }
- p.Write(msg)
- }
- }()
- p.Start()
- p.Write(msg)
- p.Stop()
- close(done)
- wg.Wait()
- }
|