subscription_test.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rpc
  17. import (
  18. "context"
  19. "encoding/json"
  20. "fmt"
  21. "net"
  22. "sync"
  23. "testing"
  24. "time"
  25. )
  26. type NotificationTestService struct {
  27. mu sync.Mutex
  28. unsubscribed bool
  29. gotHangSubscriptionReq chan struct{}
  30. unblockHangSubscription chan struct{}
  31. }
  32. func (s *NotificationTestService) Echo(i int) int {
  33. return i
  34. }
  35. func (s *NotificationTestService) wasUnsubCallbackCalled() bool {
  36. s.mu.Lock()
  37. defer s.mu.Unlock()
  38. return s.unsubscribed
  39. }
  40. func (s *NotificationTestService) Unsubscribe(subid string) {
  41. s.mu.Lock()
  42. s.unsubscribed = true
  43. s.mu.Unlock()
  44. }
  45. func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (*Subscription, error) {
  46. notifier, supported := NotifierFromContext(ctx)
  47. if !supported {
  48. return nil, ErrNotificationsUnsupported
  49. }
  50. // by explicitly creating an subscription we make sure that the subscription id is send back to the client
  51. // before the first subscription.Notify is called. Otherwise the events might be send before the response
  52. // for the eth_subscribe method.
  53. subscription := notifier.CreateSubscription()
  54. go func() {
  55. // test expects n events, if we begin sending event immediately some events
  56. // will probably be dropped since the subscription ID might not be send to
  57. // the client.
  58. time.Sleep(5 * time.Second)
  59. for i := 0; i < n; i++ {
  60. if err := notifier.Notify(subscription.ID, val+i); err != nil {
  61. return
  62. }
  63. }
  64. select {
  65. case <-notifier.Closed():
  66. s.mu.Lock()
  67. s.unsubscribed = true
  68. s.mu.Unlock()
  69. case <-subscription.Err():
  70. s.mu.Lock()
  71. s.unsubscribed = true
  72. s.mu.Unlock()
  73. }
  74. }()
  75. return subscription, nil
  76. }
  77. // HangSubscription blocks on s.unblockHangSubscription before
  78. // sending anything.
  79. func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (*Subscription, error) {
  80. notifier, supported := NotifierFromContext(ctx)
  81. if !supported {
  82. return nil, ErrNotificationsUnsupported
  83. }
  84. s.gotHangSubscriptionReq <- struct{}{}
  85. <-s.unblockHangSubscription
  86. subscription := notifier.CreateSubscription()
  87. go func() {
  88. notifier.Notify(subscription.ID, val)
  89. }()
  90. return subscription, nil
  91. }
  92. func TestNotifications(t *testing.T) {
  93. server := NewServer()
  94. service := &NotificationTestService{}
  95. if err := server.RegisterName("eth", service); err != nil {
  96. t.Fatalf("unable to register test service %v", err)
  97. }
  98. clientConn, serverConn := net.Pipe()
  99. go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions)
  100. out := json.NewEncoder(clientConn)
  101. in := json.NewDecoder(clientConn)
  102. n := 5
  103. val := 12345
  104. request := map[string]interface{}{
  105. "id": 1,
  106. "method": "eth_subscribe",
  107. "version": "2.0",
  108. "params": []interface{}{"someSubscription", n, val},
  109. }
  110. // create subscription
  111. if err := out.Encode(request); err != nil {
  112. t.Fatal(err)
  113. }
  114. var subid string
  115. response := jsonSuccessResponse{Result: subid}
  116. if err := in.Decode(&response); err != nil {
  117. t.Fatal(err)
  118. }
  119. var ok bool
  120. if _, ok = response.Result.(string); !ok {
  121. t.Fatalf("expected subscription id, got %T", response.Result)
  122. }
  123. for i := 0; i < n; i++ {
  124. var notification jsonNotification
  125. if err := in.Decode(&notification); err != nil {
  126. t.Fatalf("%v", err)
  127. }
  128. if int(notification.Params.Result.(float64)) != val+i {
  129. t.Fatalf("expected %d, got %d", val+i, notification.Params.Result)
  130. }
  131. }
  132. clientConn.Close() // causes notification unsubscribe callback to be called
  133. time.Sleep(1 * time.Second)
  134. if !service.wasUnsubCallbackCalled() {
  135. t.Error("unsubscribe callback not called after closing connection")
  136. }
  137. }
  138. func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSuccessResponse,
  139. failures chan<- jsonErrResponse, notifications chan<- jsonNotification, errors chan<- error) {
  140. // read and parse server messages
  141. for {
  142. var rmsg json.RawMessage
  143. if err := in.Decode(&rmsg); err != nil {
  144. return
  145. }
  146. var responses []map[string]interface{}
  147. if rmsg[0] == '[' {
  148. if err := json.Unmarshal(rmsg, &responses); err != nil {
  149. errors <- fmt.Errorf("Received invalid message: %s", rmsg)
  150. return
  151. }
  152. } else {
  153. var msg map[string]interface{}
  154. if err := json.Unmarshal(rmsg, &msg); err != nil {
  155. errors <- fmt.Errorf("Received invalid message: %s", rmsg)
  156. return
  157. }
  158. responses = append(responses, msg)
  159. }
  160. for _, msg := range responses {
  161. // determine what kind of msg was received and broadcast
  162. // it to over the corresponding channel
  163. if _, found := msg["result"]; found {
  164. successes <- jsonSuccessResponse{
  165. Version: msg["jsonrpc"].(string),
  166. Id: msg["id"],
  167. Result: msg["result"],
  168. }
  169. continue
  170. }
  171. if _, found := msg["error"]; found {
  172. params := msg["params"].(map[string]interface{})
  173. failures <- jsonErrResponse{
  174. Version: msg["jsonrpc"].(string),
  175. Id: msg["id"],
  176. Error: jsonError{int(params["subscription"].(float64)), params["message"].(string), params["data"]},
  177. }
  178. continue
  179. }
  180. if _, found := msg["params"]; found {
  181. params := msg["params"].(map[string]interface{})
  182. notifications <- jsonNotification{
  183. Version: msg["jsonrpc"].(string),
  184. Method: msg["method"].(string),
  185. Params: jsonSubscription{params["subscription"].(string), params["result"]},
  186. }
  187. continue
  188. }
  189. errors <- fmt.Errorf("Received invalid message: %s", msg)
  190. }
  191. }
  192. }
  193. // TestSubscriptionMultipleNamespaces ensures that subscriptions can exists
  194. // for multiple different namespaces.
  195. func TestSubscriptionMultipleNamespaces(t *testing.T) {
  196. var (
  197. namespaces = []string{"eth", "shh", "bzz"}
  198. server = NewServer()
  199. service = NotificationTestService{}
  200. clientConn, serverConn = net.Pipe()
  201. out = json.NewEncoder(clientConn)
  202. in = json.NewDecoder(clientConn)
  203. successes = make(chan jsonSuccessResponse)
  204. failures = make(chan jsonErrResponse)
  205. notifications = make(chan jsonNotification)
  206. errors = make(chan error, 10)
  207. )
  208. // setup and start server
  209. for _, namespace := range namespaces {
  210. if err := server.RegisterName(namespace, &service); err != nil {
  211. t.Fatalf("unable to register test service %v", err)
  212. }
  213. }
  214. go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions)
  215. defer server.Stop()
  216. // wait for message and write them to the given channels
  217. go waitForMessages(t, in, successes, failures, notifications, errors)
  218. // create subscriptions one by one
  219. n := 3
  220. for i, namespace := range namespaces {
  221. request := map[string]interface{}{
  222. "id": i,
  223. "method": fmt.Sprintf("%s_subscribe", namespace),
  224. "version": "2.0",
  225. "params": []interface{}{"someSubscription", n, i},
  226. }
  227. if err := out.Encode(&request); err != nil {
  228. t.Fatalf("Could not create subscription: %v", err)
  229. }
  230. }
  231. // create all subscriptions in 1 batch
  232. var requests []interface{}
  233. for i, namespace := range namespaces {
  234. requests = append(requests, map[string]interface{}{
  235. "id": i,
  236. "method": fmt.Sprintf("%s_subscribe", namespace),
  237. "version": "2.0",
  238. "params": []interface{}{"someSubscription", n, i},
  239. })
  240. }
  241. if err := out.Encode(&requests); err != nil {
  242. t.Fatalf("Could not create subscription in batch form: %v", err)
  243. }
  244. timeout := time.After(30 * time.Second)
  245. subids := make(map[string]string, 2*len(namespaces))
  246. count := make(map[string]int, 2*len(namespaces))
  247. for {
  248. done := true
  249. for id := range count {
  250. if count, found := count[id]; !found || count < (2*n) {
  251. done = false
  252. }
  253. }
  254. if done && len(count) == len(namespaces) {
  255. break
  256. }
  257. select {
  258. case err := <-errors:
  259. t.Fatal(err)
  260. case suc := <-successes: // subscription created
  261. subids[namespaces[int(suc.Id.(float64))]] = suc.Result.(string)
  262. case failure := <-failures:
  263. t.Errorf("received error: %v", failure.Error)
  264. case notification := <-notifications:
  265. if cnt, found := count[notification.Params.Subscription]; found {
  266. count[notification.Params.Subscription] = cnt + 1
  267. } else {
  268. count[notification.Params.Subscription] = 1
  269. }
  270. case <-timeout:
  271. for _, namespace := range namespaces {
  272. subid, found := subids[namespace]
  273. if !found {
  274. t.Errorf("Subscription for '%s' not created", namespace)
  275. continue
  276. }
  277. if count, found := count[subid]; !found || count < n {
  278. t.Errorf("Didn't receive all notifications (%d<%d) in time for namespace '%s'", count, n, namespace)
  279. }
  280. }
  281. return
  282. }
  283. }
  284. }