pubsub.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package redis
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. // Not thread-safe.
  7. type PubSub struct {
  8. *baseClient
  9. }
  10. func (c *Client) PubSub() *PubSub {
  11. return &PubSub{
  12. baseClient: &baseClient{
  13. opt: c.opt,
  14. connPool: newSingleConnPool(c.connPool, false),
  15. },
  16. }
  17. }
  18. func (c *Client) Publish(channel, message string) *IntCmd {
  19. req := NewIntCmd("PUBLISH", channel, message)
  20. c.Process(req)
  21. return req
  22. }
  23. type Message struct {
  24. Channel string
  25. Payload string
  26. }
  27. func (m *Message) String() string {
  28. return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload)
  29. }
  30. type PMessage struct {
  31. Channel string
  32. Pattern string
  33. Payload string
  34. }
  35. func (m *PMessage) String() string {
  36. return fmt.Sprintf("PMessage<%s: %s>", m.Channel, m.Payload)
  37. }
  38. type Subscription struct {
  39. Kind string
  40. Channel string
  41. Count int
  42. }
  43. func (m *Subscription) String() string {
  44. return fmt.Sprintf("%s: %s", m.Kind, m.Channel)
  45. }
  46. func (c *PubSub) Receive() (interface{}, error) {
  47. return c.ReceiveTimeout(0)
  48. }
  49. func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
  50. cn, err := c.conn()
  51. if err != nil {
  52. return nil, err
  53. }
  54. cn.readTimeout = timeout
  55. cmd := NewSliceCmd()
  56. if err := cmd.parseReply(cn.rd); err != nil {
  57. return nil, err
  58. }
  59. reply := cmd.Val()
  60. msgName := reply[0].(string)
  61. switch msgName {
  62. case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
  63. return &Subscription{
  64. Kind: msgName,
  65. Channel: reply[1].(string),
  66. Count: int(reply[2].(int64)),
  67. }, nil
  68. case "message":
  69. return &Message{
  70. Channel: reply[1].(string),
  71. Payload: reply[2].(string),
  72. }, nil
  73. case "pmessage":
  74. return &PMessage{
  75. Pattern: reply[1].(string),
  76. Channel: reply[2].(string),
  77. Payload: reply[3].(string),
  78. }, nil
  79. }
  80. return nil, fmt.Errorf("redis: unsupported message name: %q", msgName)
  81. }
  82. func (c *PubSub) subscribe(cmd string, channels ...string) error {
  83. cn, err := c.conn()
  84. if err != nil {
  85. return err
  86. }
  87. args := append([]string{cmd}, channels...)
  88. req := NewSliceCmd(args...)
  89. return c.writeCmd(cn, req)
  90. }
  91. func (c *PubSub) Subscribe(channels ...string) error {
  92. return c.subscribe("SUBSCRIBE", channels...)
  93. }
  94. func (c *PubSub) PSubscribe(patterns ...string) error {
  95. return c.subscribe("PSUBSCRIBE", patterns...)
  96. }
  97. func (c *PubSub) unsubscribe(cmd string, channels ...string) error {
  98. cn, err := c.conn()
  99. if err != nil {
  100. return err
  101. }
  102. args := append([]string{cmd}, channels...)
  103. req := NewSliceCmd(args...)
  104. return c.writeCmd(cn, req)
  105. }
  106. func (c *PubSub) Unsubscribe(channels ...string) error {
  107. return c.unsubscribe("UNSUBSCRIBE", channels...)
  108. }
  109. func (c *PubSub) PUnsubscribe(patterns ...string) error {
  110. return c.unsubscribe("PUNSUBSCRIBE", patterns...)
  111. }