access.go 34 KB


  1. package traffic
  2. import (
  3. "apiote.xyz/p/szczanieckiej/config"
  4. "apiote.xyz/p/szczanieckiej/file"
  5. "apiote.xyz/p/szczanieckiej/gtfs_rt"
  6. traffic_errors "apiote.xyz/p/szczanieckiej/traffic/errors"
  7. "apiote.xyz/p/szczanieckiej/transformers"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "log"
  12. "net"
  13. "os"
  14. "path/filepath"
  15. "sort"
  16. "strconv"
  17. "strings"
  18. "time"
  19. "golang.org/x/text/language"
  20. "golang.org/x/text/runes"
  21. "golang.org/x/text/transform"
  22. "git.sr.ht/~sircmpwn/go-bare"
  23. "github.com/dhconnelly/rtreego"
  24. "github.com/sahilm/fuzzy"
  25. "notabug.org/apiote/gott"
  26. )
  27. type OlcError struct {
  28. Value string
  29. Err error
  30. }
  31. func (e OlcError) Error() string {
  32. return e.Err.Error()
  33. }
  34. type _Result struct {
  35. Filename string
  36. Offset uint
  37. Date time.Time
  38. LineName string
  39. TimetableHome string
  40. Calendar []Schedule
  41. DeparturesType DeparturesType
  42. Vehicles Vehicles
  43. Feed Feed
  44. Location *time.Location
  45. Datetime time.Time
  46. MinuteB4Datetime time.Time
  47. Midnight time.Time
  48. TodaySchedule map[string]struct{}
  49. YesterdaySchedule map[string]struct{}
  50. file *os.File
  51. TripsFile *os.File
  52. Trips map[string]Trip
  53. Departures []DepartureRealtime
  54. Stop Stop
  55. Line Line
  56. Trip Trip
  57. FeedInfo FeedInfo
  58. }
  59. var lastUpdatedGtfsRt = map[string]uint64{}
  60. func isTimeout(err error) bool {
  61. var e net.Error
  62. return errors.As(err, &e) && e.Timeout()
  63. }
  64. func CleanQuery(query string, feed Feed) (string, error) {
  65. t := transform.Chain(runes.Remove(runes.Predicate(transformers.IsNonAlphanum)), feed.Transformer())
  66. queryCleaned, _, err := transform.String(t, query)
  67. return strings.ToLower(queryCleaned), err
  68. }
  69. func findSchedule(home string, time time.Time, calendar []Schedule) (map[string]struct{},
  70. error) {
  71. schedules := map[string]struct{}{}
  72. weekday := uint8(1 << time.Weekday())
  73. date := time.Format(DateFormat)
  74. for _, schedule := range calendar {
  75. if schedule.StartDate <= date && date <= schedule.EndDate &&
  76. (schedule.Weekdays&weekday != 0) {
  77. schedules[schedule.Id] = struct{}{}
  78. }
  79. }
  80. var err error
  81. if len(schedules) == 0 {
  82. err = traffic_errors.NoSchedule{Date: date}
  83. }
  84. return schedules, err
  85. }
  86. func getRealtimeOffset(tripID string, stopSequence int,
  87. feed Feed) (gtfs_rt.Update, error) {
  88. updates, lastUpdated, err := gtfs_rt.GetRt(lastUpdatedGtfsRt[feed.String()],
  89. feed.RealtimeFeeds(), feed.String())
  90. if err != nil {
  91. return gtfs_rt.Update{}, err
  92. }
  93. lastUpdatedGtfsRt[feed.String()] = lastUpdated
  94. update := updates[tripID]
  95. return update, nil
  96. }
  97. func getRealtimeUpdates(feed Feed) (map[string]gtfs_rt.Update, error) {
  98. updates, lastUpdated, err := gtfs_rt.GetRt(lastUpdatedGtfsRt[feed.String()],
  99. feed.RealtimeFeeds(), feed.String())
  100. if err != nil {
  101. return map[string]gtfs_rt.Update{}, err
  102. }
  103. lastUpdatedGtfsRt[feed.String()] = lastUpdated
  104. return updates, nil
  105. }
  106. func calculateGtfsTime(gtfsTime uint, delay int32, date time.Time,
  107. timezone *time.Location) (time.Time, error) {
  108. noon := time.Date(date.Year(), date.Month(), date.Day(), 12, 0, 0, 0,
  109. timezone)
  110. twelve, _ := time.ParseDuration("-12h")
  111. midnight := noon.Add(twelve)
  112. departureDuration, err := time.ParseDuration(
  113. strconv.FormatInt(int64(gtfsTime), 10) + "m")
  114. if err != nil {
  115. return midnight, err
  116. }
  117. delayDuration, err := time.ParseDuration(strconv.FormatInt(int64(delay),
  118. 10) + "s")
  119. if err != nil {
  120. return midnight, err
  121. }
  122. t := midnight.Add(departureDuration).Add(delayDuration)
  123. return t, nil
  124. }
  125. func loadLocation(input ...interface{}) (interface{}, error) {
  126. result := input[0].(_Result)
  127. var err error = nil
  128. result.Location, err = GetTimezone(result.Stop, result.Feed)
  129. return result, err
  130. }
  131. func loadTime(input ...interface{}) interface{} {
  132. result := input[0].(_Result)
  133. deadzone, _ := time.ParseDuration("-1m")
  134. now := time.Now()
  135. datetime := time.Date(result.Date.Year(), result.Date.Month(),
  136. result.Date.Day(), now.Hour(), now.Minute(), now.Second(), 0,
  137. result.Location)
  138. result.Datetime = datetime
  139. result.MinuteB4Datetime = datetime.Add(deadzone)
  140. result.Midnight = time.Date(datetime.Year(), datetime.Month(),
  141. datetime.Day(), 0, 0, 0, 0, result.Location)
  142. return result
  143. }
  144. func loadTodaySchedule(input ...interface{}) (interface{}, error) {
  145. result := input[0].(_Result)
  146. todaySchedule, err := findSchedule(result.TimetableHome, result.Date,
  147. result.Calendar)
  148. result.TodaySchedule = todaySchedule
  149. return result, err
  150. }
  151. func loadYesterdaySchedule(input ...interface{}) (interface{}, error) {
  152. result := input[0].(_Result)
  153. yesterday := result.Date.AddDate(0, 0, -1)
  154. yesterdaySchedule, err := findSchedule(result.TimetableHome, yesterday,
  155. result.Calendar)
  156. result.YesterdaySchedule = yesterdaySchedule
  157. return result, err
  158. }
  159. func recoverYesterdaySchedule(input ...interface{}) (interface{}, error) {
  160. result := input[0].(_Result)
  161. err := input[1].(error)
  162. dayBefore := result.Date.AddDate(0, 0, -1).Format(DateFormat)
  163. if err, ok := err.(traffic_errors.NoSchedule); ok && err.Date == dayBefore {
  164. result.YesterdaySchedule = map[string]struct{}{}
  165. return gott.Tuple{result}, nil
  166. }
  167. return gott.Tuple{result}, err
  168. }
  169. func openFile(input ...interface{}) (interface{}, error) {
  170. result := input[0].(_Result)
  171. file, err := os.Open(filepath.Join(result.TimetableHome, result.Filename))
  172. result.file = file
  173. return result, err
  174. }
  175. func seek(input ...interface{}) (interface{}, error) {
  176. result := input[0].(_Result)
  177. _, err := result.file.Seek(int64(result.Offset), 0)
  178. return result, err
  179. }
  180. func unmarshalStop(input ...interface{}) (interface{}, error) {
  181. result := input[0].(_Result)
  182. result.Stop = Stop{}
  183. err := bare.UnmarshalReader(result.file, &result.Stop)
  184. result.file.Close()
  185. return result, err
  186. }
  187. func unmarshalFeedInfo(input ...interface{}) (interface{}, error) {
  188. result := input[0].(_Result)
  189. result.FeedInfo = FeedInfo{}
  190. err := bare.UnmarshalReader(result.file, &result.FeedInfo)
  191. result.file.Close()
  192. return result, err
  193. }
  194. func unmarshalLine(input ...interface{}) (interface{}, error) {
  195. result := input[0].(_Result)
  196. result.Line = Line{}
  197. err := bare.UnmarshalReader(result.file, &result.Line)
  198. result.file.Close()
  199. return result, err
  200. }
  201. func unmarshalTrip(input ...interface{}) (interface{}, error) {
  202. result := input[0].(_Result)
  203. result.Trip = Trip{}
  204. err := bare.UnmarshalReader(result.file, &result.Trip)
  205. result.file.Close()
  206. return result, err
  207. }
  208. func openTripsFile(input ...interface{}) (interface{}, error) {
  209. result := input[0].(_Result)
  210. tripsFile, err := os.Open(filepath.Join(result.TimetableHome, "trips.bare"))
  211. result.TripsFile = tripsFile
  212. return result, err
  213. }
  214. func readTrips(input ...interface{}) (interface{}, error) {
  215. result := input[0].(_Result)
  216. trips := map[string]Trip{}
  217. orders := map[string]StopOrder{}
  218. for _, order := range result.Stop.Order {
  219. _, err := result.TripsFile.Seek(int64(order.TripOffset), 0)
  220. if err != nil {
  221. return result, err
  222. }
  223. trip := Trip{}
  224. err = bare.UnmarshalReader(result.TripsFile, &trip)
  225. if err != nil {
  226. return result, err
  227. }
  228. _, inToday := result.TodaySchedule[trip.ScheduleID]
  229. _, inYesterday := result.YesterdaySchedule[trip.ScheduleID]
  230. if inToday || inYesterday {
  231. trips[trip.Id] = trip
  232. orders[trip.Id] = order
  233. }
  234. }
  235. result.Stop.Order = orders
  236. result.Trips = trips
  237. return result, nil
  238. }
  239. func getUpdates(input ...interface{}) (interface{}, error) {
  240. result := input[0].(_Result)
  241. departures := []DepartureRealtime{}
  242. timedOut := false
  243. for tripID, order := range result.Stop.Order {
  244. trip := result.Trips[tripID]
  245. var date time.Time
  246. if _, ok := result.TodaySchedule[trip.ScheduleID]; ok {
  247. date = result.Date
  248. } else if _, ok := result.YesterdaySchedule[trip.ScheduleID]; ok {
  249. date = result.Date.AddDate(0, 0, -1)
  250. } else {
  251. continue
  252. }
  253. departure, err := getDeparture(date, result, order, trip, result.Feed, timedOut)
  254. if err != nil {
  255. if isTimeout(err) {
  256. timedOut = true
  257. err = nil
  258. } else {
  259. return result, err
  260. }
  261. }
  262. departures = append(departures, departure)
  263. }
  264. result.Departures = departures
  265. result.TripsFile.Close()
  266. return result, nil
  267. }
  268. func getDeparture(date time.Time, result _Result, order StopOrder,
  269. trip Trip, feed Feed, timedOut bool) (DepartureRealtime, error) {
  270. found := false
  271. departureRt := DepartureRealtime{}
  272. var finalErr error
  273. for _, departure := range trip.Departures {
  274. if departure.StopSequence == order.Sequence {
  275. departureRt.Departure = departure
  276. departureRt.Headsign = trip.Headsign
  277. departureRt.LineName = trip.LineName
  278. departureRt.Order = order
  279. departureRt.Update = gtfs_rt.Update{}
  280. departureTime, err := calculateGtfsTime(departure.Time, 0, date,
  281. result.Location)
  282. if err != nil {
  283. return departureRt, err
  284. }
  285. departureRt.Update.Time = departureTime
  286. if departureTime.After(result.Midnight) {
  287. if result.DeparturesType == DEPARTURES_HYBRID && !timedOut {
  288. departureRt.Update, err = getRealtimeOffset(trip.Id,
  289. order.Sequence, feed)
  290. if err != nil {
  291. if isTimeout(err) {
  292. timedOut = true
  293. finalErr = err
  294. } else {
  295. log.Printf("while getting realtime departures: %v\n", err)
  296. }
  297. }
  298. }
  299. departureTime, err := calculateGtfsTime(departure.Time,
  300. departureRt.Update.Delay, date, result.Location)
  301. if err != nil {
  302. return departureRt, err
  303. }
  304. departureRt.Update.Time = departureTime
  305. }
  306. found = true
  307. break
  308. }
  309. }
  310. if !found {
  311. return departureRt, traffic_errors.NoStopOrder{
  312. TripID: trip.Id,
  313. Order: order.Sequence,
  314. }
  315. }
  316. return departureRt, finalErr
  317. }
  318. func filterDepartures(input ...interface{}) interface{} {
  319. result := input[0].(_Result)
  320. departures := []DepartureRealtime{}
  321. for _, departure := range result.Departures {
  322. if result.DeparturesType == DEPARTURES_FULL ||
  323. departure.Update.Time.After(result.MinuteB4Datetime) {
  324. departures = append(departures, departure)
  325. }
  326. }
  327. result.Departures = departures
  328. return result
  329. }
  330. func filterDeparturesByLine(input ...interface{}) interface{} {
  331. result := input[0].(_Result)
  332. departures := []DepartureRealtime{}
  333. if result.LineName != "" {
  334. for _, departure := range result.Departures {
  335. if departure.LineName == result.LineName {
  336. departures = append(departures, departure)
  337. }
  338. }
  339. result.Departures = departures
  340. }
  341. return result
  342. }
  343. func sortDepartures(input ...interface{}) interface{} {
  344. result := input[0].(_Result)
  345. sort.Slice(result.Departures, func(i, j int) bool {
  346. return result.Departures[i].Update.Time.Before(
  347. result.Departures[j].Update.Time)
  348. })
  349. return result
  350. }
  351. func closeFiles(input ...interface{}) (interface{}, error) {
  352. result := input[0].(_Result)
  353. err := input[1].(error)
  354. if result.file != nil {
  355. result.file.Close()
  356. }
  357. if result.TripsFile != nil {
  358. result.TripsFile.Close()
  359. }
  360. return result, err
  361. }
  362. func unmarshalCodeIndex(timetableHome string) (CodeIndex, error) {
  363. ix := CodeIndex{}
  364. ixFile, err := os.Open(filepath.Join(timetableHome, "ix_stop_codes.bare"))
  365. if err != nil {
  366. return ix, fmt.Errorf("while opening file: %w\n", err)
  367. }
  368. defer ixFile.Close()
  369. r := bare.NewReader(ixFile)
  370. num, err := r.ReadUint()
  371. if err != nil {
  372. return ix, fmt.Errorf("while reading length: %w\n", err)
  373. }
  374. for i := uint64(0); i < num; i++ {
  375. k, err := r.ReadString()
  376. if err != nil {
  377. return ix, fmt.Errorf("while reading key at %d: %w\n", i, err)
  378. }
  379. v, err := r.ReadUint()
  380. if err != nil {
  381. return ix, fmt.Errorf("while reading value at %d: %w\n", i, err)
  382. }
  383. ix[k] = uint(v)
  384. }
  385. return ix, nil
  386. }
  387. func unmarshalNameIndex(timetableHome, filename string) (NameIndex, error) {
  388. ix := NameIndex{}
  389. ixFile, err := os.Open(filepath.Join(timetableHome, filename))
  390. if err != nil {
  391. return ix, fmt.Errorf("while opening file: %w", err)
  392. }
  393. defer ixFile.Close()
  394. for err == nil {
  395. nameOffset := NameOffset{}
  396. err = bare.UnmarshalReader(ixFile, &nameOffset)
  397. if err != nil {
  398. if err == io.EOF {
  399. break
  400. } else {
  401. return ix, fmt.Errorf("while unmarshaling: %w", err)
  402. }
  403. }
  404. ix = append(ix, nameOffset)
  405. }
  406. return ix, nil
  407. }
  408. func unmarshalLineIndex(timetableHome string) (NameIndex, error) {
  409. return unmarshalNameIndex(timetableHome, "ix_lines.bare")
  410. }
  411. func unmarshalStopNameIndex(timetableHome string) (NameIndex, error) {
  412. return unmarshalNameIndex(timetableHome, "ix_stop_names.bare")
  413. }
  414. func unmarshalTripIndex(timetableHome string) (NameIndex, error) {
  415. return unmarshalNameIndex(timetableHome, "ix_trips.bare")
  416. }
  417. func readIndexes(feedHome string, versions []Version) (FeedCodeIndex,
  418. FeedNameIndex, FeedNameIndex, FeedNameIndex, error) {
  419. codeIndex := FeedCodeIndex{}
  420. nameIndex := FeedNameIndex{}
  421. lineIndex := FeedNameIndex{}
  422. tripIndex := FeedNameIndex{}
  423. for _, v := range versions {
  424. validity := Validity(v.String())
  425. timetableHome := filepath.Join(feedHome, string(validity))
  426. cIx, err := unmarshalCodeIndex(timetableHome)
  427. if err != nil {
  428. return codeIndex, nameIndex, lineIndex, tripIndex,
  429. fmt.Errorf("while unmarshalling code index: %w\n", err)
  430. }
  431. nIx, err := unmarshalStopNameIndex(timetableHome)
  432. if err != nil {
  433. return codeIndex, nameIndex, lineIndex, tripIndex,
  434. fmt.Errorf("while unmarshalling name index: %w\n", err)
  435. }
  436. lIx, err := unmarshalLineIndex(timetableHome)
  437. if err != nil {
  438. return codeIndex, nameIndex, lineIndex, tripIndex,
  439. fmt.Errorf("while unmarshalling line index: %w\n", err)
  440. }
  441. tIx, err := unmarshalTripIndex(timetableHome)
  442. if err != nil {
  443. return codeIndex, nameIndex, lineIndex, tripIndex,
  444. fmt.Errorf("while unmarshalling trip index: %w\n", err)
  445. }
  446. codeIndex[validity] = cIx
  447. nameIndex[validity] = nIx
  448. lineIndex[validity] = lIx
  449. tripIndex[validity] = tIx
  450. }
  451. return codeIndex, nameIndex, lineIndex, tripIndex, nil
  452. }
  453. func unmarshalCalendar(timetableHome string) ([]Schedule, error) {
  454. calendar := []Schedule{}
  455. calendarFile, err := os.Open(filepath.Join(timetableHome, "calendar.bare"))
  456. if err != nil {
  457. return calendar, fmt.Errorf("while opening file: %w", err)
  458. }
  459. defer calendarFile.Close()
  460. for err == nil {
  461. schedule := Schedule{}
  462. err = bare.UnmarshalReader(calendarFile, &schedule)
  463. if err != nil {
  464. if err == io.EOF {
  465. break
  466. } else {
  467. return calendar, fmt.Errorf("while unmarshaling: %w", err)
  468. }
  469. }
  470. calendar = append(calendar, schedule)
  471. }
  472. return calendar, nil
  473. }
  474. func readCalendar(feedHome string, versions []Version) (FeedCalendar, error) {
  475. calendars := FeedCalendar{}
  476. for _, v := range versions {
  477. validity := Validity(v.String())
  478. timetableHome := filepath.Join(feedHome, string(validity))
  479. schedule, err := unmarshalCalendar(timetableHome)
  480. if err != nil {
  481. return calendars, fmt.Errorf("while unmarshaling for %s: %w", v, err)
  482. }
  483. calendars[validity] = schedule
  484. }
  485. return calendars, nil
  486. }
  487. func unmarshalVehicles(timetableHome string) (Vehicles, error) {
  488. vehicles := Vehicles{}
  489. vehiclesFile, err := os.Open(filepath.Join(timetableHome, "vehicles.bare"))
  490. if err != nil {
  491. return vehicles, fmt.Errorf("while opening file: %w", err)
  492. }
  493. defer vehiclesFile.Close()
  494. for err == nil {
  495. vehicle := Vehicle{}
  496. err = bare.UnmarshalReader(vehiclesFile, &vehicle)
  497. if err != nil {
  498. if err == io.EOF {
  499. break
  500. } else {
  501. return vehicles, fmt.Errorf("while unmarshaling: %w", err)
  502. }
  503. }
  504. vehicles[vehicle.Id] = vehicle
  505. }
  506. return vehicles, nil
  507. }
  508. func readVehicles(feedHome string, versions []Version) (FeedVehicles, error) {
  509. vehicles := FeedVehicles{}
  510. for _, v := range versions {
  511. validity := Validity(v.String())
  512. timetableHome := filepath.Join(feedHome, string(validity))
  513. versionVehicles, err := unmarshalVehicles(timetableHome)
  514. if err != nil {
  515. return vehicles, fmt.Errorf("while unmarshaling for %s: %w", v, err)
  516. }
  517. vehicles[validity] = versionVehicles
  518. }
  519. return vehicles, nil
  520. }
  521. func createPositionIndex(feedHome string, versions []Version) (FeedPositionIndex, error) {
  522. feedPositionIndex := FeedPositionIndex{}
  523. for _, v := range versions {
  524. positionIndex := rtreego.NewTree(2, 25, 50)
  525. validity := Validity(v.String())
  526. timetableHome := filepath.Join(feedHome, string(validity))
  527. stopsFile, err := os.Open(filepath.Join(timetableHome, "stops.bare"))
  528. if err != nil {
  529. return feedPositionIndex, fmt.Errorf("while opening stops file: %w", err)
  530. }
  531. defer stopsFile.Close()
  532. for err == nil {
  533. stop := Stop{}
  534. err = bare.UnmarshalReader(stopsFile, &stop)
  535. if err != nil {
  536. if err == io.EOF {
  537. break
  538. } else {
  539. return feedPositionIndex, fmt.Errorf("while unmarshaling: %w", err)
  540. }
  541. }
  542. stop.Name = ""
  543. stop.NodeName = ""
  544. stop.ChangeOptions = nil
  545. stop.Zone = ""
  546. stop.Order = nil
  547. positionIndex.Insert(stop)
  548. feedPositionIndex[validity] = positionIndex
  549. }
  550. }
  551. return feedPositionIndex, nil
  552. }
  553. func unmarshalTripFromFile(tripsFile *os.File) Trip {
  554. trip := Trip{}
  555. _ = bare.UnmarshalReader(tripsFile, &trip)
  556. return trip
  557. }
  558. func EnableFeeds(cfg config.Config, traffic *Traffic) {
  559. feedsMap := RegisterFeeds()
  560. feeds := map[string]Feed{}
  561. for _, enabledFeed := range cfg.EnabledFeeds {
  562. feeds[enabledFeed] = feedsMap[enabledFeed]
  563. }
  564. traffic.Feeds = feeds
  565. }
  566. func Initialise(sigChan chan os.Signal, doneChan chan bool, cfg config.Config,
  567. traffic *Traffic) {
  568. bare.MaxMapSize(8192)
  569. for {
  570. sig := <-sigChan
  571. if sig == os.Interrupt {
  572. break
  573. }
  574. allVersions := GlobalVersions{}
  575. codeIndexes := GlobalCodeIndex{}
  576. nameIndexes := GlobalNameIndex{}
  577. lineIndexes := GlobalNameIndex{}
  578. tripIndexes := GlobalNameIndex{}
  579. calendars := GlobalCalendar{}
  580. vehicles := GlobalVehicles{}
  581. positionIndexes := GlobalPositionIndex{}
  582. for _, feed := range traffic.Feeds {
  583. feedHome := filepath.Join(cfg.FeedsPath, feed.String())
  584. err := file.UnpackTraffic(cfg.FeedsPath, feed.String())
  585. if err != nil {
  586. log.Printf("while unpacking TRAFFIC in feed %s: %v\n", feed, err)
  587. continue
  588. }
  589. err = CleanOldVersions(cfg, feed)
  590. if err != nil {
  591. log.Printf("while cleaning old TRAFFIC versions in feed %s: %v\n",
  592. feed, err)
  593. continue
  594. }
  595. feedVersions, err := ListVersions(cfg, feed)
  596. if err != nil {
  597. log.Printf("while listing TRAFFIC versions in feed %s: %v\n", feed,
  598. err)
  599. continue
  600. }
  601. feedName := feed.String()
  602. allVersions[feedName] = feedVersions
  603. codeIndexes[feedName], nameIndexes[feedName], lineIndexes[feedName], tripIndexes[feedName],
  604. err = readIndexes(feedHome, feedVersions)
  605. if err != nil {
  606. log.Printf("while reading indexes in feed %s: %v\n", feed, err)
  607. continue
  608. }
  609. calendars[feedName], err = readCalendar(feedHome, feedVersions)
  610. if err != nil {
  611. log.Printf("while reading calendars in feed %s: %v\n", feed, err)
  612. continue
  613. }
  614. vehicles[feedName], err = readVehicles(feedHome, feedVersions)
  615. if err != nil {
  616. log.Printf("while reading vehicles in feed %s: %v\n", feed, err)
  617. continue
  618. }
  619. positionIndexes[feedName], err = createPositionIndex(feedHome, feedVersions)
  620. }
  621. traffic.CodeIndexes = codeIndexes
  622. traffic.NameIndexes = nameIndexes
  623. traffic.LineIndexes = lineIndexes
  624. traffic.TripIndexes = tripIndexes
  625. traffic.Versions = allVersions
  626. traffic.Calendars = calendars
  627. traffic.Vehicles = vehicles
  628. traffic.PositionIndexes = positionIndexes
  629. log.Println("Initialised")
  630. }
  631. doneChan <- true
  632. }
  633. func GetDepartures(stopCode string, lineName, dataHome, feedName string,
  634. versionCode Validity, traffic *Traffic, date time.Time,
  635. departuresType DeparturesType) ([]DepartureRealtime, error) {
  636. codeIndex := traffic.CodeIndexes[feedName][versionCode]
  637. calendar := traffic.Calendars[feedName][versionCode]
  638. vehicles := traffic.Vehicles[feedName][versionCode]
  639. result := _Result{
  640. Offset: codeIndex[stopCode],
  641. Filename: "stops.bare",
  642. Date: date,
  643. LineName: lineName,
  644. TimetableHome: filepath.Join(dataHome, feedName, string(versionCode)),
  645. Calendar: calendar,
  646. DeparturesType: departuresType,
  647. Vehicles: vehicles,
  648. Feed: traffic.Feeds[feedName],
  649. }
  650. r, e := gott.NewResult(result).
  651. Bind(loadLocation).
  652. Map(loadTime).
  653. Bind(loadTodaySchedule).
  654. Bind(loadYesterdaySchedule).
  655. Recover(recoverYesterdaySchedule).
  656. Bind(openFile).
  657. Bind(seek).
  658. Bind(unmarshalStop).
  659. Bind(openTripsFile).
  660. Bind(readTrips).
  661. Bind(getUpdates).
  662. Map(filterDepartures).
  663. Map(filterDeparturesByLine).
  664. Map(sortDepartures).
  665. Recover(closeFiles).
  666. Finish()
  667. if e != nil {
  668. return []DepartureRealtime{}, e
  669. } else {
  670. return r.(_Result).Departures, nil
  671. }
  672. }
  673. func GetTripFromStop(tripID string, stopCode string, context Context, traffic *Traffic) ([]TimedStopStub, error) {
  674. stubs := []TimedStopStub{}
  675. var (
  676. order = -1
  677. trip Trip
  678. err error
  679. baseTime uint = 0
  680. time uint = 0
  681. )
  682. if stopCode != "" {
  683. startingStop, err := GetStop(stopCode, context, traffic)
  684. if err != nil {
  685. return stubs, fmt.Errorf("while getting starting stop: %w", err)
  686. }
  687. tripOffset := -1
  688. order = -1
  689. o := startingStop.Order[tripID]
  690. tripOffset = int(o.TripOffset)
  691. order = o.Sequence
  692. if tripOffset == -1 {
  693. return stubs, fmt.Errorf("trip for starting stop not found")
  694. }
  695. trip, err = GetTripByOffset(uint(tripOffset), context, traffic)
  696. if err != nil {
  697. return stubs, fmt.Errorf("while getting trip: %w", err)
  698. }
  699. } else {
  700. trip, err = GetTrip(tripID, context, traffic)
  701. if err != nil {
  702. return stubs, fmt.Errorf("while getting trip: %w", err)
  703. }
  704. }
  705. for _, departure := range trip.Departures {
  706. if departure.StopSequence >= order {
  707. stop, err := getStopByOffset(uint(departure.StopOffset), context, traffic)
  708. if err != nil {
  709. return stubs, fmt.Errorf("while getting stop: %w", err)
  710. }
  711. if baseTime != 0 {
  712. time = departure.Time - baseTime
  713. }
  714. stubs = append(stubs, TimedStopStub{
  715. StopStub: StopStub{
  716. Code: stop.Code,
  717. Name: stop.Name,
  718. NodeName: stop.NodeName,
  719. Zone: stop.Zone,
  720. OnDemand: departure.Pickup == BY_DRIVER || departure.Dropoff == BY_DRIVER,
  721. },
  722. Time: time,
  723. })
  724. }
  725. }
  726. return stubs, nil
  727. }
  728. func getStopByOffset(offset uint, context Context, traffic *Traffic) (Stop, error) { // todo offset should be uint64 everywhere
  729. result := _Result{
  730. Filename: "stops.bare",
  731. Offset: offset,
  732. TimetableHome: filepath.Join(context.DataHome, context.FeedName, string(context.Version)),
  733. }
  734. r, e := gott.NewResult(result).
  735. Bind(openFile).
  736. Bind(seek).
  737. Bind(unmarshalStop).
  738. Finish()
  739. if e != nil {
  740. return Stop{}, e
  741. } else {
  742. return r.(_Result).Stop, nil
  743. }
  744. }
  745. func getLineByOffset(offset uint, dataHome string, feedName string,
  746. versionCode Validity, traffic *Traffic) (Line, error) {
  747. result := _Result{
  748. Filename: "lines.bare",
  749. Offset: offset,
  750. TimetableHome: filepath.Join(dataHome, feedName, string(versionCode)),
  751. }
  752. r, e := gott.NewResult(result).
  753. Bind(openFile).
  754. Bind(seek).
  755. Bind(unmarshalLine).
  756. Finish()
  757. if e != nil {
  758. return Line{}, e
  759. } else {
  760. return r.(_Result).Line, nil
  761. }
  762. }
  763. func getFeedInfo(dataHome string, feedName string,
  764. versionCode Validity, traffic *Traffic) (FeedInfo, error) {
  765. result := _Result{
  766. Filename: "feed_info.bare",
  767. TimetableHome: filepath.Join(dataHome, feedName, string(versionCode)),
  768. }
  769. r, e := gott.NewResult(result).
  770. Bind(openFile).
  771. Bind(unmarshalFeedInfo).
  772. Finish()
  773. if e != nil {
  774. return FeedInfo{}, e
  775. } else {
  776. return r.(_Result).FeedInfo, nil
  777. }
  778. }
  779. func GetTripsByOffset(offsets []uint, context Context, t *Traffic, filter func(Trip) bool) ([]Trip, error) {
  780. trips := []Trip{}
  781. file, err := os.Open(filepath.Join(context.DataHome, context.FeedName, string(context.Version), "trips.bare"))
  782. if err != nil {
  783. return trips, fmt.Errorf("while opening file: %w", err)
  784. }
  785. defer file.Close()
  786. offsetsSet := map[uint]struct{}{}
  787. for _, offset := range offsets {
  788. offsetsSet[offset] = struct{}{}
  789. }
  790. for offset := range offsetsSet {
  791. _, err = file.Seek(int64(offset), 0)
  792. if err != nil {
  793. return trips, fmt.Errorf("while seeking to %d: %w", offset, err)
  794. }
  795. trip := Trip{}
  796. err = bare.UnmarshalReader(file, &trip)
  797. if err != nil {
  798. return trips, fmt.Errorf("while unmarshalling at %d: %w", offset, err)
  799. }
  800. if filter(trip) {
  801. trips = append(trips, trip)
  802. }
  803. }
  804. return trips, nil
  805. }
  806. func GetTripByOffset(offset uint, context Context, t *Traffic) (Trip, error) {
  807. result := _Result{
  808. Filename: "trips.bare",
  809. Offset: offset,
  810. TimetableHome: filepath.Join(context.DataHome, context.FeedName, string(context.Version)),
  811. }
  812. r, e := gott.NewResult(result).
  813. Bind(openFile).
  814. Bind(seek).
  815. Bind(unmarshalTrip).
  816. Finish()
  817. if e != nil {
  818. return Trip{}, e
  819. } else {
  820. return r.(_Result).Trip, nil
  821. }
  822. }
  823. func GetStop(stopCode string, context Context, traffic *Traffic) (Stop, error) {
  824. codeIndex := traffic.CodeIndexes[context.FeedName][context.Version]
  825. return getStopByOffset(codeIndex[stopCode], context, traffic)
  826. }
  827. func GetStopStub(stopCode string, lineName string, context Context, traffic *Traffic) (StopStub, error) {
  828. stop, err := GetStop(stopCode, context, traffic)
  829. if err != nil {
  830. return StopStub{}, err
  831. }
  832. var trip Trip
  833. var stopOrder = -1
  834. for _, order := range stop.Order {
  835. offset := order.TripOffset
  836. trip, _ = GetTripByOffset(offset, context, traffic)
  837. if trip.LineName == lineName {
  838. stopOrder = order.Sequence
  839. break
  840. }
  841. }
  842. if stopOrder == -1 {
  843. return StopStub{}, fmt.Errorf("cannot the stop on given line")
  844. }
  845. var departure *Departure
  846. for _, d := range trip.Departures {
  847. if d.StopSequence == stopOrder { // todo binary search
  848. departure = &d
  849. break
  850. }
  851. }
  852. if departure == nil {
  853. return StopStub{}, fmt.Errorf("cannot find departure at sequence %d", stopOrder)
  854. }
  855. stopStub := StopStub{
  856. Code: stop.Code,
  857. Name: stop.Name,
  858. NodeName: stop.NodeName,
  859. Zone: stop.Zone,
  860. OnDemand: departure.Pickup == BY_DRIVER || departure.Dropoff == BY_DRIVER,
  861. }
  862. return stopStub, nil
  863. }
  864. func GetLine(name string, context Context, traffic *Traffic) (Line, error) {
  865. index := traffic.LineIndexes[context.FeedName][context.Version]
  866. for _, o := range index {
  867. cleanedName, err := CleanQuery(name, traffic.Feeds[context.FeedName])
  868. if err != nil {
  869. return Line{}, err
  870. }
  871. if o.Name == cleanedName {
  872. return getLineByOffset(o.Offsets[0], context.DataHome, context.FeedName, context.Version, traffic)
  873. }
  874. }
  875. return Line{}, nil
  876. }
  877. func GetTrip(id string, context Context, traffic *Traffic) (Trip, error) {
  878. tripIndex := traffic.TripIndexes[context.FeedName][context.Version]
  879. for _, o := range tripIndex {
  880. if o.Name == id {
  881. return GetTripByOffset(o.Offsets[0], context, traffic)
  882. }
  883. }
  884. return Trip{}, fmt.Errorf("trip by id %s not found", id)
  885. }
  886. func QueryLines(query string, dataHome string, feedName string,
  887. versionCode Validity, traffic *Traffic) ([]Line, error) {
  888. lines := []Line{}
  889. index := traffic.LineIndexes[feedName][versionCode]
  890. cleanQuery, err := CleanQuery(query, traffic.Feeds[feedName])
  891. if err != nil {
  892. return lines, fmt.Errorf("while cleaning query: %w", err)
  893. }
  894. results := fuzzy.FindFrom(cleanQuery, index)
  895. for _, result := range results {
  896. for _, offset := range index[result.Index].Offsets {
  897. line, err := getLineByOffset(offset, dataHome, feedName,
  898. versionCode, traffic)
  899. if err != nil {
  900. return lines, fmt.Errorf("while getting line for %s: %w", result.Str, err)
  901. }
  902. lines = append(lines, line)
  903. }
  904. }
  905. return lines, nil
  906. }
  907. func QueryStops(query string, context Context, traffic *Traffic) ([]Stop, error) {
  908. stops := []Stop{}
  909. nameIndex := traffic.NameIndexes[context.FeedName][context.Version]
  910. results := fuzzy.FindFrom(query, nameIndex)
  911. for _, result := range results {
  912. for _, offset := range nameIndex[result.Index].Offsets {
  913. stop, err := getStopByOffset(offset, context, traffic)
  914. if err != nil {
  915. return stops, err
  916. }
  917. stops = append(stops, stop)
  918. }
  919. }
  920. return stops, nil
  921. }
  922. func GetStopsNear(location Position, context Context, traffic *Traffic) ([]Stop, error) {
  923. stops := []Stop{}
  924. positionIndex := traffic.PositionIndexes[context.FeedName][context.Version]
  925. codeIndex := traffic.CodeIndexes[context.FeedName][context.Version]
  926. spatials := positionIndex.NearestNeighbors(12, rtreego.Point{location.Lat, location.Lon})
  927. for _, spatial := range spatials {
  928. stop, err := getStopByOffset(codeIndex[spatial.(Stop).Code], context, traffic)
  929. if err != nil {
  930. return stops, fmt.Errorf("while getting stop by offset for %s: %w", spatial.(Stop).Code, err)
  931. }
  932. stops = append(stops, stop)
  933. }
  934. return stops, nil
  935. }
  936. func GetAlerts(stopCode string, preferredLanguages []language.Tag, context Context, traffic *Traffic) ([]Alert, error) {
  937. feed := traffic.Feeds[context.FeedName]
  938. alertPositions := map[uint]struct{}{}
  939. gtfsAlerts, lastUpdated, err := gtfs_rt.GetAlerts(lastUpdatedGtfsRt[feed.String()], feed.RealtimeFeeds(), feed.String())
  940. if err != nil {
  941. if isTimeout(err) {
  942. return []Alert{}, nil
  943. }
  944. return []Alert{}, fmt.Errorf("while geting alerts: %w", err)
  945. }
  946. lastUpdatedGtfsRt[feed.String()] = lastUpdated
  947. stop, err := GetStop(stopCode, context, traffic)
  948. for _, pos := range gtfsAlerts.ByStop[stop.Id] {
  949. alertPositions[pos] = struct{}{}
  950. }
  951. for _, option := range stop.ChangeOptions {
  952. line, err := GetLine(option.LineName, context, traffic)
  953. if err != nil {
  954. return []Alert{}, fmt.Errorf("while getting line %s: %w", option.LineName, err)
  955. }
  956. for _, pos := range gtfsAlerts.ByRoute[line.Id] {
  957. alertPositions[pos] = struct{}{}
  958. }
  959. for _, pos := range gtfsAlerts.ByType[line.Kind.Value()] {
  960. alertPositions[pos] = struct{}{}
  961. }
  962. }
  963. for _, order := range stop.Order {
  964. trip, err := GetTripByOffset(order.TripOffset, context, traffic)
  965. if err != nil {
  966. return []Alert{}, fmt.Errorf("while getting trip at %d: %w", order.TripOffset, err)
  967. }
  968. for _, pos := range gtfsAlerts.ByTrip[trip.Id] {
  969. alertPositions[pos] = struct{}{}
  970. }
  971. }
  972. alerts := make([]Alert, len(alertPositions))
  973. i := 0
  974. for pos := range alertPositions {
  975. gtfsAlert := gtfsAlerts.Alerts[pos]
  976. includeAlert := false
  977. for _, timeRange := range gtfsAlert.TimeRanges {
  978. start := time.Unix(int64(timeRange.GetStart()), 0)
  979. var end time.Time
  980. if timeRange.GetEnd() == 0 {
  981. end = time.Date(9999, 12, 31, 23, 59, 59, 0, time.Local)
  982. } else {
  983. end = time.Unix(int64(timeRange.GetEnd()), 0)
  984. }
  985. now := time.Now()
  986. if !now.Before(start) && !now.After(end) {
  987. includeAlert = true
  988. }
  989. }
  990. if !includeAlert {
  991. continue
  992. }
  993. alerts[i] = Alert{
  994. Cause: int32(gtfsAlert.Cause), // todo(BAF10)
  995. Effect: int32(gtfsAlert.Effect), // todo(BAF10)
  996. }
  997. alertHeaderLangs := []language.Tag{}
  998. for lang := range gtfsAlert.Headers {
  999. alertHeaderLangs = append(alertHeaderLangs, lang)
  1000. }
  1001. m := language.NewMatcher(alertHeaderLangs)
  1002. tag, _, _ := m.Match(preferredLanguages...)
  1003. alerts[i].Header = gtfsAlert.Headers[tag]
  1004. alertDescriptionLangs := []language.Tag{}
  1005. for lang := range gtfsAlert.Descriptions {
  1006. alertDescriptionLangs = append(alertDescriptionLangs, lang)
  1007. }
  1008. m = language.NewMatcher(alertDescriptionLangs)
  1009. tag, _, _ = m.Match(preferredLanguages...)
  1010. alerts[i].Description = gtfsAlert.Descriptions[tag]
  1011. alertUrlLangs := []language.Tag{}
  1012. for lang := range gtfsAlert.URLs {
  1013. alertUrlLangs = append(alertUrlLangs, lang)
  1014. }
  1015. m = language.NewMatcher(alertUrlLangs)
  1016. tag, _, _ = m.Match(preferredLanguages...)
  1017. alerts[i].URL = gtfsAlert.URLs[tag]
  1018. i++
  1019. }
  1020. return alerts, nil
  1021. }
  1022. func GetTimezone(stop Stop, feed Feed) (*time.Location, error) {
  1023. if stop.Timezone != "" {
  1024. return time.LoadLocation(stop.Timezone)
  1025. }
  1026. return feed.GetLocation(), nil
  1027. }
  1028. func GetLanguage(ctx Context, t *Traffic) (string, error) {
  1029. feedInfo, err := getFeedInfo(ctx.DataHome, ctx.FeedName, ctx.Version, t)
  1030. return feedInfo.Language, err
  1031. }
  1032. func CleanOldVersions(cfg config.Config, feed Feed) error {
  1033. allVersions, err := ListVersions(cfg, feed)
  1034. if err != nil {
  1035. return err
  1036. }
  1037. now := time.Now().In(feed.GetLocation())
  1038. versionsMap := map[string]Version{}
  1039. for _, version := range allVersions {
  1040. versionsMap[version.String()] = version
  1041. }
  1042. sort.Slice(allVersions, func(i, j int) bool {
  1043. return allVersions[i].ValidFrom.Before(allVersions[j].ValidFrom)
  1044. })
  1045. validVersions := FindValidVersions(allVersions, now)
  1046. validVersionsMap := map[string]bool{}
  1047. for _, version := range validVersions {
  1048. validVersionsMap[version.String()] = true
  1049. }
  1050. err = file.CleanOldVersions(FeedPath(cfg, feed), validVersionsMap)
  1051. return err
  1052. }
  1053. func convertVehicle(update gtfs_rt.Update, context Context, t *Traffic) (VehicleStatus, error) {
  1054. vehicles := t.Vehicles[context.FeedName][context.Version]
  1055. tripID := update.TripUpdate.GetTrip().GetTripId()
  1056. trip, err := GetTrip(tripID, context, t)
  1057. if err != nil {
  1058. return VehicleStatus{}, fmt.Errorf("while converting vehicle: %w", err)
  1059. }
  1060. return VehicleStatus{
  1061. Id: update.VehicleID,
  1062. Position: Position{float64(update.Latitude), float64(update.Longitude)},
  1063. Capabilities: vehicles[update.VehicleID].Capabilities,
  1064. Speed: update.Speed,
  1065. Headsign: trip.Headsign,
  1066. LineName: trip.LineName,
  1067. }, nil
  1068. }
  1069. func GetVehicle(tripID string, context Context, t *Traffic) (VehicleStatus, error) {
  1070. vehicle := VehicleStatus{}
  1071. update, err := getRealtimeOffset(tripID, 0, t.Feeds[context.FeedName])
  1072. if err != nil {
  1073. return vehicle, fmt.Errorf("while getting realtime update: %w", err)
  1074. }
  1075. if update.TripUpdate == nil {
  1076. return vehicle, fmt.Errorf("empty realtime update")
  1077. }
  1078. return convertVehicle(update, context, t)
  1079. }
  1080. func GetStopsIn(lb, rt Position, context Context, traffic *Traffic) ([]Stop, error) {
  1081. // todo limit rect size
  1082. // todo does it take into account rect 179 -> -179 latitude?
  1083. stops := []Stop{}
  1084. positionIndex := traffic.PositionIndexes[context.FeedName][context.Version]
  1085. codeIndex := traffic.CodeIndexes[context.FeedName][context.Version]
  1086. rect, err := rtreego.NewRectFromPoints(rtreego.Point{lb.Lat, lb.Lon}, rtreego.Point{rt.Lat, rt.Lon})
  1087. if err != nil {
  1088. return stops, fmt.Errorf("while creating a rect: %w", err)
  1089. }
  1090. spatials := positionIndex.SearchIntersect(rect)
  1091. for _, spatial := range spatials {
  1092. stop, err := getStopByOffset(codeIndex[spatial.(Stop).Code], context, traffic)
  1093. if err != nil {
  1094. return stops, fmt.Errorf("while getting stop by offset for %s: %w", spatial.(Stop).Code, err)
  1095. }
  1096. stops = append(stops, stop)
  1097. }
  1098. return stops, nil
  1099. }
  1100. func GetVehiclesIn(lb, rt Position, context Context, t *Traffic) ([]VehicleStatus, error) {
  1101. // todo limit rect size
  1102. vehicles := []VehicleStatus{}
  1103. updates, err := getRealtimeUpdates(t.Feeds[context.FeedName])
  1104. if err != nil {
  1105. return vehicles, err
  1106. }
  1107. for _, update := range updates {
  1108. if rt.Lon < float64(update.Longitude) || lb.Lon > float64(update.Longitude) {
  1109. continue
  1110. }
  1111. lat := float64(update.Latitude)
  1112. if lb.Lat < rt.Lat {
  1113. if lb.Lat < lat && lat < rt.Lat {
  1114. vehicle, err := convertVehicle(update, context, t)
  1115. if err != nil {
  1116. return vehicles, fmt.Errorf("while converting vehicle: %w", err)
  1117. }
  1118. vehicles = append(vehicles, vehicle)
  1119. }
  1120. } else {
  1121. if lat > lb.Lat || lat < rt.Lat {
  1122. vehicle, err := convertVehicle(update, context, t)
  1123. if err != nil {
  1124. return vehicles, fmt.Errorf("while converting vehicle: %w", err)
  1125. }
  1126. vehicles = append(vehicles, vehicle)
  1127. }
  1128. }
  1129. }
  1130. return vehicles, nil
  1131. }