ax.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package ax
import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"time"

	"github.com/dchest/uniuri"
	"github.com/gorilla/mux"
	"github.com/gorilla/websocket"
)
  12. type Config struct {
  13. Port int
  14. ConnectionTimeout int
  15. }
  16. type Router struct {
  17. mux.Router
  18. }
  19. // Structure `Client` defines client contiguous connection.
  20. type Client struct {
  21. // The WebSocket connection
  22. ws *websocket.Conn
  23. // Bufered channel of outbound messages
  24. send chan []byte
  25. // Connection ID
  26. cid string
  27. t int64
  28. // User context (available for client code)
  29. Context map[string] interface{}
  30. }
  31. const (
  32. // Time allowed to write a message to the peer.
  33. writeWait = 10 * time.Second
  34. // Time allowed to read the next pong message from the peer.
  35. pongWait = 60 * time.Second
  36. // Send pings to peer with this period. Must be less than pongWait.
  37. pingPeriod = (pongWait * 9) / 10
  38. // Maximum message size allowed from peer.
  39. maxMessageSize = 512 * 1024
  40. purgePeriod = 120 * time.Second
  41. )
  42. var (
  43. config Config
  44. upgrader = websocket.Upgrader {
  45. ReadBufferSize: 1024,
  46. WriteBufferSize: 1024,
  47. }
  48. onenter func(*Client, *http.Request)
  49. onleave func(*Client)
  50. onping func(*Client)
  51. )
  52. const cidCookieName = "__cid__"
  53. func (c *Client) Cid() string {
  54. return c.cid
  55. }
  56. func genConnId() string {
  57. return uniuri.NewLen(20)
  58. }
  59. func getCurrentCid(r *http.Request) (string, error) {
  60. cookie, err := r.Cookie(cidCookieName)
  61. if err != nil {
  62. return "", err
  63. }
  64. return cookie.Value, nil
  65. }
  66. func makeCookie(cid string) *http.Cookie {
  67. expire := time.Now().Add(
  68. time.Duration(config.ConnectionTimeout) * time.Second)
  69. cookie := &http.Cookie {
  70. Name: cidCookieName,
  71. Value: cid,
  72. Path: "/",
  73. Expires: expire,
  74. }
  75. return cookie
  76. }
  77. func makeInitScript(cid string, port int, connectionTimeout int) string {
  78. return fmt.Sprintf("var __state = {cid:'%s',conn_timeout:%d,port:%d};\n",
  79. cid, connectionTimeout, port)
  80. }
  81. func axInitHandler(w http.ResponseWriter, r *http.Request) {
  82. cid, err := getCurrentCid(r)
  83. if err != nil {
  84. cid = genConnId()
  85. http.SetCookie(w, makeCookie(cid))
  86. }
  87. w.Header().Set("Content-Type", "text/javascript")
  88. script := makeInitScript(cid, config.Port, config.ConnectionTimeout)
  89. fmt.Fprintf(w, "%s", script)
  90. }
  91. func axStaticHandler(w http.ResponseWriter, r *http.Request) {
  92. w.Header().Set("Content-Type", "text/javascript")
  93. http.ServeFile(w, r, "./ax/ax.js");
  94. }
  95. func (c *Client) write(msgtype int, data []byte) error {
  96. c.ws.SetWriteDeadline(time.Now().Add(writeWait))
  97. return c.ws.WriteMessage(msgtype, data)
  98. }
  99. func sendLoop(c *Client) {
  100. ticker := time.NewTicker(pingPeriod)
  101. defer func() {
  102. ticker.Stop()
  103. c.ws.Close()
  104. }()
  105. for {
  106. select {
  107. case data, ok := <-c.send:
  108. if !ok {
  109. c.write(websocket.CloseMessage, []byte{})
  110. return
  111. }
  112. if err := c.write(websocket.TextMessage, data);
  113. err != nil {
  114. log.Printf("[%v]ws.write TextMessage error %+v\n", c.cid, err)
  115. return
  116. }
  117. case <-ticker.C:
  118. if err := c.write(websocket.PingMessage, []byte{});
  119. err != nil {
  120. log.Printf("[%v]ws.write PingMessage error %+v\n", c.cid, err)
  121. return
  122. }
  123. if onping != nil {
  124. onping(c)
  125. }
  126. // Refresh cookie's "expires" property to avoid cookie invalidation
  127. if time.Now().Unix() - c.t > int64(config.ConnectionTimeout / 2) {
  128. c.t = time.Now().Unix()
  129. c.setCidCookie()
  130. }
  131. }
  132. }
  133. }
  134. func recvLoop(c *Client) {
  135. defer func() {
  136. if onleave != nil {
  137. onleave(c)
  138. }
  139. close(c.send)
  140. }()
  141. c.ws.SetReadLimit(maxMessageSize)
  142. c.ws.SetReadDeadline(time.Now().Add(pongWait))
  143. c.ws.SetPongHandler(func(string) error {
  144. c.ws.SetReadDeadline(time.Now().Add(pongWait))
  145. return nil
  146. })
  147. for {
  148. _, data, err := c.ws.ReadMessage()
  149. if err != nil && err != io.EOF {
  150. log.Printf("[%v]ws.ReadMessage error %+v\n", c.cid, err)
  151. }
  152. if err != nil {
  153. break
  154. }
  155. onRecv(c, data)
  156. }
  157. }
  158. func axWebsocketHandler(w http.ResponseWriter, r *http.Request) {
  159. conn, err := upgrader.Upgrade(w, r, nil)
  160. if err != nil {
  161. log.Printf("WS upgrade error %+v\n", err)
  162. return
  163. }
  164. cid, err := getCurrentCid(r)
  165. if err != nil {
  166. cid = genConnId()
  167. log.Printf("ERROR no CID cookie in websocket handler\n")
  168. log.Printf("ERROR context will not be preserved on " +
  169. "page reload\n")
  170. }
  171. c := &Client {
  172. ws: conn,
  173. send: make(chan []byte, 256),
  174. cid: cid,
  175. t: time.Now().Unix(),
  176. Context: make(map[string]interface{}),
  177. }
  178. c.setCidCookie()
  179. if onenter != nil {
  180. onenter(c, r)
  181. }
  182. go sendLoop(c)
  183. recvLoop(c)
  184. }
  185. func Setup(c *Config) *Router {
  186. config = *c
  187. // Initialize routing
  188. r := mux.NewRouter()
  189. http.HandleFunc("/__ax_init.js", axInitHandler)
  190. http.HandleFunc("/__ax.js", axStaticHandler)
  191. http.HandleFunc("/__ws", axWebsocketHandler)
  192. r.PathPrefix("/static").Handler(http.FileServer(http.Dir(".")))
  193. return &Router{*r}
  194. }
  195. func OnEnter(handler func(c *Client, r *http.Request)) {
  196. onenter = handler
  197. }
  198. func OnLeave(handler func(c *Client)) {
  199. onleave = handler
  200. }
  201. func OnPing(handler func(c * Client)) {
  202. onping = handler
  203. }
  204. func (c *Client) setCidCookie() {
  205. c.Send([]byte(`{"type": "__ax_set_cookie", "data": {}}`))
  206. }
  207. func (c *Client) Send(data []byte) {
  208. c.send <- data
  209. }