webpush-fcm-relay.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/base64"
  6. "encoding/binary"
  7. "flag"
  8. "fmt"
  9. "net/http"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "firebase.google.com/go/v4/messaging"
  14. "github.com/appleboy/go-fcm"
  15. uuid "github.com/satori/go.uuid"
  16. log "github.com/sirupsen/logrus"
  17. httptrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/net/http"
  18. dd_logrus "gopkg.in/DataDog/dd-trace-go.v1/contrib/sirupsen/logrus"
  19. "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
  20. )
  21. var (
  22. client *fcm.Client
  23. configListenAddr string
  24. configCredentialsFilePath string
  25. configMaxQueueSize int
  26. configMaxWorkers int
  27. messageChan chan *messaging.Message
  28. ctx context.Context
  29. )
  30. func main() {
  31. tracer.Start()
  32. defer tracer.Stop()
  33. mux := httptrace.NewServeMux()
  34. log.AddHook(&dd_logrus.DDContextLogHook{})
  35. flag.StringVar(&configListenAddr, "bind", "127.0.0.1:42069", "Bind address")
  36. flag.StringVar(&configCredentialsFilePath, "credentials-file-path", "", "Path to the Firebase credentials file")
  37. flag.IntVar(&configMaxQueueSize, "max-queue-size", 1024, "Maximum number of messages to queue")
  38. flag.IntVar(&configMaxWorkers, "max-workers", 4, "Maximum number of workers")
  39. flag.Parse()
  40. if configCredentialsFilePath == "" {
  41. log.Fatal("Firebase server key not provided")
  42. }
  43. var err error
  44. ctx = context.Background()
  45. client, err = fcm.NewClient(ctx, fcm.WithCredentialsFile(configCredentialsFilePath))
  46. if err != nil {
  47. log.Fatal(fmt.Sprintf("Error setting up FCM client: %s", err))
  48. }
  49. // create workers
  50. messageChan = make(chan *messaging.Message, configMaxQueueSize)
  51. for i := 1; i <= configMaxWorkers; i++ {
  52. go worker(i)
  53. }
  54. mux.HandleFunc("/relay-to/", handler)
  55. log.Info(fmt.Sprintf("Starting on %s...", configListenAddr))
  56. log.Fatal(http.ListenAndServe(configListenAddr, mux))
  57. }
  58. func nextRequestID() string {
  59. return uuid.NewV4().String()
  60. }
  61. func handler(writer http.ResponseWriter, request *http.Request) {
  62. span, sctx := tracer.StartSpanFromContext(ctx, "web.request", tracer.ResourceName(request.RequestURI))
  63. defer span.Finish()
  64. requestID := nextRequestID()
  65. requestLog := log.WithFields(log.Fields{"request-id": requestID}).WithContext(sctx)
  66. writer.Header().Set("X-Request-Id", requestID)
  67. components := strings.Split(request.URL.Path, "/")
  68. if len(components) < 4 {
  69. http.Error(writer, "Invalid URL path", http.StatusBadRequest)
  70. requestLog.Error(fmt.Sprintf("Invalid URL path: %s", request.URL.Path))
  71. return
  72. }
  73. if components[2] != "fcm" {
  74. http.Error(writer, "Invalid target environment", http.StatusBadRequest)
  75. requestLog.Error(fmt.Sprintf("Invalid target environment: %s", components[2]))
  76. return
  77. }
  78. deviceToken := components[3]
  79. if deviceToken == "" {
  80. http.Error(writer, "Missing device token", http.StatusBadRequest)
  81. requestLog.Error("Missing device token")
  82. return
  83. }
  84. buffer := new(bytes.Buffer)
  85. buffer.ReadFrom(request.Body)
  86. encodedString := encode85(buffer.Bytes())
  87. message := &messaging.Message{
  88. Token: deviceToken,
  89. Android: &messaging.AndroidConfig{},
  90. Data: map[string]string{
  91. "p": encodedString,
  92. },
  93. Notification: &messaging.Notification{
  94. Title: "🎺",
  95. },
  96. APNS: &messaging.APNSConfig{
  97. Payload: &messaging.APNSPayload{
  98. Aps: &messaging.Aps{
  99. ContentAvailable: true,
  100. MutableContent: true,
  101. },
  102. },
  103. },
  104. }
  105. if len(components) > 4 {
  106. message.Data["x"] = strings.Join(components[4:], "/")
  107. }
  108. switch request.Header.Get("Content-Encoding") {
  109. case "aesgcm":
  110. if publicKey, err := encodedValue(request.Header, "Crypto-Key", "dh"); err == nil {
  111. message.Data["k"] = publicKey
  112. } else {
  113. http.Error(writer, "Error retrieving public key", http.StatusBadRequest)
  114. requestLog.Error(fmt.Sprintf("Error retrieving public key: %s", err))
  115. return
  116. }
  117. if salt, err := encodedValue(request.Header, "Encryption", "salt"); err == nil {
  118. message.Data["s"] = salt
  119. } else {
  120. http.Error(writer, "Error retrieving salt", http.StatusBadRequest)
  121. requestLog.Error(fmt.Sprintf("Error retrieving salt: %s", err))
  122. return
  123. }
  124. case "aes128gcm":
  125. message.Data["rfc"] = "1"
  126. default:
  127. http.Error(writer, "Unsupported content encoding", http.StatusUnsupportedMediaType)
  128. requestLog.Error(fmt.Sprintf("Unsupported content encoding: %s", request.Header.Get("Content-Encoding")))
  129. return
  130. }
  131. if seconds := request.Header.Get("TTL"); seconds != "" {
  132. if ttl, err := strconv.Atoi(seconds); err == nil {
  133. timeToLive := time.Duration(ttl) * time.Second
  134. message.Android.TTL = &timeToLive
  135. }
  136. }
  137. if topic := request.Header.Get("Topic"); topic != "" {
  138. message.Android.CollapseKey = topic
  139. }
  140. switch request.Header.Get("Urgency") {
  141. case "very-low", "low":
  142. message.Android.Priority = "normal"
  143. default:
  144. message.Android.Priority = "high"
  145. }
  146. messageChan <- message
  147. writer.WriteHeader(201)
  148. requestLog.WithFields(log.Fields{
  149. "to": message.Token,
  150. "priority": message.Android.Priority,
  151. "ttl": message.Android.TTL,
  152. "collapse-key": message.Android.CollapseKey,
  153. }).Info("Queue success")
  154. }
  155. func worker(wid int) {
  156. log.Info(fmt.Sprintf("Starting worker %d", wid))
  157. for msg := range messageChan {
  158. resp, err := client.Send(ctx, msg)
  159. if err != nil {
  160. log.Error(fmt.Sprintf("error sending fcm message: %s", err.Error()))
  161. }
  162. if resp.FailureCount > 0 {
  163. for _, resp := range resp.Responses {
  164. if !resp.Success {
  165. log.Warn(fmt.Sprintf("message rejected (%s): %s", resp.MessageID, resp.Error))
  166. }
  167. }
  168. }
  169. }
  170. log.Info(fmt.Sprintf("Worker %d stopped", wid))
  171. }
  172. func encodedValue(header http.Header, name, key string) (string, error) {
  173. keyValues := parseKeyValues(header.Get(name))
  174. value, exists := keyValues[key]
  175. if !exists {
  176. return "", fmt.Errorf("value %s not found in header %s", key, name)
  177. }
  178. bytes, err := base64.RawURLEncoding.DecodeString(value)
  179. if err != nil {
  180. return "", err
  181. }
  182. return encode85(bytes), nil
  183. }
  184. func parseKeyValues(values string) map[string]string {
  185. f := func(c rune) bool {
  186. return c == ';'
  187. }
  188. entries := strings.FieldsFunc(values, f)
  189. m := make(map[string]string)
  190. for _, entry := range entries {
  191. parts := strings.Split(entry, "=")
  192. m[parts[0]] = parts[1]
  193. }
  194. return m
  195. }
  196. var z85digits = []byte("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#")
  197. func encode85(bytes []byte) string {
  198. numBlocks := len(bytes) / 4
  199. suffixLength := len(bytes) % 4
  200. encodedLength := numBlocks * 5
  201. if suffixLength != 0 {
  202. encodedLength += suffixLength + 1
  203. }
  204. encodedBytes := make([]byte, encodedLength)
  205. src := bytes
  206. dest := encodedBytes
  207. for block := 0; block < numBlocks; block++ {
  208. value := binary.BigEndian.Uint32(src)
  209. for i := 0; i < 5; i++ {
  210. dest[4-i] = z85digits[value%85]
  211. value /= 85
  212. }
  213. src = src[4:]
  214. dest = dest[5:]
  215. }
  216. if suffixLength != 0 {
  217. value := 0
  218. for i := 0; i < suffixLength; i++ {
  219. value *= 256
  220. value |= int(src[i])
  221. }
  222. for i := 0; i < suffixLength+1; i++ {
  223. dest[suffixLength-i] = z85digits[value%85]
  224. value /= 85
  225. }
  226. }
  227. return string(encodedBytes)
  228. }