123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- package redis
- import (
- "fmt"
- "time"
- )
- // Not thread-safe.
- type PubSub struct {
- *baseClient
- }
- func (c *Client) PubSub() *PubSub {
- return &PubSub{
- baseClient: &baseClient{
- opt: c.opt,
- connPool: newSingleConnPool(c.connPool, false),
- },
- }
- }
- func (c *Client) Publish(channel, message string) *IntCmd {
- req := NewIntCmd("PUBLISH", channel, message)
- c.Process(req)
- return req
- }
// Message is what PubSub.ReceiveTimeout returns for a "message" reply.
type Message struct {
	Channel string // second element of the reply
	Payload string // third element of the reply
}

// String renders the message as "Message<channel: payload>".
func (m *Message) String() string {
	// Every operand is a string, so Sprint inserts no separating spaces.
	return fmt.Sprint("Message<", m.Channel, ": ", m.Payload, ">")
}
// PMessage is what PubSub.ReceiveTimeout returns for a "pmessage" reply.
type PMessage struct {
	Channel string // third element of the reply
	Pattern string // second element of the reply
	Payload string // fourth element of the reply
}

// String renders the message as "PMessage<channel: payload>". The pattern is
// not part of the output.
func (m *PMessage) String() string {
	// Every operand is a string, so Sprint inserts no separating spaces.
	return fmt.Sprint("PMessage<", m.Channel, ": ", m.Payload, ">")
}
// Subscription is what PubSub.ReceiveTimeout returns for a "subscribe",
// "unsubscribe", "psubscribe" or "punsubscribe" reply.
type Subscription struct {
	Kind    string // the reply's message name, e.g. "subscribe"
	Channel string // second element of the reply
	Count   int    // third element of the reply, converted from int64
}

// String renders the subscription as "kind: channel". The count is not part
// of the output.
func (m *Subscription) String() string {
	// Every operand is a string, so Sprint inserts no separating spaces.
	return fmt.Sprint(m.Kind, ": ", m.Channel)
}
- func (c *PubSub) Receive() (interface{}, error) {
- return c.ReceiveTimeout(0)
- }
- func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
- cn, err := c.conn()
- if err != nil {
- return nil, err
- }
- cn.readTimeout = timeout
- cmd := NewSliceCmd()
- if err := cmd.parseReply(cn.rd); err != nil {
- return nil, err
- }
- reply := cmd.Val()
- msgName := reply[0].(string)
- switch msgName {
- case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
- return &Subscription{
- Kind: msgName,
- Channel: reply[1].(string),
- Count: int(reply[2].(int64)),
- }, nil
- case "message":
- return &Message{
- Channel: reply[1].(string),
- Payload: reply[2].(string),
- }, nil
- case "pmessage":
- return &PMessage{
- Pattern: reply[1].(string),
- Channel: reply[2].(string),
- Payload: reply[3].(string),
- }, nil
- }
- return nil, fmt.Errorf("redis: unsupported message name: %q", msgName)
- }
- func (c *PubSub) subscribe(cmd string, channels ...string) error {
- cn, err := c.conn()
- if err != nil {
- return err
- }
- args := append([]string{cmd}, channels...)
- req := NewSliceCmd(args...)
- return c.writeCmd(cn, req)
- }
- func (c *PubSub) Subscribe(channels ...string) error {
- return c.subscribe("SUBSCRIBE", channels...)
- }
- func (c *PubSub) PSubscribe(patterns ...string) error {
- return c.subscribe("PSUBSCRIBE", patterns...)
- }
- func (c *PubSub) unsubscribe(cmd string, channels ...string) error {
- cn, err := c.conn()
- if err != nil {
- return err
- }
- args := append([]string{cmd}, channels...)
- req := NewSliceCmd(args...)
- return c.writeCmd(cn, req)
- }
- func (c *PubSub) Unsubscribe(channels ...string) error {
- return c.unsubscribe("UNSUBSCRIBE", channels...)
- }
- func (c *PubSub) PUnsubscribe(patterns ...string) error {
- return c.unsubscribe("PUNSUBSCRIBE", patterns...)
- }
|