multi.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package redis
  2. import (
  3. "errors"
  4. "fmt"
  5. )
  6. var errDiscard = errors.New("redis: Discard can be used only inside Exec")
  7. // Not thread-safe.
  8. type Multi struct {
  9. *Client
  10. }
  11. func (c *Client) Multi() *Multi {
  12. return &Multi{
  13. Client: &Client{
  14. baseClient: &baseClient{
  15. opt: c.opt,
  16. connPool: newSingleConnPool(c.connPool, true),
  17. },
  18. },
  19. }
  20. }
  21. func (c *Multi) Close() error {
  22. if err := c.Unwatch().Err(); err != nil {
  23. return err
  24. }
  25. return c.Client.Close()
  26. }
  27. func (c *Multi) Watch(keys ...string) *StatusCmd {
  28. args := append([]string{"WATCH"}, keys...)
  29. cmd := NewStatusCmd(args...)
  30. c.Process(cmd)
  31. return cmd
  32. }
  33. func (c *Multi) Unwatch(keys ...string) *StatusCmd {
  34. args := append([]string{"UNWATCH"}, keys...)
  35. cmd := NewStatusCmd(args...)
  36. c.Process(cmd)
  37. return cmd
  38. }
  39. func (c *Multi) Discard() error {
  40. if c.cmds == nil {
  41. return errDiscard
  42. }
  43. c.cmds = c.cmds[:1]
  44. return nil
  45. }
  46. // Exec always returns list of commands. If transaction fails
  47. // TxFailedErr is returned. Otherwise Exec returns error of the first
  48. // failed command or nil.
  49. func (c *Multi) Exec(f func() error) ([]Cmder, error) {
  50. c.cmds = []Cmder{NewStatusCmd("MULTI")}
  51. if err := f(); err != nil {
  52. return nil, err
  53. }
  54. c.cmds = append(c.cmds, NewSliceCmd("EXEC"))
  55. cmds := c.cmds
  56. c.cmds = nil
  57. if len(cmds) == 2 {
  58. return []Cmder{}, nil
  59. }
  60. cn, err := c.conn()
  61. if err != nil {
  62. setCmdsErr(cmds[1:len(cmds)-1], err)
  63. return cmds[1 : len(cmds)-1], err
  64. }
  65. err = c.execCmds(cn, cmds)
  66. if err != nil {
  67. c.freeConn(cn, err)
  68. return cmds[1 : len(cmds)-1], err
  69. }
  70. c.putConn(cn)
  71. return cmds[1 : len(cmds)-1], nil
  72. }
  73. func (c *Multi) execCmds(cn *conn, cmds []Cmder) error {
  74. err := c.writeCmd(cn, cmds...)
  75. if err != nil {
  76. setCmdsErr(cmds[1:len(cmds)-1], err)
  77. return err
  78. }
  79. statusCmd := NewStatusCmd()
  80. // Omit last command (EXEC).
  81. cmdsLen := len(cmds) - 1
  82. // Parse queued replies.
  83. for i := 0; i < cmdsLen; i++ {
  84. if err := statusCmd.parseReply(cn.rd); err != nil {
  85. setCmdsErr(cmds[1:len(cmds)-1], err)
  86. return err
  87. }
  88. }
  89. // Parse number of replies.
  90. line, err := readLine(cn.rd)
  91. if err != nil {
  92. setCmdsErr(cmds[1:len(cmds)-1], err)
  93. return err
  94. }
  95. if line[0] != '*' {
  96. err := fmt.Errorf("redis: expected '*', but got line %q", line)
  97. setCmdsErr(cmds[1:len(cmds)-1], err)
  98. return err
  99. }
  100. if len(line) == 3 && line[1] == '-' && line[2] == '1' {
  101. setCmdsErr(cmds[1:len(cmds)-1], TxFailedErr)
  102. return TxFailedErr
  103. }
  104. var firstCmdErr error
  105. // Parse replies.
  106. // Loop starts from 1 to omit MULTI cmd.
  107. for i := 1; i < cmdsLen; i++ {
  108. cmd := cmds[i]
  109. if err := cmd.parseReply(cn.rd); err != nil {
  110. if firstCmdErr == nil {
  111. firstCmdErr = err
  112. }
  113. }
  114. }
  115. return firstCmdErr
  116. }