From b422be2b36fbb102d8bb6585fd41b016894810d6 Mon Sep 17 00:00:00 2001 From: Francia Csaba Date: Fri, 19 Jan 2024 15:16:57 +0100 Subject: [PATCH] #28 move CarpoolService and Tripstore from core to enhancer --- amarillo/plugins/enhancer/configuration.py | 4 +- .../plugins/enhancer/services/carpools.py | 60 +++ amarillo/plugins/enhancer/services/trips.py | 376 ++++++++++++++++++ 3 files changed, 438 insertions(+), 2 deletions(-) create mode 100644 amarillo/plugins/enhancer/services/carpools.py create mode 100644 amarillo/plugins/enhancer/services/trips.py diff --git a/amarillo/plugins/enhancer/configuration.py b/amarillo/plugins/enhancer/configuration.py index 0e19314..821e365 100644 --- a/amarillo/plugins/enhancer/configuration.py +++ b/amarillo/plugins/enhancer/configuration.py @@ -6,8 +6,8 @@ from glob import glob from amarillo.app.models.Carpool import Carpool from amarillo.app.services import stops -from amarillo.app.services import trips -from amarillo.app.services.carpools import CarpoolService +from amarillo.plugins.enhancer.services import trips +from amarillo.plugins.enhancer.services.carpools import CarpoolService from amarillo.plugins.enhancer.services import gtfs_generator from amarillo.app.configuration import configure_services diff --git a/amarillo/plugins/enhancer/services/carpools.py b/amarillo/plugins/enhancer/services/carpools.py new file mode 100644 index 0000000..5d6b97c --- /dev/null +++ b/amarillo/plugins/enhancer/services/carpools.py @@ -0,0 +1,60 @@ +import json +import logging +from datetime import datetime +from typing import Dict +from amarillo.app.models.Carpool import Carpool +from amarillo.app.utils.utils import yesterday, is_older_than_days + +logger = logging.getLogger(__name__) + +class CarpoolService(): + MAX_OFFER_AGE_IN_DAYS = 180 + + def __init__(self, trip_store): + + self.trip_store = trip_store + self.carpools: Dict[str, Carpool] = {} + + def is_outdated(self, carpool): + """ + A carpool offer is outdated, if + * it's completly in the past (if it's a single date offer). + As we know the start time but not latest arrival, we deem + offers starting the day before yesterday as outdated + * it's last update occured before MAX_OFFER_AGE_IN_DAYS + """ + runs_once = not isinstance(carpool.departureDate, set) + return (is_older_than_days(carpool.lastUpdated.date(), self.MAX_OFFER_AGE_IN_DAYS) or + (runs_once and carpool.departureDate < yesterday())) + + def purge_outdated_offers(self): + """ + Iterates over all carpools and deletes those which are outdated + """ + for key in list(self.carpools.keys()): + cp = self.carpools.get(key) + if cp and self.is_outdated(cp): + logger.info("Purge outdated offer %s", key) + self.delete(cp.agency, cp.id) + + def get(self, agency_id: str, carpool_id: str): + return self.carpools.get(f"{agency_id}:{carpool_id}") + + def get_all_ids(self): + return list(self.carpools) + + def put(self, agency_id: str, carpool_id: str, carpool): + self.carpools[f"{agency_id}:{carpool_id}"] = carpool + # Outdated trips (which might have been in the store) + # will be deleted + if self.is_outdated(carpool): + logger.info('Deleting outdated carpool %s:%s', agency_id, carpool_id) + self.delete(agency_id, carpool_id) + else: + self.trip_store.put_carpool(carpool) + + def delete(self, agency_id: str, carpool_id: str): + id = f"{agency_id}:{carpool_id}" + if id in self.carpools: + del self.carpools[id] + self.trip_store.delete_carpool(agency_id, carpool_id) diff --git a/amarillo/plugins/enhancer/services/trips.py b/amarillo/plugins/enhancer/services/trips.py new file mode 100644 index 0000000..6fb58ff --- /dev/null +++ b/amarillo/plugins/enhancer/services/trips.py @@ -0,0 +1,376 @@ +from amarillo.app.models.gtfs import GtfsTimeDelta, GtfsStopTime +from amarillo.app.models.Carpool import MAX_STOPS_PER_TRIP, Carpool, Weekday, StopTime, PickupDropoffType +from amarillo.app.services.gtfs_constants import * +from amarillo.app.services.routing import RoutingService, RoutingException +from amarillo.app.services.stops import is_carpooling_stop +from amarillo.app.utils.utils import assert_folder_exists, is_older_than_days, yesterday, geodesic_distance_in_m +from shapely.geometry import Point, LineString, box +from geojson_pydantic.geometries import LineString as GeoJSONLineString +from datetime import datetime, timedelta +import numpy as np +import os +import json +import logging + +logger = logging.getLogger(__name__) + +class Trip: + + # TODO: add driver attributes, additional ridesharing info + def __init__(self, trip_id, route_name, headsign, url, calendar, departureTime, path, agency, lastUpdated, stop_times, bbox): + if isinstance(calendar, set): + self.runs_regularly = True + self.weekdays = [ + 1 if Weekday.monday in calendar else 0, + 1 if Weekday.tuesday in calendar else 0, + 1 if Weekday.wednesday in calendar else 0, + 1 if Weekday.thursday in calendar else 0, + 1 if Weekday.friday in calendar else 0, + 1 if Weekday.saturday in calendar else 0, + 1 if Weekday.sunday in calendar else 0, + ] + start_in_day = self._total_seconds(departureTime) + else: + self.start = datetime.combine(calendar, departureTime) + self.runs_regularly = False + self.weekdays = [0,0,0,0,0,0,0] + + self.start_time = departureTime + self.path = path + self.trip_id = trip_id + self.url = url + self.agency = agency + self.stops = [] + self.lastUpdated = lastUpdated + self.stop_times = stop_times + self.bbox = bbox + self.route_name = route_name + self.trip_headsign = headsign + + def path_as_line_string(self): + return self.path + + def _total_seconds(self, instant): + return instant.hour * 3600 + instant.minute * 60 + instant.second + + def start_time_str(self): + return self.start_time.strftime("%H:%M:%S") + + def next_trip_dates(self, start_date, day_count=14): + if self.runs_regularly: + for single_date in (start_date + timedelta(n) for n in range(day_count)): + if self.weekdays[single_date.weekday()]==1: + yield single_date.strftime("%Y%m%d") + else: + yield self.start.strftime("%Y%m%d") + + def route_long_name(self): + return self.route_name + + def intersects(self, bbox): + return self.bbox.intersects(box(*bbox)) + + +class TripStore(): + """ + TripStore maintains the currently valid trips. A trip is a + carpool offer enhanced with all stops this + + Attributes: + trips Dict of currently valid trips. + deleted_trips Dict of recently deleted trips. + """ + + def __init__(self, stops_store): + self.transformer = TripTransformer(stops_store) + self.stops_store = stops_store + self.trips = {} + self.deleted_trips = {} + self.recent_trips = {} + + + def put_carpool(self, carpool: Carpool): + """ + Adds carpool to the TripStore. + """ + id = "{}:{}".format(carpool.agency, carpool.id) + filename = f'data/enhanced/{carpool.agency}/{carpool.id}.json' + try: + existing_carpool = self._load_carpool_if_exists(carpool.agency, carpool.id) + if existing_carpool and existing_carpool.lastUpdated == carpool.lastUpdated: + enhanced_carpool = existing_carpool + else: + if len(carpool.stops) < 2 or self.distance_in_m(carpool) < 1000: + logger.warning("Failed to add carpool %s:%s to TripStore, distance too low", carpool.agency, carpool.id) + self.handle_failed_carpool_enhancement(carpool) + return + enhanced_carpool = self.transformer.enhance_carpool(carpool) + # TODO should only store enhanced_carpool, if it has 2 or more stops + assert_folder_exists(f'data/enhanced/{carpool.agency}/') + with open(filename, 'w', encoding='utf-8') as f: + f.write(enhanced_carpool.json()) + logger.info("Added enhanced carpool %s:%s", carpool.agency, carpool.id) + + return self._load_as_trip(enhanced_carpool) + except RoutingException as err: + logger.warning("Failed to add carpool %s:%s to TripStore due to RoutingException %s", carpool.agency, carpool.id, getattr(err, 'message', repr(err))) + self.handle_failed_carpool_enhancement(carpool) + except Exception as err: + logger.error("Failed to add carpool %s:%s to TripStore.", carpool.agency, carpool.id, exc_info=True) + self.handle_failed_carpool_enhancement(carpool) + + def handle_failed_carpool_enhancement(sellf, carpool: Carpool): + assert_folder_exists(f'data/failed/{carpool.agency}/') + with open(f'data/failed/{carpool.agency}/{carpool.id}.json', 'w', encoding='utf-8') as f: + f.write(carpool.json()) + + def distance_in_m(self, carpool): + if len(carpool.stops) < 2: + return 0 + s1 = carpool.stops[0] + s2 = carpool.stops[-1] + return geodesic_distance_in_m((s1.lon, s1.lat),(s2.lon, s2.lat)) + + def recently_added_trips(self): + return list(self.recent_trips.values()) + + def recently_deleted_trips(self): + return list(self.deleted_trips.values()) + + def _load_carpool_if_exists(self, agency_id: str, carpool_id: str): + if carpool_exists(agency_id, carpool_id, 'data/enhanced'): + try: + return load_carpool(agency_id, carpool_id, 'data/enhanced') + except Exception as e: + # An error on restore could be caused by model changes, + # in such a case, it need's to be recreated + logger.warning("Could not restore enhanced trip %s:%s, reason: %s", agency_id, carpool_id, repr(e)) + + return None + + def _load_as_trip(self, carpool: Carpool): + trip = self.transformer.transform_to_trip(carpool) + id = trip.trip_id + self.trips[id] = trip + if not is_older_than_days(carpool.lastUpdated, 1): + self.recent_trips[id] = trip + logger.debug("Added trip %s", id) + + return trip + + def delete_carpool(self, agency_id: str, carpool_id: str): + """ + Deletes carpool from the TripStore. + """ + agencyScopedCarpoolId = f"{agency_id}:{carpool_id}" + trip_to_be_deleted = self.trips.get(agencyScopedCarpoolId) + if trip_to_be_deleted: + self.deleted_trips[agencyScopedCarpoolId] = trip_to_be_deleted + del self.trips[agencyScopedCarpoolId] + + if self.recent_trips.get(agencyScopedCarpoolId): + del self.recent_trips[agencyScopedCarpoolId] + + if carpool_exists(agency_id, carpool_id): + remove_carpool_file(agency_id, carpool_id) + + logger.debug("Deleted trip %s", id) + + def unflag_unrecent_updates(self): + """ + Trips that were last updated before yesterday, are not recent + any longer. As no updates need to be sent for them any longer, + they will be removed from recent recent_trips and deleted_trips. + """ + for key in list(self.recent_trips): + t = self.recent_trips.get(key) + if t and t.lastUpdated.date() < yesterday(): + del self.recent_trips[key] + + for key in list(self.deleted_trips): + t = self.deleted_trips.get(key) + if t and t.lastUpdated.date() < yesterday(): + del self.deleted_trips[key] + + +class TripTransformer: + REPLACE_CARPOOL_STOPS_BY_CLOSEST_TRANSIT_STOPS = True + REPLACEMENT_STOPS_SERACH_RADIUS_IN_M = 1000 + SIMPLIFY_TOLERANCE = 0.0001 + + router = RoutingService() + + def __init__(self, stops_store): + self.stops_store = stops_store + + def transform_to_trip(self, carpool): + stop_times = self._convert_stop_times(carpool) + route_name = carpool.stops[0].name + " nach " + carpool.stops[-1].name + headsign= carpool.stops[-1].name + trip_id = self._trip_id(carpool) + path = carpool.path + bbox = box( + min([pt[0] for pt in path.coordinates]), + min([pt[1] for pt in path.coordinates]), + max([pt[0] for pt in path.coordinates]), + max([pt[1] for pt in path.coordinates])) + + # TODO: pass driver and ridesharing info object to the Trip constructor + trip = Trip(trip_id, route_name, headsign, str(carpool.deeplink), carpool.departureDate, carpool.departureTime, carpool.path, carpool.agency, carpool.lastUpdated, stop_times, bbox) + + return trip + + def _trip_id(self, carpool): + return f"{carpool.agency}:{carpool.id}" + + def _replace_stops_by_transit_stops(self, carpool, max_search_distance): + new_stops = [] + for carpool_stop in carpool.stops: + new_stops.append(self.stops_store.find_closest_stop(carpool_stop, max_search_distance)) + return new_stops + + def enhance_carpool(self, carpool): + if self.REPLACE_CARPOOL_STOPS_BY_CLOSEST_TRANSIT_STOPS: + carpool.stops = self._replace_stops_by_transit_stops(carpool, self.REPLACEMENT_STOPS_SERACH_RADIUS_IN_M) + + path = self._path_for_ride(carpool) + lineString_shapely_wgs84 = LineString(coordinates = path["points"]["coordinates"]).simplify(0.0001) + lineString_wgs84 = GeoJSONLineString(type="LineString", coordinates=list(lineString_shapely_wgs84.coords)) + virtual_stops = self.stops_store.find_additional_stops_around(lineString_wgs84, carpool.stops) + if not virtual_stops.empty: + virtual_stops["time"] = self._estimate_times(path, virtual_stops['distance']) + logger.debug("Virtual stops found: {}".format(virtual_stops)) + if len(virtual_stops) > MAX_STOPS_PER_TRIP: + # in case we found more than MAX_STOPS_PER_TRIP, we retain first and last + # half of MAX_STOPS_PER_TRIP + virtual_stops = virtual_stops.iloc[np.r_[0:int(MAX_STOPS_PER_TRIP/2), int(MAX_STOPS_PER_TRIP/2):]] + + trip_id = f"{carpool.agency}:{carpool.id}" + stop_times = self._stops_and_stop_times(carpool.departureTime, trip_id, virtual_stops) + + enhanced_carpool = carpool.copy() + enhanced_carpool.stops = stop_times + enhanced_carpool.path = lineString_wgs84 + return enhanced_carpool + + def _convert_stop_times(self, carpool): + + stop_times = [GtfsStopTime( + self._trip_id(carpool), + stop.arrivalTime, + stop.departureTime, + stop.id, + seq_nr+1, + STOP_TIMES_STOP_TYPE_NONE if stop.pickup_dropoff == PickupDropoffType.only_dropoff else STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER, + STOP_TIMES_STOP_TYPE_NONE if stop.pickup_dropoff == PickupDropoffType.only_pickup else STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER, + STOP_TIMES_TIMEPOINT_APPROXIMATE) + for seq_nr, stop in enumerate(carpool.stops)] + return stop_times + + def _path_for_ride(self, carpool): + points = self._stop_coords(carpool.stops) + return self.router.path_for_stops(points) + + def _stop_coords(self, stops): + # Retrieve coordinates of all officially announced stops (start, intermediate, target) + return [Point(stop.lon, stop.lat) for stop in stops] + + def _estimate_times(self, path, distances_from_start): + cumulated_distance = 0 + cumulated_time = 0 + stop_times = [] + instructions = path["instructions"] + + cnt = 0 + instr_distance = instructions[cnt]["distance"] + instr_time = instructions[cnt]["time"] + + for distance in distances_from_start: + while cnt < len(instructions) and cumulated_distance + instructions[cnt]["distance"] < distance: + cumulated_distance = cumulated_distance + instructions[cnt]["distance"] + cumulated_time = cumulated_time + instructions[cnt]["time"] + cnt = cnt + 1 + + if cnt < len(instructions): + if instructions[cnt]["distance"] ==0: + raise RoutingException("Origin and destinaction too close") + percent_dist = (distance - cumulated_distance) / instructions[cnt]["distance"] + stop_time = cumulated_time + percent_dist * instructions[cnt]["time"] + stop_times.append(stop_time) + else: + logger.debug("distance {} exceeds total length {}, using max arrival time {}".format(distance, cumulated_distance, cumulated_time)) + stop_times.append(cumulated_time) + return stop_times + + def _stops_and_stop_times(self, start_time, trip_id, stops_frame): + # Assumptions: + # arrival_time = departure_time + # pickup_type, drop_off_type for origin: = coordinate/none + # pickup_type, drop_off_type for destination: = none/coordinate + # timepoint = approximate for origin and destination (not sure what consequences this might have for trip planners) + number_of_stops = len(stops_frame.index) + total_distance = stops_frame.iloc[number_of_stops-1]["distance"] + + first_stop_time = GtfsTimeDelta(hours = start_time.hour, minutes = start_time.minute, seconds = start_time.second) + stop_times = [] + seq_nr = 0 + for i in range(0, number_of_stops): + current_stop = stops_frame.iloc[i] + + if not current_stop.id: + continue + elif i == 0: + if (stops_frame.iloc[1].time-current_stop.time) < 1000: + # skip custom stop if there is an official stop very close by + logger.debug("Skipped stop %s", current_stop.id) + continue + else: + if (current_stop.time-stops_frame.iloc[i-1].time) < 5000 and not i==1 and not is_carpooling_stop(current_stop.id, current_stop.stop_name): + # skip latter stop if it's very close (<5 seconds drive) by the preceding + logger.debug("Skipped stop %s", current_stop.id) + continue + trip_time = timedelta(milliseconds=int(current_stop.time)) + is_dropoff = self._is_dropoff_stop(current_stop, total_distance) + is_pickup = self._is_pickup_stop(current_stop, total_distance) + # TODO would be nice if possible to publish a minimum shared distance + pickup_type = STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER if is_pickup else STOP_TIMES_STOP_TYPE_NONE + dropoff_type = STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER if is_dropoff else STOP_TIMES_STOP_TYPE_NONE + + if is_pickup and not is_dropoff: + pickup_dropoff = PickupDropoffType.only_pickup + elif not is_pickup and is_dropoff: + pickup_dropoff = PickupDropoffType.only_dropoff + else: + pickup_dropoff = PickupDropoffType.pickup_and_dropoff + + next_stop_time = first_stop_time + trip_time + seq_nr += 1 + stop_times.append(StopTime(**{ + 'arrivalTime': str(next_stop_time), + 'departureTime': str(next_stop_time), + 'id': current_stop.id, + 'pickup_dropoff': pickup_dropoff, + 'name': str(current_stop.stop_name), + 'lat': current_stop.y, + 'lon': current_stop.x + })) + + return stop_times + + def _is_dropoff_stop(self, current_stop, total_distance): + return current_stop["distance"] >= 0.5 * total_distance + + def _is_pickup_stop(self, current_stop, total_distance): + return current_stop["distance"] < 0.5 * total_distance + +def load_carpool(agency_id: str, carpool_id: str, folder: str ='data/enhanced') -> Carpool: + with open(f'{folder}/{agency_id}/{carpool_id}.json', 'r', encoding='utf-8') as f: + dict = json.load(f) + carpool = Carpool(**dict) + return carpool + +def carpool_exists(agency_id: str, carpool_id: str, folder: str ='data/enhanced'): + return os.path.exists(f"{folder}/{agency_id}/{carpool_id}.json") + +def remove_carpool_file(agency_id: str, carpool_id: str, folder: str ='data/enhanced'): + return os.remove(f"{folder}/{agency_id}/{carpool_id}.json")