convert.go 30 KB


  1. package traffic
  2. // todo(BAF10) direction (0|1) to const (TO|BACK)
  3. // fixme converting lines takes much time (graphs)
  4. import (
  5. "apiote.xyz/p/szczanieckiej/config"
  6. "apiote.xyz/p/szczanieckiej/file"
  7. "bufio"
  8. "encoding/csv"
  9. "errors"
  10. "fmt"
  11. "image/color"
  12. "io"
  13. "io/fs"
  14. "log"
  15. "net/http"
  16. "os"
  17. "path/filepath"
  18. "sort"
  19. "syscall"
  20. "time"
  21. gott2 "apiote.xyz/p/gott/v2"
  22. "git.sr.ht/~sircmpwn/go-bare"
  23. "notabug.org/apiote/gott"
  24. )
  25. type ErrEmpty struct{}
  26. func (ErrEmpty) Error() string {
  27. return ""
  28. }
  29. type result struct {
  30. config config.Config
  31. pid int
  32. tmpPath string
  33. feed Feed
  34. feedName string
  35. location *time.Location
  36. tmpFeedPath string
  37. homeFeedPath string
  38. downloadedVersions []Version
  39. allVersions []Version
  40. gtfsFilenames []string
  41. missingVersions []Version
  42. updatesFile *os.File
  43. updates map[string]string
  44. }
  45. type feedConverter struct {
  46. TmpFeedPath string
  47. GtfsFilename string
  48. Feed Feed
  49. HomeFeedPath string
  50. TrafficCalendarFile *os.File
  51. Departures map[string][]Departure
  52. TripsThroughStop map[string][]StopOrder
  53. LineNames map[string]string
  54. TripsOffsets map[string]uint
  55. TripChangeOpts map[string]ChangeOption
  56. StopsCodeIndex CodeIndex
  57. StopsNameIndex map[string][]uint
  58. Stops map[string]Stop
  59. LineGraphs map[string]map[uint]LineGraph
  60. LineIndex map[string][]uint
  61. }
  62. // helper functions
  63. func hex2colour(hex string) color.RGBA {
  64. if hex[0] == '#' {
  65. hex = hex[1:]
  66. }
  67. colour := color.RGBA{
  68. A: 0xff,
  69. }
  70. hexToByte := func(b byte) byte {
  71. switch {
  72. case b >= '0' && b <= '9':
  73. return b - '0'
  74. case b >= 'a' && b <= 'f':
  75. return b - 'a' + 10
  76. case b >= 'A' && b <= 'F':
  77. return b - 'A' + 10
  78. default:
  79. return 0
  80. }
  81. }
  82. colour.R = hexToByte(hex[0])<<4 + hexToByte(hex[1])
  83. colour.G = hexToByte(hex[2])<<4 + hexToByte(hex[3])
  84. colour.B = hexToByte(hex[4])<<4 + hexToByte(hex[5])
  85. return colour
  86. }
  87. func contains(array []Version, item Version) bool {
  88. for _, i := range array {
  89. if item.ValidFrom.Equal(i.ValidFrom) && i.ValidTill.Equal(i.ValidTill) {
  90. return true
  91. }
  92. }
  93. return false
  94. }
  95. // converting functions
  96. func createTmpPath(input ...interface{}) (interface{}, error) {
  97. args := input[0].(result)
  98. p := filepath.Join(args.tmpPath, args.feedName)
  99. err := os.MkdirAll(p, 0755)
  100. args.tmpFeedPath = p
  101. return gott.Tuple{args}, err
  102. }
  103. func createFeedHome(input ...interface{}) (interface{}, error) {
  104. args := input[0].(result)
  105. p := filepath.Join(args.config.FeedsPath, args.feedName)
  106. err := os.MkdirAll(p, 0755)
  107. args.homeFeedPath = p
  108. return gott.Tuple{args}, err
  109. }
  110. func listDownloadedVersions(input ...interface{}) (interface{}, error) {
  111. args := input[0].(result)
  112. v, err := ListVersions(args.config, args.feed)
  113. args.downloadedVersions = v
  114. return gott.Tuple{args}, err
  115. }
  116. func getAllVersions(input ...interface{}) (interface{}, error) {
  117. args := input[0].(result)
  118. v, err := args.feed.GetVersions(time.Now().In(args.location))
  119. args.allVersions = v
  120. return gott.Tuple{args}, err
  121. }
  122. func findValidVersions(input ...interface{}) interface{} {
  123. args := input[0].(result)
  124. dateValidVersions := []Version{}
  125. now := time.Now().In(args.location)
  126. for _, version := range args.allVersions {
  127. if version.ValidFrom.Before(now) && now.Before(version.ValidTill) {
  128. dateValidVersions = append(dateValidVersions, version)
  129. }
  130. }
  131. sort.Slice(dateValidVersions, func(i, j int) bool {
  132. return dateValidVersions[i].ValidFrom.Before(dateValidVersions[j].ValidFrom)
  133. })
  134. sort.Slice(args.downloadedVersions, func(i, j int) bool {
  135. return args.downloadedVersions[i].ValidFrom.Before(args.downloadedVersions[j].ValidFrom)
  136. })
  137. validVersions := FindValidVersions(dateValidVersions, now)
  138. missingVersions := []Version{}
  139. for _, version := range validVersions {
  140. if !contains(args.downloadedVersions, version) {
  141. missingVersions = append(missingVersions, version)
  142. }
  143. }
  144. args.missingVersions = missingVersions
  145. return gott.Tuple{args}
  146. }
  147. func getGtfsFiles(input ...interface{}) (interface{}, error) {
  148. args := input[0].(result)
  149. names := []string{}
  150. for _, version := range args.missingVersions {
  151. name := version.String() + ".zip"
  152. zipPath := filepath.Join(args.tmpFeedPath, name)
  153. url := version.Link
  154. response, err := http.Get(url)
  155. if err != nil {
  156. return gott.Tuple{args}, fmt.Errorf("while downloading gtfs %s %w\n", name, err)
  157. }
  158. file, err := os.Create(zipPath)
  159. if err != nil {
  160. return gott.Tuple{args}, fmt.Errorf("while downloading gtfs %s %w\n", name, err)
  161. }
  162. defer file.Close()
  163. _, err = io.Copy(file, response.Body)
  164. if err != nil {
  165. return gott.Tuple{args}, fmt.Errorf("while downloading gtfs %s %w\n", name, err)
  166. }
  167. names = append(names, name)
  168. }
  169. args.gtfsFilenames = names
  170. return gott.Tuple{args}, nil
  171. }
  172. func unzipGtfs(c feedConverter) error {
  173. return file.UnzipGtfs(c.TmpFeedPath, c.GtfsFilename)
  174. }
  175. func convertVehicles(c feedConverter) error {
  176. return c.Feed.ConvertVehicles(c.TmpFeedPath)
  177. }
  178. func createTrafficCalendarFile(c feedConverter) (feedConverter, error) {
  179. path := c.TmpFeedPath
  180. var err error
  181. c.TrafficCalendarFile, err = os.Create(filepath.Join(path, "calendar.bare"))
  182. return c, err
  183. }
  184. func recoverCalendar(c feedConverter, e error) (feedConverter, error) {
  185. var pathError *os.PathError
  186. if errors.As(e, &pathError) && errors.Is(pathError, fs.ErrNotExist) {
  187. return c, nil
  188. }
  189. return c, e
  190. }
  191. func convertCalendar(c feedConverter) (feedConverter, error) {
  192. path := c.TmpFeedPath
  193. resultFile := c.TrafficCalendarFile
  194. calendarFile, err := os.Open(filepath.Join(path, "calendar.txt"))
  195. if err != nil {
  196. return c, fmt.Errorf("while opening file: %w", err)
  197. }
  198. defer calendarFile.Close()
  199. r := csv.NewReader(bufio.NewReader(calendarFile))
  200. header, err := r.Read()
  201. if err != nil {
  202. return c, fmt.Errorf("while reading header: %w", err)
  203. }
  204. fields := map[string]int{}
  205. for i, headerField := range header {
  206. fields[headerField] = i
  207. }
  208. for {
  209. schedule := Schedule{}
  210. record, err := r.Read()
  211. if err == io.EOF {
  212. break
  213. }
  214. if err != nil {
  215. return c, fmt.Errorf("while reading a record: %w", err)
  216. }
  217. schedule.ScheduleID = record[fields["service_id"]]
  218. startDate := record[fields["start_date"]]
  219. endDate := record[fields["end_date"]]
  220. schedule.StartDate = startDate[:4] + "-" + startDate[4:6] + "-" + startDate[6:]
  221. schedule.EndDate = endDate[:4] + "-" + endDate[4:6] + "-" + endDate[6:]
  222. if record[fields["monday"]] == "1" {
  223. schedule.Weekdays |= (1 << 1)
  224. }
  225. if record[fields["tuesday"]] == "1" {
  226. schedule.Weekdays |= (1 << 2)
  227. }
  228. if record[fields["wednesday"]] == "1" {
  229. schedule.Weekdays |= (1 << 3)
  230. }
  231. if record[fields["thursday"]] == "1" {
  232. schedule.Weekdays |= (1 << 4)
  233. }
  234. if record[fields["friday"]] == "1" {
  235. schedule.Weekdays |= (1 << 5)
  236. }
  237. if record[fields["saturday"]] == "1" {
  238. schedule.Weekdays |= (1 << 6)
  239. }
  240. if record[fields["sunday"]] == "1" {
  241. schedule.Weekdays |= (1 << 0)
  242. schedule.Weekdays |= (1 << 7)
  243. }
  244. bytes, err := bare.Marshal(&schedule)
  245. if err != nil {
  246. return c, fmt.Errorf("while marshalling: %w", err)
  247. }
  248. _, err = resultFile.Write(bytes)
  249. if err != nil {
  250. return c, fmt.Errorf("while writing: %w", err)
  251. }
  252. }
  253. return c, nil
  254. }
  255. func convertCalendarDates(c feedConverter) (feedConverter, error) {
  256. path := c.TmpFeedPath
  257. resultFile := c.TrafficCalendarFile
  258. datesFile, err := os.Open(filepath.Join(path, "calendar_dates.txt"))
  259. if err != nil {
  260. return c, fmt.Errorf("while opening file: %w", err)
  261. }
  262. defer datesFile.Close()
  263. r := csv.NewReader(bufio.NewReader(datesFile))
  264. header, err := r.Read()
  265. if err != nil {
  266. return c, fmt.Errorf("while reading header: %w", err)
  267. }
  268. fields := map[string]int{}
  269. for i, headerField := range header {
  270. fields[headerField] = i
  271. }
  272. for {
  273. schedule := Schedule{}
  274. record, err := r.Read()
  275. if err == io.EOF {
  276. break
  277. }
  278. if err != nil {
  279. return c, fmt.Errorf("while reading a record: %w", err)
  280. }
  281. if record[fields["exception_type"]] == "1" {
  282. date := record[fields["date"]]
  283. schedule.ScheduleID = record[fields["service_id"]]
  284. schedule.StartDate = date[:4] + "-" + date[4:6] + "-" + date[6:]
  285. schedule.EndDate = date[:4] + "-" + date[4:6] + "-" + date[6:]
  286. schedule.Weekdays = 0xff
  287. } else {
  288. continue
  289. }
  290. bytes, err := bare.Marshal(&schedule)
  291. if err != nil {
  292. return c, fmt.Errorf("while marshalling: %w", err)
  293. }
  294. _, err = resultFile.Write(bytes)
  295. if err != nil {
  296. return c, fmt.Errorf("while writing: %w", err)
  297. }
  298. }
  299. return c, nil
  300. }
  301. func checkAnyCalendarConverted(c feedConverter) error {
  302. info, err := c.TrafficCalendarFile.Stat()
  303. if err == nil && info.Size() == 0 {
  304. return fmt.Errorf("no calendar converted")
  305. }
  306. return err
  307. }
  308. func closeTrafficCalendarFile(c feedConverter, e error) (feedConverter, error) {
  309. if c.TrafficCalendarFile != nil {
  310. c.TrafficCalendarFile.Close()
  311. }
  312. return c, e
  313. }
  314. func convertDepartures(c feedConverter) (feedConverter, error) { // O(n:stop_times) ; ( -- departures:map[tripID][]departure, tripsThroughStop:map[stopID][]{tripID,order} >> )
  315. path := c.TmpFeedPath
  316. file, err := os.Open(filepath.Join(path, "stop_times.txt"))
  317. if err != nil {
  318. return c, fmt.Errorf("while opening file: %w", err)
  319. }
  320. defer file.Close()
  321. departures := map[string][]Departure{}
  322. r := csv.NewReader(bufio.NewReader(file))
  323. header, err := r.Read()
  324. if err != nil {
  325. return c, fmt.Errorf("while reading header: %w", err)
  326. }
  327. fields := map[string]int{}
  328. for i, headerField := range header {
  329. fields[headerField] = i
  330. }
  331. tripsThroughStop := map[string][]StopOrder{}
  332. for {
  333. departure := Departure{}
  334. record, err := r.Read()
  335. if err == io.EOF {
  336. break
  337. }
  338. if err != nil {
  339. return c, fmt.Errorf("while reading a record: %w", err)
  340. }
  341. stopID := record[fields["stop_id"]]
  342. tripID := record[fields["trip_id"]]
  343. fmt.Sscanf(record[fields["stop_sequence"]], "%d", &departure.StopSeq)
  344. fmt.Sscanf(record[fields["pickup_type"]], "%d", &departure.Pickup)
  345. fmt.Sscanf(record[fields["drop_off_type"]], "%d", &departure.Dropoff)
  346. tripsThroughStop[stopID] = append(tripsThroughStop[stopID], StopOrder{
  347. TripID: tripID,
  348. Order: departure.StopSeq,
  349. })
  350. var hours, minutes uint
  351. fmt.Sscanf(record[fields["arrival_time"]], "%d:%d", &hours, &minutes)
  352. departure.Time = hours*60 + minutes
  353. departures[tripID] = append(departures[tripID], departure)
  354. }
  355. c.Departures = departures
  356. c.TripsThroughStop = tripsThroughStop
  357. return c, nil
  358. }
  359. func getLineNames(c feedConverter) (feedConverter, error) { // O(n:routes) ; ( -- lineNames:map[routeID]lineName >> )
  360. path := c.TmpFeedPath
  361. file, err := os.Open(filepath.Join(path, "routes.txt"))
  362. if err != nil {
  363. return c, fmt.Errorf("while opening file: %w", err)
  364. }
  365. defer file.Close()
  366. r := csv.NewReader(bufio.NewReader(file))
  367. header, err := r.Read()
  368. if err != nil {
  369. return c, fmt.Errorf("while reading header: %w", err)
  370. }
  371. fields := map[string]int{}
  372. for i, headerField := range header {
  373. fields[headerField] = i
  374. }
  375. names := map[string]string{}
  376. for {
  377. record, err := r.Read()
  378. if err == io.EOF {
  379. break
  380. }
  381. if err != nil {
  382. return c, fmt.Errorf("while reading a record: %w", err)
  383. }
  384. routeID := record[fields["route_id"]]
  385. names[routeID] = record[fields["route_short_name"]]
  386. }
  387. c.LineNames = names
  388. return c, nil
  389. }
  390. func convertTrips(c feedConverter) (feedConverter, error) { // O(n:trips) ; (departures, lineNames -- tripsOffsets:map[tripID]offset, tripsChangeOpts:map[tripID]{lineID,headsign} >> trips)
  391. path := c.TmpFeedPath
  392. departures := c.Departures
  393. lineNames := c.LineNames
  394. file, err := os.Open(filepath.Join(path, "trips.txt"))
  395. if err != nil {
  396. return c, fmt.Errorf("while opening file: %w", err)
  397. }
  398. defer file.Close()
  399. result, err := os.Create(filepath.Join(path, "trips.bare"))
  400. if err != nil {
  401. return c, fmt.Errorf("while creating file: %w", err)
  402. }
  403. defer result.Close()
  404. r := csv.NewReader(bufio.NewReader(file))
  405. header, err := r.Read()
  406. if err != nil {
  407. return c, fmt.Errorf("while reading header: %w", err)
  408. }
  409. fields := map[string]int{}
  410. for i, headerField := range header {
  411. fields[headerField] = i
  412. }
  413. var offset uint = 0
  414. tripsOffsets := map[string]uint{}
  415. tripChangeOpts := map[string]ChangeOption{}
  416. for {
  417. trip := Trip{}
  418. record, err := r.Read()
  419. if err == io.EOF {
  420. break
  421. }
  422. if err != nil {
  423. return c, fmt.Errorf("while reading a record: %w", err)
  424. }
  425. trip.ID = record[fields["trip_id"]]
  426. trip.Headsign = record[fields["trip_headsign"]]
  427. trip.Departures = departures[trip.ID]
  428. trip.ScheduleID = record[fields["service_id"]]
  429. trip.LineName = lineNames[record[fields["route_id"]]]
  430. fmt.Sscanf(record[fields["direction_id"]], "%d", &trip.Direction)
  431. tripChangeOpts[trip.ID] = ChangeOption{
  432. LineID: record[fields["route_id"]],
  433. Headsign: trip.Headsign,
  434. }
  435. bytes, err := bare.Marshal(&trip)
  436. if err != nil {
  437. return c, fmt.Errorf("while marshalling: %w", err)
  438. }
  439. b, err := result.Write(bytes)
  440. if err != nil {
  441. return c, fmt.Errorf("while writing: %w", err)
  442. }
  443. tripsOffsets[trip.ID] = offset
  444. offset += uint(b)
  445. }
  446. c.TripsOffsets = tripsOffsets
  447. c.TripChangeOpts = tripChangeOpts
  448. return c, nil
  449. }
  450. func convertStops(c feedConverter) (feedConverter, error) { // O(n:stops) ; (tripsThroughStop, lineNames, tripChangeOpts, tripOffsets -- stopsOffsetsByCode:CodeIndex, stopsOffsetsByName:map[name][]offsets >> stops)
  451. path := c.TmpFeedPath
  452. tripsThroughStop := c.TripsThroughStop
  453. lineNames := c.LineNames
  454. tripChangeOpts := c.TripChangeOpts
  455. tripsOffsets := c.TripsOffsets
  456. file, err := os.Open(filepath.Join(path, "stops.txt"))
  457. if err != nil {
  458. return c, fmt.Errorf("while opening file: %w", err)
  459. }
  460. defer file.Close()
  461. result, err := os.Create(filepath.Join(path, "stops.bare"))
  462. if err != nil {
  463. return c, fmt.Errorf("while creating file: %w", err)
  464. }
  465. defer result.Close()
  466. r := csv.NewReader(bufio.NewReader(file))
  467. header, err := r.Read()
  468. if err != nil {
  469. return c, fmt.Errorf("while reading header: %w", err)
  470. }
  471. fields := map[string]int{}
  472. for i, headerField := range header {
  473. fields[headerField] = i
  474. }
  475. var offset uint = 0
  476. stopsOffsetsByName := map[string][]uint{}
  477. stopsOffsetsByCode := CodeIndex{}
  478. stops := map[string]Stop{}
  479. for {
  480. stop := Stop{}
  481. record, err := r.Read()
  482. if err == io.EOF {
  483. break
  484. }
  485. if err != nil {
  486. return c, fmt.Errorf("while reading a record: %w", err)
  487. }
  488. stopID := record[fields["stop_id"]]
  489. stopTrips := tripsThroughStop[stopID]
  490. stop.ID = stopID
  491. stop.Code = record[fields["stop_code"]]
  492. if stop.Code == "" {
  493. stop.Code = stopID
  494. }
  495. stop.Name = record[fields["stop_name"]]
  496. stop.Zone = record[fields["zone_id"]]
  497. stops[record[fields["stop_id"]]] = stop
  498. if field, ok := fields["stop_timezone"]; ok {
  499. stop.Timezone = record[field]
  500. }
  501. var lat, lon float64
  502. fmt.Sscanf(record[fields["stop_lat"]], "%f", &lat)
  503. fmt.Sscanf(record[fields["stop_lon"]], "%f", &lon)
  504. stop.Coordinates = Position{lat, lon}
  505. changeOptionMap := map[string]ChangeOption{}
  506. stop.ChangeOptions = []ChangeOption{}
  507. stop.Order = []StopOrder{}
  508. for _, stopTrip := range stopTrips {
  509. changeOption := ChangeOption{
  510. LineName: lineNames[tripChangeOpts[stopTrip.TripID].LineID],
  511. Headsign: tripChangeOpts[stopTrip.TripID].Headsign,
  512. }
  513. stopOrder := StopOrder{
  514. TripOffset: tripsOffsets[stopTrip.TripID],
  515. Order: stopTrip.Order,
  516. }
  517. stop.Order = append(stop.Order, stopOrder)
  518. changeOptionMap[changeOption.LineName+"->"+changeOption.Headsign] = changeOption
  519. }
  520. for _, option := range changeOptionMap {
  521. stop.ChangeOptions = append(stop.ChangeOptions, option)
  522. }
  523. sort.Slice(stop.ChangeOptions, func(i, j int) bool {
  524. var num1, num2 int
  525. _, err1 := fmt.Sscanf(stop.ChangeOptions[i].LineName, "%d", &num1)
  526. _, err2 := fmt.Sscanf(stop.ChangeOptions[j].LineName, "%d", &num2)
  527. if err1 != nil && err2 != nil {
  528. return stop.ChangeOptions[i].LineName < stop.ChangeOptions[j].LineName
  529. } else if err1 != nil {
  530. return false
  531. } else if err2 != nil {
  532. return true
  533. } else {
  534. return num1 < num2
  535. }
  536. })
  537. bytes, err := bare.Marshal(&stop)
  538. if err != nil {
  539. return c, fmt.Errorf("while marshalling: %w", err)
  540. }
  541. b, err := result.Write(bytes)
  542. if err != nil {
  543. return c, fmt.Errorf("while writing: %w", err)
  544. }
  545. stopsOffsetsByName[stop.Name] = append(stopsOffsetsByName[stop.Name], offset)
  546. stopsOffsetsByCode[ID(stop.Code)] = offset
  547. offset += uint(b)
  548. }
  549. c.StopsCodeIndex = stopsOffsetsByCode
  550. c.StopsNameIndex = stopsOffsetsByName
  551. c.Stops = stops
  552. return c, nil
  553. }
  554. func convertLineGraphs(c feedConverter) (feedConverter, error) { // O(n:stop_times) ; (tripsOffsets, stops -- lineGrapsh:map[lineName]map[direction]graph >> )
  555. path := c.TmpFeedPath
  556. tripsOffsets := c.TripsOffsets
  557. stops := c.Stops
  558. file, err := os.Open(filepath.Join(path, "stop_times.txt"))
  559. if err != nil {
  560. return c, fmt.Errorf("while opening stop_times: %w", err)
  561. }
  562. defer file.Close()
  563. trips, err := os.Open(filepath.Join(path, "trips.bare"))
  564. if err != nil {
  565. return c, fmt.Errorf("while opening trips: %w", err)
  566. }
  567. defer trips.Close()
  568. r := csv.NewReader(bufio.NewReader(file))
  569. header, err := r.Read()
  570. if err != nil {
  571. return c, fmt.Errorf("while reading header: %w", err)
  572. }
  573. fields := map[string]int{}
  574. for i, headerField := range header {
  575. fields[headerField] = i
  576. }
  577. // lineNa dire
  578. graphs := map[string]map[uint]LineGraph{}
  579. previousTripID := ""
  580. previous := -1
  581. previousTrip := Trip{}
  582. for {
  583. record, err := r.Read()
  584. if err == io.EOF {
  585. break
  586. }
  587. if err != nil {
  588. return c, fmt.Errorf("while reading a record: %w", err)
  589. }
  590. tripID := record[fields["trip_id"]]
  591. stop := stops[record[fields["stop_id"]]]
  592. tripOffset := tripsOffsets[tripID]
  593. _, err = trips.Seek(int64(tripOffset), 0)
  594. if err != nil {
  595. return c, fmt.Errorf("while seeking: %w", err)
  596. }
  597. trip := unmarshalTripFromFile(trips)
  598. if _, ok := graphs[trip.LineName]; !ok {
  599. graphs[trip.LineName] = map[uint]LineGraph{}
  600. graphs[trip.LineName][0] = LineGraph{
  601. NextNodes: map[int][]int{},
  602. PrevNodes: map[int][]int{},
  603. }
  604. graphs[trip.LineName][1] = LineGraph{
  605. NextNodes: map[int][]int{},
  606. PrevNodes: map[int][]int{},
  607. }
  608. }
  609. if previousTripID != tripID {
  610. if previousTripID != "" {
  611. // last of previous trip
  612. previousGraph := graphs[previousTrip.LineName][previousTrip.Direction]
  613. connectionDone := false
  614. for _, n := range previousGraph.NextNodes[previous] {
  615. if n == -1 {
  616. connectionDone = true
  617. }
  618. }
  619. if !connectionDone {
  620. previousGraph.NextNodes[previous] = append(previousGraph.NextNodes[previous], -1)
  621. }
  622. graphs[previousTrip.LineName][previousTrip.Direction] = previousGraph
  623. }
  624. }
  625. graph := graphs[trip.LineName][trip.Direction]
  626. current := -1
  627. for i, code := range graph.StopCodes {
  628. if code == stop.Code {
  629. current = i
  630. break
  631. }
  632. }
  633. if current == -1 {
  634. current = len(graph.StopCodes)
  635. graph.StopCodes = append(graph.StopCodes, stop.Code)
  636. }
  637. if previousTripID != tripID {
  638. // first of current trip
  639. connectionDone := false
  640. for _, n := range graph.PrevNodes[current] {
  641. if n == -1 {
  642. connectionDone = true
  643. }
  644. }
  645. if !connectionDone {
  646. graph.PrevNodes[current] = append(graph.PrevNodes[current], -1)
  647. }
  648. } else {
  649. // second <- first to last <- penultimate of current trip
  650. connectionDone := false
  651. for _, n := range graph.NextNodes[previous] {
  652. if n == current {
  653. connectionDone = true
  654. }
  655. }
  656. if !connectionDone {
  657. graph.NextNodes[previous] = append(graph.NextNodes[previous], current)
  658. graph.PrevNodes[current] = append(graph.PrevNodes[current], previous)
  659. }
  660. }
  661. previous = current
  662. previousTripID = tripID
  663. previousTrip = trip
  664. graphs[trip.LineName][trip.Direction] = graph
  665. }
  666. g := graphs[previousTrip.LineName][previousTrip.Direction]
  667. connectionDone := false
  668. for _, n := range g.NextNodes[previous] {
  669. if n == -1 {
  670. connectionDone = true
  671. }
  672. }
  673. if !connectionDone {
  674. g.NextNodes[previous] = append(g.NextNodes[previous], -1)
  675. }
  676. c.LineGraphs = graphs
  677. return c, nil
  678. }
  679. func convertLines(c feedConverter) (feedConverter, error) { // O(n:routes) ; (lineGraphs -- lineIndex:map[lineName][]offsets >> lines)
  680. path := c.TmpFeedPath
  681. feed := c.Feed
  682. file, err := os.Open(filepath.Join(path, "routes.txt"))
  683. if err != nil {
  684. return c, fmt.Errorf("while opening file: %w", err)
  685. }
  686. defer file.Close()
  687. result, err := os.Create(filepath.Join(path, "lines.bare"))
  688. if err != nil {
  689. return c, fmt.Errorf("while creating file: %w", err)
  690. }
  691. defer result.Close()
  692. r := csv.NewReader(bufio.NewReader(file))
  693. header, err := r.Read()
  694. if err != nil {
  695. return c, fmt.Errorf("while reading header: %w", err)
  696. }
  697. fields := map[string]int{}
  698. for i, headerField := range header {
  699. fields[headerField] = i
  700. }
  701. names := map[string]string{}
  702. var offset uint = 0
  703. index := map[string][]uint{}
  704. for {
  705. record, err := r.Read()
  706. if err == io.EOF {
  707. break
  708. }
  709. if err != nil {
  710. return c, fmt.Errorf("while reading a record: %w", err)
  711. }
  712. routeID := record[fields["route_id"]]
  713. names[routeID] = record[fields["route_short_name"]]
  714. lineName := record[fields["route_short_name"]]
  715. var kind uint
  716. fmt.Sscanf(record[fields["route_type"]], "%d", &kind)
  717. var (
  718. colour string = record[fields["route_color"]]
  719. )
  720. if colour == "" {
  721. colour = "ffffff"
  722. }
  723. line := Line{
  724. ID: routeID,
  725. Name: lineName,
  726. Colour: hex2colour(colour),
  727. Type: kind,
  728. GraphThere: c.LineGraphs[lineName][0],
  729. GraphBack: c.LineGraphs[lineName][1],
  730. }
  731. if field, present := fields["agency_id"]; present {
  732. line.AgencyID = ID(record[field])
  733. }
  734. bytes, err := bare.Marshal(&line)
  735. if err != nil {
  736. return c, fmt.Errorf("while marshalling: %w", err)
  737. }
  738. b, err := result.Write(bytes)
  739. if err != nil {
  740. return c, fmt.Errorf("while writing: %w", err)
  741. }
  742. cleanQuery, err := CleanQuery(line.Name, feed)
  743. if err != nil {
  744. return c, fmt.Errorf("while cleaning line name: %w", err)
  745. }
  746. index[cleanQuery] = []uint{offset}
  747. offset += uint(b)
  748. }
  749. c.LineIndex = index
  750. return c, nil
  751. }
  752. func convertFeedInfo(c feedConverter) error { // O(1:feed_info) ; ( -- >> feed_info)
  753. path := c.TmpFeedPath
  754. result, err := os.Create(filepath.Join(path, "feed_info.bare"))
  755. if err != nil {
  756. return fmt.Errorf("while creating file: %w", err)
  757. }
  758. defer result.Close()
  759. file, err := os.Open(filepath.Join(path, "feed_info.txt"))
  760. if err != nil {
  761. if errors.Is(err, fs.ErrNotExist) {
  762. log.Println("[WARN] no feed_info.txt")
  763. return nil
  764. }
  765. return fmt.Errorf("while opening file: %w", err)
  766. }
  767. defer file.Close()
  768. r := csv.NewReader(bufio.NewReader(file))
  769. header, err := r.Read()
  770. if err != nil {
  771. return fmt.Errorf("while reading header: %w", err)
  772. }
  773. fields := map[string]int{}
  774. for i, headerField := range header {
  775. fields[headerField] = i
  776. }
  777. feedInfo := FeedInfo{}
  778. record, err := r.Read()
  779. if err != nil {
  780. return fmt.Errorf("while reading a record: %w", err)
  781. }
  782. feedInfo.Name = record[fields["feed_publisher_name"]]
  783. feedInfo.Website = record[fields["feed_publisher_url"]]
  784. feedInfo.Language = record[fields["feed_lang"]]
  785. bytes, err := bare.Marshal(&feedInfo)
  786. if err != nil {
  787. return fmt.Errorf("while marshalling: %w", err)
  788. }
  789. _, err = result.Write(bytes)
  790. if err != nil {
  791. return fmt.Errorf("while writing: %w", err)
  792. }
  793. return nil
  794. }
  795. func convertAgencies(c feedConverter) error { // O(n:agency) ; ( -- >> agencies)
  796. path := c.TmpFeedPath
  797. file, err := os.Open(filepath.Join(path, "agency.txt"))
  798. if err != nil {
  799. return fmt.Errorf("while opening file: %w", err)
  800. }
  801. defer file.Close()
  802. result, err := os.Create(filepath.Join(path, "agencies.bare"))
  803. if err != nil {
  804. return fmt.Errorf("while creating file: %w", err)
  805. }
  806. defer file.Close()
  807. r := csv.NewReader(bufio.NewReader(file))
  808. header, err := r.Read()
  809. if err != nil {
  810. return fmt.Errorf("while reading header: %w", err)
  811. }
  812. fields := map[string]int{}
  813. for i, headerField := range header {
  814. fields[headerField] = i
  815. }
  816. for {
  817. record, err := r.Read()
  818. if err == io.EOF {
  819. break
  820. }
  821. if err != nil {
  822. return fmt.Errorf("while reading a record: %w", err)
  823. }
  824. agency := Agency{
  825. ID: ID(record[fields["agency_id"]]),
  826. Name: record[fields["agency_name"]],
  827. Website: record[fields["agency_url"]],
  828. Timezone: record[fields["agency_timezone"]],
  829. }
  830. if field, present := fields["agency_lang"]; present {
  831. agency.Language = record[field]
  832. }
  833. if field, present := fields["agency_phone"]; present {
  834. agency.PhoneNumber = record[field]
  835. }
  836. if field, present := fields["agency_fare_url"]; present {
  837. agency.FareWebsite = record[field]
  838. }
  839. if field, present := fields["agency_email"]; present {
  840. agency.Email = record[field]
  841. }
  842. bytes, err := bare.Marshal(&agency)
  843. if err != nil {
  844. return fmt.Errorf("while marshalling: %w", err)
  845. }
  846. _, err = result.Write(bytes)
  847. if err != nil {
  848. return fmt.Errorf("while writing: %w", err)
  849. }
  850. }
  851. return nil
  852. }
  853. func writeNameIndex(c feedConverter, index map[string][]uint, filename string, raw bool) error {
  854. path := c.TmpFeedPath
  855. feed := c.Feed
  856. result, err := os.Create(filepath.Join(path, filename))
  857. if err != nil {
  858. return fmt.Errorf("while creating file: %w", err)
  859. }
  860. defer result.Close()
  861. for name, offsets := range index {
  862. cleanQuery := name
  863. if !raw {
  864. cleanQuery, err = CleanQuery(name, feed)
  865. if err != nil {
  866. return fmt.Errorf("while cleaning name %s: %w", name, err)
  867. }
  868. }
  869. stopOffset := NameOffset{
  870. Name: cleanQuery,
  871. Offsets: offsets,
  872. }
  873. bytes, err := bare.Marshal(&stopOffset)
  874. if err != nil {
  875. return fmt.Errorf("while marshalling: %w", err)
  876. }
  877. _, err = result.Write(bytes)
  878. if err != nil {
  879. return fmt.Errorf("while writing: %w", err)
  880. }
  881. }
  882. return nil
  883. }
  884. func writeStopNameIndex(c feedConverter) error {
  885. return writeNameIndex(c, c.StopsNameIndex, "ix_stop_names.bare", false)
  886. }
  887. func writeLineIndex(c feedConverter) error {
  888. return writeNameIndex(c, c.LineIndex, "ix_lines.bare", false)
  889. }
  890. func writeTripIndex(c feedConverter) error {
  891. tripIndex := map[string][]uint{}
  892. for trip, offset := range c.TripsOffsets {
  893. tripIndex[trip] = []uint{offset}
  894. }
  895. return writeNameIndex(c, tripIndex, "ix_trips.bare", true)
  896. }
  897. func writeStopCodeIndex(c feedConverter) error {
  898. path := c.TmpFeedPath
  899. stopsOffsetsByCode := c.StopsCodeIndex
  900. result, err := os.Create(filepath.Join(path, "ix_stop_codes.bare"))
  901. if err != nil {
  902. return fmt.Errorf("while creating file: %w", err)
  903. }
  904. defer result.Close()
  905. bytes, err := bare.Marshal(&stopsOffsetsByCode)
  906. if err != nil {
  907. return fmt.Errorf("while marshalling: %w", err)
  908. }
  909. _, err = result.Write(bytes)
  910. if err != nil {
  911. return fmt.Errorf("while writing: %w", err)
  912. }
  913. return nil
  914. }
  915. func deleteTxtFiles(c feedConverter) error {
  916. return file.DeleteTxtFiles(c.TmpFeedPath, c.GtfsFilename)
  917. }
  918. func compressTraffic(c feedConverter) error {
  919. return file.CompressBare(c.TmpFeedPath, c.GtfsFilename)
  920. }
  921. func deleteBareFiles(c feedConverter) error {
  922. return file.DeleteBareFiles(c.TmpFeedPath)
  923. }
  924. func moveTraffic(c feedConverter) error {
  925. // todo(BAF25) filename take from calendar
  926. return file.MoveTraffic(c.GtfsFilename, c.TmpFeedPath, c.HomeFeedPath)
  927. }
  928. func convert(input ...interface{}) (interface{}, error) {
  929. args := input[0].(result)
  930. for _, gtfsFile := range args.gtfsFilenames {
  931. r := gott2.R[feedConverter]{
  932. S: feedConverter{
  933. TmpFeedPath: args.tmpFeedPath,
  934. GtfsFilename: gtfsFile,
  935. Feed: args.feed,
  936. HomeFeedPath: args.homeFeedPath,
  937. },
  938. LogLevel: gott2.Debug,
  939. }
  940. r = r.
  941. Tee(unzipGtfs).
  942. Tee(convertVehicles).
  943. Bind(createTrafficCalendarFile).
  944. Bind(convertCalendar).
  945. Recover(recoverCalendar).
  946. Bind(convertCalendarDates).
  947. Recover(recoverCalendar).
  948. Tee(checkAnyCalendarConverted).
  949. Recover(closeTrafficCalendarFile).
  950. Bind(convertDepartures).
  951. Bind(getLineNames).
  952. Bind(convertTrips).
  953. Bind(convertStops).
  954. Bind(convertLineGraphs).
  955. Bind(convertLines).
  956. Tee(convertFeedInfo).
  957. Tee(convertAgencies).
  958. Tee(writeStopNameIndex).
  959. Tee(writeLineIndex).
  960. Tee(writeStopCodeIndex).
  961. Tee(writeTripIndex).
  962. Tee(deleteTxtFiles).
  963. Tee(compressTraffic).
  964. Tee(deleteBareFiles).
  965. Tee(moveTraffic)
  966. if r.E != nil {
  967. return gott.Tuple{args}, r.E
  968. }
  969. }
  970. return gott.Tuple{args}, nil
  971. }
  972. func signal(input ...interface{}) (interface{}, error) {
  973. args := input[0].(result)
  974. if len(args.gtfsFilenames) > 0 && args.pid > 0 {
  975. process, err := os.FindProcess(args.pid)
  976. if err != nil {
  977. return gott.Tuple{args}, err
  978. }
  979. err = process.Signal(syscall.SIGUSR1)
  980. if err != nil {
  981. return gott.Tuple{args}, err
  982. }
  983. }
  984. return gott.Tuple{args}, nil
  985. }
  986. func openLastUpdated(input ...interface{}) (interface{}, error) {
  987. args := input[0].(result)
  988. updatesFilename := filepath.Join(args.config.FeedsPath, "updated.bare")
  989. var err error
  990. args.updatesFile, err = os.OpenFile(updatesFilename, os.O_RDWR|os.O_CREATE, 0644)
  991. return gott.Tuple{args}, err
  992. }
  993. func isEmpty(input ...interface{}) error {
  994. args := input[0].(result)
  995. stat, err := os.Stat(args.updatesFile.Name())
  996. if err != nil {
  997. return err
  998. }
  999. if stat.Size() == 0 {
  1000. return ErrEmpty{}
  1001. }
  1002. return nil
  1003. }
  1004. func unmarshalLastUpdated(input ...interface{}) (interface{}, error) {
  1005. args := input[0].(result)
  1006. var lastUpdated map[string]string
  1007. err := bare.UnmarshalReader(args.updatesFile, &lastUpdated)
  1008. args.updates = lastUpdated
  1009. return gott.Tuple{args}, err
  1010. }
  1011. func recoverEmpty(input ...interface{}) (interface{}, error) {
  1012. args := input[0].(result)
  1013. err := input[1].(error)
  1014. var emptyError ErrEmpty
  1015. if errors.As(err, &emptyError) {
  1016. return gott.Tuple{args}, nil
  1017. } else {
  1018. return gott.Tuple{args}, err
  1019. }
  1020. }
  1021. func lastUpdated(input ...interface{}) interface{} {
  1022. args := input[0].(result)
  1023. args.updates[args.feed.String()] = time.Now().Format(time.RFC3339)
  1024. return gott.Tuple{args}
  1025. }
  1026. func marshalLastUpdated(input ...interface{}) error {
  1027. args := input[0].(result)
  1028. err := bare.MarshalWriter(bare.NewWriter(args.updatesFile), &args.updates)
  1029. return err
  1030. }
  1031. func Prepare(cfg config.Config, t Traffic, bimbaPid int) error { // todo(BAF18) remove pid
  1032. for _, feed := range t.Feeds {
  1033. r := gott.Tuple{result{
  1034. config: cfg,
  1035. pid: bimbaPid,
  1036. tmpPath: os.TempDir(),
  1037. feed: feed,
  1038. feedName: feed.String(),
  1039. location: feed.GetLocation(),
  1040. updates: map[string]string{},
  1041. }}
  1042. _, err := gott.NewResult(r).
  1043. SetLevelLog(gott.Debug).
  1044. Bind(createTmpPath).
  1045. Bind(createFeedHome).
  1046. Bind(listDownloadedVersions).
  1047. Bind(getAllVersions).
  1048. Map(findValidVersions).
  1049. Bind(getGtfsFiles).
  1050. Bind(convert).
  1051. Bind(signal).
  1052. Bind(openLastUpdated).
  1053. Tee(isEmpty).
  1054. Bind(unmarshalLastUpdated).
  1055. Recover(recoverEmpty).
  1056. Map(lastUpdated).
  1057. Tee(marshalLastUpdated).
  1058. Finish()
  1059. if err != nil {
  1060. log.Printf("Error converting %s: %v\n", feed.String(), err)
  1061. }
  1062. }
  1063. return nil
  1064. }