client_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package rpc
  17. import (
  18. "context"
  19. "fmt"
  20. "math/rand"
  21. "net"
  22. "net/http"
  23. "net/http/httptest"
  24. "os"
  25. "reflect"
  26. "runtime"
  27. "sync"
  28. "testing"
  29. "time"
  30. "github.com/davecgh/go-spew/spew"
  31. "github.com/ethereum/go-ethereum/log"
  32. )
  33. func TestClientRequest(t *testing.T) {
  34. server := newTestServer("service", new(Service))
  35. defer server.Stop()
  36. client := DialInProc(server)
  37. defer client.Close()
  38. var resp Result
  39. if err := client.Call(&resp, "service_echo", "hello", 10, &Args{"world"}); err != nil {
  40. t.Fatal(err)
  41. }
  42. if !reflect.DeepEqual(resp, Result{"hello", 10, &Args{"world"}}) {
  43. t.Errorf("incorrect result %#v", resp)
  44. }
  45. }
  46. func TestClientBatchRequest(t *testing.T) {
  47. server := newTestServer("service", new(Service))
  48. defer server.Stop()
  49. client := DialInProc(server)
  50. defer client.Close()
  51. batch := []BatchElem{
  52. {
  53. Method: "service_echo",
  54. Args: []interface{}{"hello", 10, &Args{"world"}},
  55. Result: new(Result),
  56. },
  57. {
  58. Method: "service_echo",
  59. Args: []interface{}{"hello2", 11, &Args{"world"}},
  60. Result: new(Result),
  61. },
  62. {
  63. Method: "no_such_method",
  64. Args: []interface{}{1, 2, 3},
  65. Result: new(int),
  66. },
  67. }
  68. if err := client.BatchCall(batch); err != nil {
  69. t.Fatal(err)
  70. }
  71. wantResult := []BatchElem{
  72. {
  73. Method: "service_echo",
  74. Args: []interface{}{"hello", 10, &Args{"world"}},
  75. Result: &Result{"hello", 10, &Args{"world"}},
  76. },
  77. {
  78. Method: "service_echo",
  79. Args: []interface{}{"hello2", 11, &Args{"world"}},
  80. Result: &Result{"hello2", 11, &Args{"world"}},
  81. },
  82. {
  83. Method: "no_such_method",
  84. Args: []interface{}{1, 2, 3},
  85. Result: new(int),
  86. Error: &jsonError{Code: -32601, Message: "The method no_such_method_ does not exist/is not available"},
  87. },
  88. }
  89. if !reflect.DeepEqual(batch, wantResult) {
  90. t.Errorf("batch results mismatch:\ngot %swant %s", spew.Sdump(batch), spew.Sdump(wantResult))
  91. }
  92. }
  93. // func TestClientCancelInproc(t *testing.T) { testClientCancel("inproc", t) }
  94. func TestClientCancelWebsocket(t *testing.T) { testClientCancel("ws", t) }
  95. func TestClientCancelHTTP(t *testing.T) { testClientCancel("http", t) }
  96. func TestClientCancelIPC(t *testing.T) { testClientCancel("ipc", t) }
  97. // This test checks that requests made through CallContext can be canceled by canceling
  98. // the context.
  99. func testClientCancel(transport string, t *testing.T) {
  100. server := newTestServer("service", new(Service))
  101. defer server.Stop()
  102. // What we want to achieve is that the context gets canceled
  103. // at various stages of request processing. The interesting cases
  104. // are:
  105. // - cancel during dial
  106. // - cancel while performing a HTTP request
  107. // - cancel while waiting for a response
  108. //
  109. // To trigger those, the times are chosen such that connections
  110. // are killed within the deadline for every other call (maxKillTimeout
  111. // is 2x maxCancelTimeout).
  112. //
  113. // Once a connection is dead, there is a fair chance it won't connect
  114. // successfully because the accept is delayed by 1s.
  115. maxContextCancelTimeout := 300 * time.Millisecond
  116. fl := &flakeyListener{
  117. maxAcceptDelay: 1 * time.Second,
  118. maxKillTimeout: 600 * time.Millisecond,
  119. }
  120. var client *Client
  121. switch transport {
  122. case "ws", "http":
  123. c, hs := httpTestClient(server, transport, fl)
  124. defer hs.Close()
  125. client = c
  126. case "ipc":
  127. c, l := ipcTestClient(server, fl)
  128. defer l.Close()
  129. client = c
  130. default:
  131. panic("unknown transport: " + transport)
  132. }
  133. // These tests take a lot of time, run them all at once.
  134. // You probably want to run with -parallel 1 or comment out
  135. // the call to t.Parallel if you enable the logging.
  136. t.Parallel()
  137. // The actual test starts here.
  138. var (
  139. wg sync.WaitGroup
  140. nreqs = 10
  141. ncallers = 6
  142. )
  143. caller := func(index int) {
  144. defer wg.Done()
  145. for i := 0; i < nreqs; i++ {
  146. var (
  147. ctx context.Context
  148. cancel func()
  149. timeout = time.Duration(rand.Int63n(int64(maxContextCancelTimeout)))
  150. )
  151. if index < ncallers/2 {
  152. // For half of the callers, create a context without deadline
  153. // and cancel it later.
  154. ctx, cancel = context.WithCancel(context.Background())
  155. time.AfterFunc(timeout, cancel)
  156. } else {
  157. // For the other half, create a context with a deadline instead. This is
  158. // different because the context deadline is used to set the socket write
  159. // deadline.
  160. ctx, cancel = context.WithTimeout(context.Background(), timeout)
  161. }
  162. // Now perform a call with the context.
  163. // The key thing here is that no call will ever complete successfully.
  164. err := client.CallContext(ctx, nil, "service_sleep", 2*maxContextCancelTimeout)
  165. if err != nil {
  166. log.Debug(fmt.Sprint("got expected error:", err))
  167. } else {
  168. t.Errorf("no error for call with %v wait time", timeout)
  169. }
  170. cancel()
  171. }
  172. }
  173. wg.Add(ncallers)
  174. for i := 0; i < ncallers; i++ {
  175. go caller(i)
  176. }
  177. wg.Wait()
  178. }
  179. func TestClientSubscribeInvalidArg(t *testing.T) {
  180. server := newTestServer("service", new(Service))
  181. defer server.Stop()
  182. client := DialInProc(server)
  183. defer client.Close()
  184. check := func(shouldPanic bool, arg interface{}) {
  185. defer func() {
  186. err := recover()
  187. if shouldPanic && err == nil {
  188. t.Errorf("EthSubscribe should've panicked for %#v", arg)
  189. }
  190. if !shouldPanic && err != nil {
  191. t.Errorf("EthSubscribe shouldn't have panicked for %#v", arg)
  192. buf := make([]byte, 1024*1024)
  193. buf = buf[:runtime.Stack(buf, false)]
  194. t.Error(err)
  195. t.Error(string(buf))
  196. }
  197. }()
  198. client.EthSubscribe(context.Background(), arg, "foo_bar")
  199. }
  200. check(true, nil)
  201. check(true, 1)
  202. check(true, (chan int)(nil))
  203. check(true, make(<-chan int))
  204. check(false, make(chan int))
  205. check(false, make(chan<- int))
  206. }
  207. func TestClientSubscribe(t *testing.T) {
  208. server := newTestServer("eth", new(NotificationTestService))
  209. defer server.Stop()
  210. client := DialInProc(server)
  211. defer client.Close()
  212. nc := make(chan int)
  213. count := 10
  214. sub, err := client.EthSubscribe(context.Background(), nc, "someSubscription", count, 0)
  215. if err != nil {
  216. t.Fatal("can't subscribe:", err)
  217. }
  218. for i := 0; i < count; i++ {
  219. if val := <-nc; val != i {
  220. t.Fatalf("value mismatch: got %d, want %d", val, i)
  221. }
  222. }
  223. sub.Unsubscribe()
  224. select {
  225. case v := <-nc:
  226. t.Fatal("received value after unsubscribe:", v)
  227. case err := <-sub.Err():
  228. if err != nil {
  229. t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err)
  230. }
  231. case <-time.After(1 * time.Second):
  232. t.Fatalf("subscription not closed within 1s after unsubscribe")
  233. }
  234. }
  235. func TestClientSubscribeCustomNamespace(t *testing.T) {
  236. namespace := "custom"
  237. server := newTestServer(namespace, new(NotificationTestService))
  238. defer server.Stop()
  239. client := DialInProc(server)
  240. defer client.Close()
  241. nc := make(chan int)
  242. count := 10
  243. sub, err := client.Subscribe(context.Background(), namespace, nc, "someSubscription", count, 0)
  244. if err != nil {
  245. t.Fatal("can't subscribe:", err)
  246. }
  247. for i := 0; i < count; i++ {
  248. if val := <-nc; val != i {
  249. t.Fatalf("value mismatch: got %d, want %d", val, i)
  250. }
  251. }
  252. sub.Unsubscribe()
  253. select {
  254. case v := <-nc:
  255. t.Fatal("received value after unsubscribe:", v)
  256. case err := <-sub.Err():
  257. if err != nil {
  258. t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err)
  259. }
  260. case <-time.After(1 * time.Second):
  261. t.Fatalf("subscription not closed within 1s after unsubscribe")
  262. }
  263. }
  264. // In this test, the connection drops while EthSubscribe is
  265. // waiting for a response.
  266. func TestClientSubscribeClose(t *testing.T) {
  267. service := &NotificationTestService{
  268. gotHangSubscriptionReq: make(chan struct{}),
  269. unblockHangSubscription: make(chan struct{}),
  270. }
  271. server := newTestServer("eth", service)
  272. defer server.Stop()
  273. client := DialInProc(server)
  274. defer client.Close()
  275. var (
  276. nc = make(chan int)
  277. errc = make(chan error)
  278. sub *ClientSubscription
  279. err error
  280. )
  281. go func() {
  282. sub, err = client.EthSubscribe(context.Background(), nc, "hangSubscription", 999)
  283. errc <- err
  284. }()
  285. <-service.gotHangSubscriptionReq
  286. client.Close()
  287. service.unblockHangSubscription <- struct{}{}
  288. select {
  289. case err := <-errc:
  290. if err == nil {
  291. t.Errorf("EthSubscribe returned nil error after Close")
  292. }
  293. if sub != nil {
  294. t.Error("EthSubscribe returned non-nil subscription after Close")
  295. }
  296. case <-time.After(1 * time.Second):
  297. t.Fatalf("EthSubscribe did not return within 1s after Close")
  298. }
  299. }
  300. // This test checks that Client doesn't lock up when a single subscriber
  301. // doesn't read subscription events.
  302. func TestClientNotificationStorm(t *testing.T) {
  303. server := newTestServer("eth", new(NotificationTestService))
  304. defer server.Stop()
  305. doTest := func(count int, wantError bool) {
  306. client := DialInProc(server)
  307. defer client.Close()
  308. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  309. defer cancel()
  310. // Subscribe on the server. It will start sending many notifications
  311. // very quickly.
  312. nc := make(chan int)
  313. sub, err := client.EthSubscribe(ctx, nc, "someSubscription", count, 0)
  314. if err != nil {
  315. t.Fatal("can't subscribe:", err)
  316. }
  317. defer sub.Unsubscribe()
  318. // Process each notification, try to run a call in between each of them.
  319. for i := 0; i < count; i++ {
  320. select {
  321. case val := <-nc:
  322. if val != i {
  323. t.Fatalf("(%d/%d) unexpected value %d", i, count, val)
  324. }
  325. case err := <-sub.Err():
  326. if wantError && err != ErrSubscriptionQueueOverflow {
  327. t.Fatalf("(%d/%d) got error %q, want %q", i, count, err, ErrSubscriptionQueueOverflow)
  328. } else if !wantError {
  329. t.Fatalf("(%d/%d) got unexpected error %q", i, count, err)
  330. }
  331. return
  332. }
  333. var r int
  334. err := client.CallContext(ctx, &r, "eth_echo", i)
  335. if err != nil {
  336. if !wantError {
  337. t.Fatalf("(%d/%d) call error: %v", i, count, err)
  338. }
  339. return
  340. }
  341. }
  342. }
  343. doTest(8000, false)
  344. doTest(10000, true)
  345. }
  346. func TestClientHTTP(t *testing.T) {
  347. server := newTestServer("service", new(Service))
  348. defer server.Stop()
  349. client, hs := httpTestClient(server, "http", nil)
  350. defer hs.Close()
  351. defer client.Close()
  352. // Launch concurrent requests.
  353. var (
  354. results = make([]Result, 100)
  355. errc = make(chan error)
  356. wantResult = Result{"a", 1, new(Args)}
  357. )
  358. defer client.Close()
  359. for i := range results {
  360. i := i
  361. go func() {
  362. errc <- client.Call(&results[i], "service_echo",
  363. wantResult.String, wantResult.Int, wantResult.Args)
  364. }()
  365. }
  366. // Wait for all of them to complete.
  367. timeout := time.NewTimer(5 * time.Second)
  368. defer timeout.Stop()
  369. for i := range results {
  370. select {
  371. case err := <-errc:
  372. if err != nil {
  373. t.Fatal(err)
  374. }
  375. case <-timeout.C:
  376. t.Fatalf("timeout (got %d/%d) results)", i+1, len(results))
  377. }
  378. }
  379. // Check results.
  380. for i := range results {
  381. if !reflect.DeepEqual(results[i], wantResult) {
  382. t.Errorf("result %d mismatch: got %#v, want %#v", i, results[i], wantResult)
  383. }
  384. }
  385. }
  386. func TestClientReconnect(t *testing.T) {
  387. startServer := func(addr string) (*Server, net.Listener) {
  388. srv := newTestServer("service", new(Service))
  389. l, err := net.Listen("tcp", addr)
  390. if err != nil {
  391. t.Fatal(err)
  392. }
  393. go http.Serve(l, srv.WebsocketHandler([]string{"*"}))
  394. return srv, l
  395. }
  396. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  397. defer cancel()
  398. // Start a server and corresponding client.
  399. s1, l1 := startServer("127.0.0.1:0")
  400. client, err := DialContext(ctx, "ws://"+l1.Addr().String())
  401. if err != nil {
  402. t.Fatal("can't dial", err)
  403. }
  404. // Perform a call. This should work because the server is up.
  405. var resp Result
  406. if err := client.CallContext(ctx, &resp, "service_echo", "", 1, nil); err != nil {
  407. t.Fatal(err)
  408. }
  409. // Shut down the server and try calling again. It shouldn't work.
  410. l1.Close()
  411. s1.Stop()
  412. if err := client.CallContext(ctx, &resp, "service_echo", "", 2, nil); err == nil {
  413. t.Error("successful call while the server is down")
  414. t.Logf("resp: %#v", resp)
  415. }
  416. // Allow for some cool down time so we can listen on the same address again.
  417. time.Sleep(2 * time.Second)
  418. // Start it up again and call again. The connection should be reestablished.
  419. // We spawn multiple calls here to check whether this hangs somehow.
  420. s2, l2 := startServer(l1.Addr().String())
  421. defer l2.Close()
  422. defer s2.Stop()
  423. start := make(chan struct{})
  424. errors := make(chan error, 20)
  425. for i := 0; i < cap(errors); i++ {
  426. go func() {
  427. <-start
  428. var resp Result
  429. errors <- client.CallContext(ctx, &resp, "service_echo", "", 3, nil)
  430. }()
  431. }
  432. close(start)
  433. errcount := 0
  434. for i := 0; i < cap(errors); i++ {
  435. if err = <-errors; err != nil {
  436. errcount++
  437. }
  438. }
  439. t.Log("err:", err)
  440. if errcount > 1 {
  441. t.Errorf("expected one error after disconnect, got %d", errcount)
  442. }
  443. }
  444. func newTestServer(serviceName string, service interface{}) *Server {
  445. server := NewServer()
  446. if err := server.RegisterName(serviceName, service); err != nil {
  447. panic(err)
  448. }
  449. return server
  450. }
  451. func httpTestClient(srv *Server, transport string, fl *flakeyListener) (*Client, *httptest.Server) {
  452. // Create the HTTP server.
  453. var hs *httptest.Server
  454. switch transport {
  455. case "ws":
  456. hs = httptest.NewUnstartedServer(srv.WebsocketHandler([]string{"*"}))
  457. case "http":
  458. hs = httptest.NewUnstartedServer(srv)
  459. default:
  460. panic("unknown HTTP transport: " + transport)
  461. }
  462. // Wrap the listener if required.
  463. if fl != nil {
  464. fl.Listener = hs.Listener
  465. hs.Listener = fl
  466. }
  467. // Connect the client.
  468. hs.Start()
  469. client, err := Dial(transport + "://" + hs.Listener.Addr().String())
  470. if err != nil {
  471. panic(err)
  472. }
  473. return client, hs
  474. }
  475. func ipcTestClient(srv *Server, fl *flakeyListener) (*Client, net.Listener) {
  476. // Listen on a random endpoint.
  477. endpoint := fmt.Sprintf("go-ethereum-test-ipc-%d-%d", os.Getpid(), rand.Int63())
  478. if runtime.GOOS == "windows" {
  479. endpoint = `\\.\pipe\` + endpoint
  480. } else {
  481. endpoint = os.TempDir() + "/" + endpoint
  482. }
  483. l, err := ipcListen(endpoint)
  484. if err != nil {
  485. panic(err)
  486. }
  487. // Connect the listener to the server.
  488. if fl != nil {
  489. fl.Listener = l
  490. l = fl
  491. }
  492. go srv.ServeListener(l)
  493. // Connect the client.
  494. client, err := Dial(endpoint)
  495. if err != nil {
  496. panic(err)
  497. }
  498. return client, l
  499. }
  500. // flakeyListener kills accepted connections after a random timeout.
  501. type flakeyListener struct {
  502. net.Listener
  503. maxKillTimeout time.Duration
  504. maxAcceptDelay time.Duration
  505. }
  506. func (l *flakeyListener) Accept() (net.Conn, error) {
  507. delay := time.Duration(rand.Int63n(int64(l.maxAcceptDelay)))
  508. time.Sleep(delay)
  509. c, err := l.Listener.Accept()
  510. if err == nil {
  511. timeout := time.Duration(rand.Int63n(int64(l.maxKillTimeout)))
  512. time.AfterFunc(timeout, func() {
  513. log.Debug(fmt.Sprintf("killing conn %v after %v", c.LocalAddr(), timeout))
  514. c.Close()
  515. })
  516. }
  517. return c, err
  518. }