File system events

This commit is contained in:
Csaba 2024-05-07 12:22:47 +02:00
parent 5b1c9cb80e
commit 2e6420b78f
6 changed files with 516 additions and 24 deletions

View file

@ -1,6 +1,6 @@
import amarillo.plugins.gtfs_export.gtfsrt.gtfs_realtime_pb2 as gtfs_realtime_pb2
import amarillo.plugins.gtfs_export.gtfsrt.realtime_extension_pb2 as mfdzrte
from amarillo.plugins.gtfs_export.gtfs_constants import *
from .gtfsrt import gtfs_realtime_pb2 as gtfs_realtime_pb2
from gtfsrt import realtime_extension_pb2 as mfdzrte
from .gtfs_constants import *
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import ParseDict
from datetime import datetime, timedelta

View file

@ -8,9 +8,9 @@ import logging
import re
from amarillo.utils.utils import assert_folder_exists
from amarillo.plugins.gtfs_export.models.gtfs import GtfsTimeDelta, GtfsFeedInfo, GtfsAgency, GtfsRoute, GtfsStop, GtfsStopTime, GtfsTrip, GtfsCalendar, GtfsCalendarDate, GtfsShape
from .models.gtfs import GtfsTimeDelta, GtfsFeedInfo, GtfsAgency, GtfsRoute, GtfsStop, GtfsStopTime, GtfsTrip, GtfsCalendar, GtfsCalendarDate, GtfsShape
from amarillo.plugins.enhancer.services.stops import is_carpooling_stop
from amarillo.plugins.gtfs_export.gtfs_constants import *
from .gtfs_constants import *
from .models.Carpool import Agency

View file

@ -1,4 +1,4 @@
from fastapi import FastAPI, Body, status
from fastapi import FastAPI, Body, HTTPException, status
from fastapi.responses import FileResponse
from .gtfs_export import GtfsExport, GtfsFeedInfo, GtfsAgency
@ -12,16 +12,46 @@ import schedule
import threading
import time
import logging
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from .models.Carpool import Carpool, Region
from .router import _assert_region_exists
from amarillo.plugins.enhancer.services import stops
from amarillo.plugins.enhancer.services.trips import TripStore, Trip
from amarillo.plugins.enhancer.services.carpools import CarpoolService
from amarillo.plugins.enhancer.services import stops #TODO: make stop service its own package??
from .services.trips import TripStore, Trip
from .services.carpools import CarpoolService
from amarillo.services.agencies import AgencyService
from amarillo.services.regions import RegionService
from amarillo.utils.utils import agency_carpool_ids_from_filename
logger = logging.getLogger(__name__)
class EventHandler(FileSystemEventHandler):
def on_closed(self, event):
logger.info("CLOSE_WRITE: Created %s", event.src_path)
try:
with open(event.src_path, 'r', encoding='utf-8') as f:
dict = json.load(f)
carpool = Carpool(**dict)
container['carpools'].put(carpool.agency, carpool.id, carpool)
except FileNotFoundError as e:
logger.error("Carpool could not be added, as already deleted (%s)", event.src_path)
except:
logger.exception("Eventhandler on_closed encountered exception")
def on_deleted(self, event):
try:
logger.info("DELETE: Removing %s", event.src_path)
(agency_id, carpool_id) = agency_carpool_ids_from_filename(event.src_path)
container['carpools'].delete(agency_id, carpool_id)
except:
logger.exception("Eventhandler on_deleted encountered exception")
def init():
container['agencies'] = AgencyService()
logger.info("Loaded %d agencies", len(container['agencies'].agencies))
@ -40,7 +70,7 @@ def init():
container['stops_store'] = stop_store
container['trips_store'] = TripStore(stop_store)
# TODO: do we need the carpool service at all?
# TODO: the carpool service may be obsolete
container['carpools'] = CarpoolService(container['trips_store'])
logger.info("Restore carpools...")
@ -55,6 +85,11 @@ def init():
except Exception as e:
logger.warning("Issue during restore of carpool %s: %s", carpool_file_name, repr(e))
observer = Observer() # Watch Manager
observer.schedule(EventHandler(), 'data/enhanced', recursive=True)
observer.start()
def run_schedule():
while 1:
@ -82,7 +117,7 @@ def generate_gtfs():
exporter = GtfsExport(
container['agencies'].agencies,
feed_info,
container['trips_store'], # TODO: read carpools from disk and convert them to trips
container['trips_store'],
container['stops_store'],
region.bbox)
exporter.export(f"data/gtfs/amarillo.{region.id}.gtfs.zip", "data/tmp/")
@ -109,7 +144,7 @@ def setup(app : FastAPI):
pass
logging.config.fileConfig('logging.conf', disable_existing_loggers=False)
logger = logging.getLogger("enhancer")
logger = logging.getLogger("gtfs-generator")
#TODO: clean up metadata
app = FastAPI(title="Amarillo GTFS Generator",
@ -173,22 +208,22 @@ app = FastAPI(title="Amarillo GTFS Generator",
init()
@app.post("/",
operation_id="enhancecarpool",
summary="Add a new or update existing carpool",
description="Carpool object to be enhanced",
responses={
status.HTTP_404_NOT_FOUND: {
"description": "Agency does not exist"},
# @app.post("/",
# operation_id="enhancecarpool",
# summary="Add a new or update existing carpool",
# description="Carpool object to be enhanced",
# responses={
# status.HTTP_404_NOT_FOUND: {
# "description": "Agency does not exist"},
})
# })
#TODO: add examples
async def post_carpool(carpool: Carpool = Body(...)):
# async def post_carpool(carpool: Carpool = Body(...)):
logger.info(f"POST trip {carpool.agency}:{carpool.id}.")
# logger.info(f"POST trip {carpool.agency}:{carpool.id}.")
trips_store: TripStore = container['trips_store']
trip = trips_store._load_as_trip(carpool)
# trips_store: TripStore = container['trips_store']
# trip = trips_store._load_as_trip(carpool)
#TODO: carpool deleted endpoint
@ -208,6 +243,26 @@ async def get_file(region_id: str):
# verify_permission("gtfs", requesting_user)
return FileResponse(f'data/gtfs/amarillo.{region_id}.gtfs.zip')
@app.get("/region/{region_id}/grfs-rt/",
summary="Return GRFS-RT Feed for this region",
response_description="GRFS-RT-Feed",
response_class=FileResponse,
responses={
status.HTTP_404_NOT_FOUND: {"description": "Region not found"},
status.HTTP_400_BAD_REQUEST: {"description": "Bad request, e.g. because format is not supported, i.e. neither protobuf nor json."}
}
)
async def get_file(region_id: str, format: str = 'protobuf'):
generate_gtfs_rt()
_assert_region_exists(region_id)
if format == 'json':
return FileResponse(f'data/grfs/amarillo.{region_id}.gtfsrt.json')
elif format == 'protobuf':
return FileResponse(f'data/grfs/amarillo.{region_id}.gtfsrt.pbf')
else:
message = "Specified format is not supported, i.e. neither protobuf nor json."
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
#TODO: sync endpoint that calls midnight
@app.post("/sync",

View file

@ -0,0 +1,60 @@
import json
import logging
from datetime import datetime
from typing import Dict
from amarillo.models.Carpool import Carpool
from amarillo.utils.utils import yesterday, is_older_than_days
logger = logging.getLogger(__name__)
class CarpoolService():
MAX_OFFER_AGE_IN_DAYS = 180
def __init__(self, trip_store):
self.trip_store = trip_store
self.carpools: Dict[str, Carpool] = {}
def is_outdated(self, carpool):
"""
A carpool offer is outdated, if
* it's completly in the past (if it's a single date offer).
As we know the start time but not latest arrival, we deem
offers starting the day before yesterday as outdated
* it's last update occured before MAX_OFFER_AGE_IN_DAYS
"""
runs_once = not isinstance(carpool.departureDate, set)
return (is_older_than_days(carpool.lastUpdated.date(), self.MAX_OFFER_AGE_IN_DAYS) or
(runs_once and carpool.departureDate < yesterday()))
def purge_outdated_offers(self):
"""
Iterates over all carpools and deletes those which are outdated
"""
for key in list(self.carpools.keys()):
cp = self.carpools.get(key)
if cp and self.is_outdated(cp):
logger.info("Purge outdated offer %s", key)
self.delete(cp.agency, cp.id)
def get(self, agency_id: str, carpool_id: str):
return self.carpools.get(f"{agency_id}:{carpool_id}")
def get_all_ids(self):
return list(self.carpools)
def put(self, agency_id: str, carpool_id: str, carpool):
self.carpools[f"{agency_id}:{carpool_id}"] = carpool
# Outdated trips (which might have been in the store)
# will be deleted
if self.is_outdated(carpool):
logger.info('Deleting outdated carpool %s:%s', agency_id, carpool_id)
self.delete(agency_id, carpool_id)
else:
self.trip_store.put_carpool(carpool)
def delete(self, agency_id: str, carpool_id: str):
id = f"{agency_id}:{carpool_id}"
if id in self.carpools:
del self.carpools[id]
self.trip_store.delete_carpool(agency_id, carpool_id)

View file

@ -0,0 +1,377 @@
from amarillo.plugins.enhancer.models.gtfs import GtfsTimeDelta, GtfsStopTime
from amarillo.models.Carpool import MAX_STOPS_PER_TRIP, Carpool, Weekday, StopTime, PickupDropoffType, Driver, RidesharingInfo
from amarillo.services.config import config
from amarillo.plugins.enhancer.services.gtfs_constants import *
from amarillo.plugins.enhancer.services.routing import RoutingService, RoutingException
from amarillo.plugins.enhancer.services.stops import is_carpooling_stop
from amarillo.utils.utils import assert_folder_exists, is_older_than_days, yesterday, geodesic_distance_in_m
from shapely.geometry import Point, LineString, box
from geojson_pydantic.geometries import LineString as GeoJSONLineString
from datetime import datetime, timedelta
import numpy as np
import os
import json
import logging
logger = logging.getLogger(__name__)
class Trip:
def __init__(self, trip_id, route_name, headsign, url, calendar, departureTime, path, agency, lastUpdated, stop_times, driver: Driver, additional_ridesharing_info: RidesharingInfo, bbox):
if isinstance(calendar, set):
self.runs_regularly = True
self.weekdays = [
1 if Weekday.monday in calendar else 0,
1 if Weekday.tuesday in calendar else 0,
1 if Weekday.wednesday in calendar else 0,
1 if Weekday.thursday in calendar else 0,
1 if Weekday.friday in calendar else 0,
1 if Weekday.saturday in calendar else 0,
1 if Weekday.sunday in calendar else 0,
]
start_in_day = self._total_seconds(departureTime)
else:
self.start = datetime.combine(calendar, departureTime)
self.runs_regularly = False
self.weekdays = [0,0,0,0,0,0,0]
self.start_time = departureTime
self.path = path
self.trip_id = trip_id
self.url = url
self.agency = agency
self.stops = []
self.lastUpdated = lastUpdated
self.stop_times = stop_times
self.driver = driver
self.additional_ridesharing_info = additional_ridesharing_info
self.bbox = bbox
self.route_name = route_name
self.trip_headsign = headsign
def path_as_line_string(self):
return self.path
def _total_seconds(self, instant):
return instant.hour * 3600 + instant.minute * 60 + instant.second
def start_time_str(self):
return self.start_time.strftime("%H:%M:%S")
def next_trip_dates(self, start_date, day_count=14):
if self.runs_regularly:
for single_date in (start_date + timedelta(n) for n in range(day_count)):
if self.weekdays[single_date.weekday()]==1:
yield single_date.strftime("%Y%m%d")
else:
yield self.start.strftime("%Y%m%d")
def route_long_name(self):
return self.route_name
def intersects(self, bbox):
return self.bbox.intersects(box(*bbox))
class TripStore():
"""
TripStore maintains the currently valid trips. A trip is a
carpool offer enhanced with all stops this
Attributes:
trips Dict of currently valid trips.
deleted_trips Dict of recently deleted trips.
"""
def __init__(self, stops_store):
self.transformer = TripTransformer(stops_store)
self.stops_store = stops_store
self.trips = {}
self.deleted_trips = {}
self.recent_trips = {}
def put_carpool(self, carpool: Carpool):
"""
Adds carpool to the TripStore.
"""
id = "{}:{}".format(carpool.agency, carpool.id)
filename = f'data/enhanced/{carpool.agency}/{carpool.id}.json'
try:
existing_carpool = self._load_carpool_if_exists(carpool.agency, carpool.id)
if existing_carpool and existing_carpool.lastUpdated == carpool.lastUpdated:
enhanced_carpool = existing_carpool
else:
if len(carpool.stops) < 2 or self.distance_in_m(carpool) < 1000:
logger.warning("Failed to add carpool %s:%s to TripStore, distance too low", carpool.agency, carpool.id)
self.handle_failed_carpool_enhancement(carpool)
return
enhanced_carpool = self.transformer.enhance_carpool(carpool)
# TODO should only store enhanced_carpool, if it has 2 or more stops
assert_folder_exists(f'data/enhanced/{carpool.agency}/')
with open(filename, 'w', encoding='utf-8') as f:
f.write(enhanced_carpool.json())
logger.info("Added enhanced carpool %s:%s", carpool.agency, carpool.id)
return self._load_as_trip(enhanced_carpool)
except RoutingException as err:
logger.warning("Failed to add carpool %s:%s to TripStore due to RoutingException %s", carpool.agency, carpool.id, getattr(err, 'message', repr(err)))
self.handle_failed_carpool_enhancement(carpool)
except Exception as err:
logger.error("Failed to add carpool %s:%s to TripStore.", carpool.agency, carpool.id, exc_info=True)
self.handle_failed_carpool_enhancement(carpool)
def handle_failed_carpool_enhancement(sellf, carpool: Carpool):
assert_folder_exists(f'data/failed/{carpool.agency}/')
with open(f'data/failed/{carpool.agency}/{carpool.id}.json', 'w', encoding='utf-8') as f:
f.write(carpool.json())
def distance_in_m(self, carpool):
if len(carpool.stops) < 2:
return 0
s1 = carpool.stops[0]
s2 = carpool.stops[-1]
return geodesic_distance_in_m((s1.lon, s1.lat),(s2.lon, s2.lat))
def recently_added_trips(self):
return list(self.recent_trips.values())
def recently_deleted_trips(self):
return list(self.deleted_trips.values())
def _load_carpool_if_exists(self, agency_id: str, carpool_id: str):
if carpool_exists(agency_id, carpool_id, 'data/enhanced'):
try:
return load_carpool(agency_id, carpool_id, 'data/enhanced')
except Exception as e:
# An error on restore could be caused by model changes,
# in such a case, it need's to be recreated
logger.warning("Could not restore enhanced trip %s:%s, reason: %s", agency_id, carpool_id, repr(e))
return None
def _load_as_trip(self, carpool: Carpool):
trip = self.transformer.transform_to_trip(carpool)
id = trip.trip_id
self.trips[id] = trip
if not is_older_than_days(carpool.lastUpdated, 1):
self.recent_trips[id] = trip
logger.debug("Added trip %s", id)
return trip
def delete_carpool(self, agency_id: str, carpool_id: str):
"""
Deletes carpool from the TripStore.
"""
agencyScopedCarpoolId = f"{agency_id}:{carpool_id}"
trip_to_be_deleted = self.trips.get(agencyScopedCarpoolId)
if trip_to_be_deleted:
self.deleted_trips[agencyScopedCarpoolId] = trip_to_be_deleted
del self.trips[agencyScopedCarpoolId]
if self.recent_trips.get(agencyScopedCarpoolId):
del self.recent_trips[agencyScopedCarpoolId]
if carpool_exists(agency_id, carpool_id):
remove_carpool_file(agency_id, carpool_id)
logger.debug("Deleted trip %s", id)
def unflag_unrecent_updates(self):
"""
Trips that were last updated before yesterday, are not recent
any longer. As no updates need to be sent for them any longer,
they will be removed from recent recent_trips and deleted_trips.
"""
for key in list(self.recent_trips):
t = self.recent_trips.get(key)
if t and t.lastUpdated.date() < yesterday():
del self.recent_trips[key]
for key in list(self.deleted_trips):
t = self.deleted_trips.get(key)
if t and t.lastUpdated.date() < yesterday():
del self.deleted_trips[key]
class TripTransformer:
REPLACE_CARPOOL_STOPS_BY_CLOSEST_TRANSIT_STOPS = True
REPLACEMENT_STOPS_SERACH_RADIUS_IN_M = 1000
SIMPLIFY_TOLERANCE = 0.0001
router = RoutingService(config.graphhopper_base_url)
def __init__(self, stops_store):
self.stops_store = stops_store
def transform_to_trip(self, carpool : Carpool):
stop_times = self._convert_stop_times(carpool)
route_name = carpool.stops[0].name + " nach " + carpool.stops[-1].name
headsign= carpool.stops[-1].name
trip_id = self._trip_id(carpool)
path = carpool.path
bbox = box(
min([pt[0] for pt in path.coordinates]),
min([pt[1] for pt in path.coordinates]),
max([pt[0] for pt in path.coordinates]),
max([pt[1] for pt in path.coordinates]))
trip = Trip(trip_id, route_name, headsign, str(carpool.deeplink), carpool.departureDate, carpool.departureTime, carpool.path, carpool.agency, carpool.lastUpdated, stop_times, carpool.driver, carpool.additional_ridesharing_info, bbox)
return trip
def _trip_id(self, carpool):
return f"{carpool.agency}:{carpool.id}"
def _replace_stops_by_transit_stops(self, carpool, max_search_distance):
new_stops = []
for carpool_stop in carpool.stops:
new_stops.append(self.stops_store.find_closest_stop(carpool_stop, max_search_distance))
return new_stops
def enhance_carpool(self, carpool):
if self.REPLACE_CARPOOL_STOPS_BY_CLOSEST_TRANSIT_STOPS:
carpool.stops = self._replace_stops_by_transit_stops(carpool, self.REPLACEMENT_STOPS_SERACH_RADIUS_IN_M)
path = self._path_for_ride(carpool)
lineString_shapely_wgs84 = LineString(coordinates = path["points"]["coordinates"]).simplify(0.0001)
lineString_wgs84 = GeoJSONLineString(type="LineString", coordinates=list(lineString_shapely_wgs84.coords))
virtual_stops = self.stops_store.find_additional_stops_around(lineString_wgs84, carpool.stops)
if not virtual_stops.empty:
virtual_stops["time"] = self._estimate_times(path, virtual_stops['distance'])
logger.debug("Virtual stops found: {}".format(virtual_stops))
if len(virtual_stops) > MAX_STOPS_PER_TRIP:
# in case we found more than MAX_STOPS_PER_TRIP, we retain first and last
# half of MAX_STOPS_PER_TRIP
virtual_stops = virtual_stops.iloc[np.r_[0:int(MAX_STOPS_PER_TRIP/2), int(MAX_STOPS_PER_TRIP/2):]]
trip_id = f"{carpool.agency}:{carpool.id}"
stop_times = self._stops_and_stop_times(carpool.departureTime, trip_id, virtual_stops)
enhanced_carpool = carpool.copy()
enhanced_carpool.stops = stop_times
enhanced_carpool.path = lineString_wgs84
return enhanced_carpool
def _convert_stop_times(self, carpool):
stop_times = [GtfsStopTime(
self._trip_id(carpool),
stop.arrivalTime,
stop.departureTime,
stop.id,
seq_nr+1,
STOP_TIMES_STOP_TYPE_NONE if stop.pickup_dropoff == PickupDropoffType.only_dropoff else STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER,
STOP_TIMES_STOP_TYPE_NONE if stop.pickup_dropoff == PickupDropoffType.only_pickup else STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER,
STOP_TIMES_TIMEPOINT_APPROXIMATE)
for seq_nr, stop in enumerate(carpool.stops)]
return stop_times
def _path_for_ride(self, carpool):
points = self._stop_coords(carpool.stops)
return self.router.path_for_stops(points)
def _stop_coords(self, stops):
# Retrieve coordinates of all officially announced stops (start, intermediate, target)
return [Point(stop.lon, stop.lat) for stop in stops]
def _estimate_times(self, path, distances_from_start):
cumulated_distance = 0
cumulated_time = 0
stop_times = []
instructions = path["instructions"]
cnt = 0
instr_distance = instructions[cnt]["distance"]
instr_time = instructions[cnt]["time"]
for distance in distances_from_start:
while cnt < len(instructions) and cumulated_distance + instructions[cnt]["distance"] < distance:
cumulated_distance = cumulated_distance + instructions[cnt]["distance"]
cumulated_time = cumulated_time + instructions[cnt]["time"]
cnt = cnt + 1
if cnt < len(instructions):
if instructions[cnt]["distance"] ==0:
raise RoutingException("Origin and destinaction too close")
percent_dist = (distance - cumulated_distance) / instructions[cnt]["distance"]
stop_time = cumulated_time + percent_dist * instructions[cnt]["time"]
stop_times.append(stop_time)
else:
logger.debug("distance {} exceeds total length {}, using max arrival time {}".format(distance, cumulated_distance, cumulated_time))
stop_times.append(cumulated_time)
return stop_times
def _stops_and_stop_times(self, start_time, trip_id, stops_frame):
# Assumptions:
# arrival_time = departure_time
# pickup_type, drop_off_type for origin: = coordinate/none
# pickup_type, drop_off_type for destination: = none/coordinate
# timepoint = approximate for origin and destination (not sure what consequences this might have for trip planners)
number_of_stops = len(stops_frame.index)
total_distance = stops_frame.iloc[number_of_stops-1]["distance"]
first_stop_time = GtfsTimeDelta(hours = start_time.hour, minutes = start_time.minute, seconds = start_time.second)
stop_times = []
seq_nr = 0
for i in range(0, number_of_stops):
current_stop = stops_frame.iloc[i]
if not current_stop.id:
continue
elif i == 0:
if (stops_frame.iloc[1].time-current_stop.time) < 1000:
# skip custom stop if there is an official stop very close by
logger.debug("Skipped stop %s", current_stop.id)
continue
else:
if (current_stop.time-stops_frame.iloc[i-1].time) < 5000 and not i==1 and not is_carpooling_stop(current_stop.id, current_stop.stop_name):
# skip latter stop if it's very close (<5 seconds drive) by the preceding
logger.debug("Skipped stop %s", current_stop.id)
continue
trip_time = timedelta(milliseconds=int(current_stop.time))
is_dropoff = self._is_dropoff_stop(current_stop, total_distance)
is_pickup = self._is_pickup_stop(current_stop, total_distance)
# TODO would be nice if possible to publish a minimum shared distance
pickup_type = STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER if is_pickup else STOP_TIMES_STOP_TYPE_NONE
dropoff_type = STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER if is_dropoff else STOP_TIMES_STOP_TYPE_NONE
if is_pickup and not is_dropoff:
pickup_dropoff = PickupDropoffType.only_pickup
elif not is_pickup and is_dropoff:
pickup_dropoff = PickupDropoffType.only_dropoff
else:
pickup_dropoff = PickupDropoffType.pickup_and_dropoff
next_stop_time = first_stop_time + trip_time
seq_nr += 1
stop_times.append(StopTime(**{
'arrivalTime': str(next_stop_time),
'departureTime': str(next_stop_time),
'id': current_stop.id,
'pickup_dropoff': pickup_dropoff,
'name': str(current_stop.stop_name),
'lat': current_stop.y,
'lon': current_stop.x
}))
return stop_times
def _is_dropoff_stop(self, current_stop, total_distance):
return current_stop["distance"] >= 0.5 * total_distance
def _is_pickup_stop(self, current_stop, total_distance):
return current_stop["distance"] < 0.5 * total_distance
def load_carpool(agency_id: str, carpool_id: str, folder: str ='data/enhanced') -> Carpool:
with open(f'{folder}/{agency_id}/{carpool_id}.json', 'r', encoding='utf-8') as f:
dict = json.load(f)
carpool = Carpool(**dict)
return carpool
def carpool_exists(agency_id: str, carpool_id: str, folder: str ='data/enhanced'):
return os.path.exists(f"{folder}/{agency_id}/{carpool_id}.json")
def remove_carpool_file(agency_id: str, carpool_id: str, folder: str ='data/enhanced'):
return os.remove(f"{folder}/{agency_id}/{carpool_id}.json")