382 lines
17 KiB
Python
382 lines
17 KiB
Python
from ..models.gtfs import GtfsTimeDelta, GtfsStopTime
|
|
from ..models.Carpool import MAX_STOPS_PER_TRIP, Carpool, Weekday, StopTime, PickupDropoffType, Driver, RidesharingInfo
|
|
# from amarillo.services.config import config
|
|
from ..gtfs_constants import *
|
|
# from amarillo.plugins.enhancer.services.routing import RoutingService, RoutingException
|
|
from amarillo_stops.stops import is_carpooling_stop
|
|
from amarillo.utils.utils import assert_folder_exists, is_older_than_days, yesterday, geodesic_distance_in_m
|
|
from shapely.geometry import Point, LineString, box
|
|
from geojson_pydantic.geometries import LineString as GeoJSONLineString
|
|
from datetime import datetime, timedelta
|
|
import numpy as np
|
|
import os
|
|
import json
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class Trip:
|
|
|
|
def __init__(self, trip_id, route_name, headsign, url, calendar, departureTime, path, agency, lastUpdated, stop_times, driver: Driver, additional_ridesharing_info: RidesharingInfo, route_color, route_text_color, bbox):
|
|
if isinstance(calendar, set):
|
|
self.runs_regularly = True
|
|
self.weekdays = [
|
|
1 if Weekday.monday in calendar else 0,
|
|
1 if Weekday.tuesday in calendar else 0,
|
|
1 if Weekday.wednesday in calendar else 0,
|
|
1 if Weekday.thursday in calendar else 0,
|
|
1 if Weekday.friday in calendar else 0,
|
|
1 if Weekday.saturday in calendar else 0,
|
|
1 if Weekday.sunday in calendar else 0,
|
|
]
|
|
start_in_day = self._total_seconds(departureTime)
|
|
else:
|
|
self.start = datetime.combine(calendar, departureTime)
|
|
self.runs_regularly = False
|
|
self.weekdays = [0,0,0,0,0,0,0]
|
|
|
|
self.start_time = departureTime
|
|
self.path = path
|
|
self.trip_id = trip_id
|
|
self.url = url
|
|
self.agency = agency
|
|
self.stops = []
|
|
self.lastUpdated = lastUpdated
|
|
self.stop_times = stop_times
|
|
self.driver = driver
|
|
self.additional_ridesharing_info = additional_ridesharing_info
|
|
self.route_color = route_color
|
|
self.route_text_color = route_text_color
|
|
self.bbox = bbox
|
|
self.route_name = route_name
|
|
self.trip_headsign = headsign
|
|
|
|
def path_as_line_string(self):
|
|
return self.path
|
|
|
|
def _total_seconds(self, instant):
|
|
return instant.hour * 3600 + instant.minute * 60 + instant.second
|
|
|
|
def start_time_str(self):
|
|
return self.start_time.strftime("%H:%M:%S")
|
|
|
|
def next_trip_dates(self, start_date, day_count=14):
|
|
if self.runs_regularly:
|
|
for single_date in (start_date + timedelta(n) for n in range(day_count)):
|
|
if self.weekdays[single_date.weekday()]==1:
|
|
yield single_date.strftime("%Y%m%d")
|
|
else:
|
|
yield self.start.strftime("%Y%m%d")
|
|
|
|
def route_long_name(self):
|
|
return self.route_name
|
|
|
|
def intersects(self, bbox):
|
|
return self.bbox.intersects(box(*bbox))
|
|
|
|
|
|
class TripStore():
|
|
"""
|
|
TripStore maintains the currently valid trips. A trip is a
|
|
carpool offer enhanced with all stops this
|
|
|
|
Attributes:
|
|
trips Dict of currently valid trips.
|
|
deleted_trips Dict of recently deleted trips.
|
|
"""
|
|
|
|
def __init__(self, stops_store):
|
|
self.transformer = TripTransformer(stops_store)
|
|
self.stops_store = stops_store
|
|
self.trips = {}
|
|
self.deleted_trips = {}
|
|
self.recent_trips = {}
|
|
|
|
|
|
def put_carpool(self, carpool: Carpool):
|
|
"""
|
|
Adds carpool to the TripStore.
|
|
"""
|
|
return self._load_as_trip(carpool)
|
|
# id = "{}:{}".format(carpool.agency, carpool.id)
|
|
# filename = f'data/enhanced/{carpool.agency}/{carpool.id}.json'
|
|
# try:
|
|
# existing_carpool = self._load_carpool_if_exists(carpool.agency, carpool.id)
|
|
# if existing_carpool and existing_carpool.lastUpdated == carpool.lastUpdated:
|
|
# enhanced_carpool = existing_carpool
|
|
# else:
|
|
# if len(carpool.stops) < 2 or self.distance_in_m(carpool) < 1000:
|
|
# logger.warning("Failed to add carpool %s:%s to TripStore, distance too low", carpool.agency, carpool.id)
|
|
# self.handle_failed_carpool_enhancement(carpool)
|
|
# return
|
|
# enhanced_carpool = self.transformer.enhance_carpool(carpool)
|
|
# # TODO should only store enhanced_carpool, if it has 2 or more stops
|
|
# assert_folder_exists(f'data/enhanced/{carpool.agency}/')
|
|
# with open(filename, 'w', encoding='utf-8') as f:
|
|
# f.write(enhanced_carpool.json())
|
|
# logger.info("Added enhanced carpool %s:%s", carpool.agency, carpool.id)
|
|
|
|
# return self._load_as_trip(enhanced_carpool)
|
|
# except RoutingException as err:
|
|
# logger.warning("Failed to add carpool %s:%s to TripStore due to RoutingException %s", carpool.agency, carpool.id, getattr(err, 'message', repr(err)))
|
|
# self.handle_failed_carpool_enhancement(carpool)
|
|
# except Exception as err:
|
|
# logger.error("Failed to add carpool %s:%s to TripStore.", carpool.agency, carpool.id, exc_info=True)
|
|
# self.handle_failed_carpool_enhancement(carpool)
|
|
|
|
def handle_failed_carpool_enhancement(sellf, carpool: Carpool):
|
|
assert_folder_exists(f'data/failed/{carpool.agency}/')
|
|
with open(f'data/failed/{carpool.agency}/{carpool.id}.json', 'w', encoding='utf-8') as f:
|
|
f.write(carpool.json())
|
|
|
|
def distance_in_m(self, carpool):
|
|
if len(carpool.stops) < 2:
|
|
return 0
|
|
s1 = carpool.stops[0]
|
|
s2 = carpool.stops[-1]
|
|
return geodesic_distance_in_m((s1.lon, s1.lat),(s2.lon, s2.lat))
|
|
|
|
def recently_added_trips(self):
|
|
return list(self.recent_trips.values())
|
|
|
|
def recently_deleted_trips(self):
|
|
return list(self.deleted_trips.values())
|
|
|
|
def _load_carpool_if_exists(self, agency_id: str, carpool_id: str):
|
|
if carpool_exists(agency_id, carpool_id, 'data/enhanced'):
|
|
try:
|
|
return load_carpool(agency_id, carpool_id, 'data/enhanced')
|
|
except Exception as e:
|
|
# An error on restore could be caused by model changes,
|
|
# in such a case, it need's to be recreated
|
|
logger.warning("Could not restore enhanced trip %s:%s, reason: %s", agency_id, carpool_id, repr(e))
|
|
|
|
return None
|
|
|
|
def _load_as_trip(self, carpool: Carpool):
|
|
trip = self.transformer.transform_to_trip(carpool)
|
|
id = trip.trip_id
|
|
self.trips[id] = trip
|
|
if not is_older_than_days(carpool.lastUpdated, 1):
|
|
self.recent_trips[id] = trip
|
|
logger.debug("Added trip %s", id)
|
|
|
|
return trip
|
|
|
|
def delete_carpool(self, agency_id: str, carpool_id: str):
|
|
"""
|
|
Deletes carpool from the TripStore.
|
|
"""
|
|
agencyScopedCarpoolId = f"{agency_id}:{carpool_id}"
|
|
trip_to_be_deleted = self.trips.get(agencyScopedCarpoolId)
|
|
if trip_to_be_deleted:
|
|
self.deleted_trips[agencyScopedCarpoolId] = trip_to_be_deleted
|
|
del self.trips[agencyScopedCarpoolId]
|
|
|
|
if self.recent_trips.get(agencyScopedCarpoolId):
|
|
del self.recent_trips[agencyScopedCarpoolId]
|
|
|
|
if carpool_exists(agency_id, carpool_id):
|
|
remove_carpool_file(agency_id, carpool_id)
|
|
|
|
logger.debug("Deleted trip %s", id)
|
|
|
|
def unflag_unrecent_updates(self):
|
|
"""
|
|
Trips that were last updated before yesterday, are not recent
|
|
any longer. As no updates need to be sent for them any longer,
|
|
they will be removed from recent recent_trips and deleted_trips.
|
|
"""
|
|
for key in list(self.recent_trips):
|
|
t = self.recent_trips.get(key)
|
|
if t and t.lastUpdated.date() < yesterday():
|
|
del self.recent_trips[key]
|
|
|
|
for key in list(self.deleted_trips):
|
|
t = self.deleted_trips.get(key)
|
|
if t and t.lastUpdated.date() < yesterday():
|
|
del self.deleted_trips[key]
|
|
|
|
|
|
class TripTransformer:
|
|
REPLACE_CARPOOL_STOPS_BY_CLOSEST_TRANSIT_STOPS = True
|
|
REPLACEMENT_STOPS_SERACH_RADIUS_IN_M = 1000
|
|
SIMPLIFY_TOLERANCE = 0.0001
|
|
|
|
# router = RoutingService(config.graphhopper_base_url)
|
|
|
|
def __init__(self, stops_store):
|
|
self.stops_store = stops_store
|
|
|
|
def transform_to_trip(self, carpool : Carpool):
|
|
stop_times = self._convert_stop_times(carpool)
|
|
route_name = carpool.stops[0].name + " nach " + carpool.stops[-1].name
|
|
headsign= carpool.stops[-1].name
|
|
trip_id = self._trip_id(carpool)
|
|
path = carpool.path
|
|
bbox = box(
|
|
min([pt[0] for pt in path.coordinates]),
|
|
min([pt[1] for pt in path.coordinates]),
|
|
max([pt[0] for pt in path.coordinates]),
|
|
max([pt[1] for pt in path.coordinates]))
|
|
|
|
trip = Trip(trip_id, route_name, headsign, str(carpool.deeplink), carpool.departureDate, carpool.departureTime, carpool.path, carpool.agency, carpool.lastUpdated, stop_times, carpool.driver, carpool.additional_ridesharing_info, carpool.route_color, carpool.route_text_color, bbox)
|
|
|
|
return trip
|
|
|
|
def _trip_id(self, carpool):
|
|
return f"{carpool.agency}:{carpool.id}"
|
|
|
|
def _replace_stops_by_transit_stops(self, carpool, max_search_distance):
|
|
new_stops = []
|
|
for carpool_stop in carpool.stops:
|
|
new_stops.append(self.stops_store.find_closest_stop(carpool_stop, max_search_distance))
|
|
return new_stops
|
|
|
|
def enhance_carpool(self, carpool):
|
|
if self.REPLACE_CARPOOL_STOPS_BY_CLOSEST_TRANSIT_STOPS:
|
|
carpool.stops = self._replace_stops_by_transit_stops(carpool, self.REPLACEMENT_STOPS_SERACH_RADIUS_IN_M)
|
|
|
|
path = self._path_for_ride(carpool)
|
|
lineString_shapely_wgs84 = LineString(coordinates = path["points"]["coordinates"]).simplify(0.0001)
|
|
lineString_wgs84 = GeoJSONLineString(type="LineString", coordinates=list(lineString_shapely_wgs84.coords))
|
|
virtual_stops = self.stops_store.find_additional_stops_around(lineString_wgs84, carpool.stops)
|
|
if not virtual_stops.empty:
|
|
virtual_stops["time"] = self._estimate_times(path, virtual_stops['distance'])
|
|
logger.debug("Virtual stops found: {}".format(virtual_stops))
|
|
if len(virtual_stops) > MAX_STOPS_PER_TRIP:
|
|
# in case we found more than MAX_STOPS_PER_TRIP, we retain first and last
|
|
# half of MAX_STOPS_PER_TRIP
|
|
virtual_stops = virtual_stops.iloc[np.r_[0:int(MAX_STOPS_PER_TRIP/2), int(MAX_STOPS_PER_TRIP/2):]]
|
|
|
|
trip_id = f"{carpool.agency}:{carpool.id}"
|
|
stop_times = self._stops_and_stop_times(carpool.departureTime, trip_id, virtual_stops)
|
|
|
|
enhanced_carpool = carpool.copy()
|
|
enhanced_carpool.stops = stop_times
|
|
enhanced_carpool.path = lineString_wgs84
|
|
return enhanced_carpool
|
|
|
|
def _convert_stop_times(self, carpool):
|
|
|
|
stop_times = [GtfsStopTime(
|
|
self._trip_id(carpool),
|
|
stop.arrivalTime,
|
|
stop.departureTime,
|
|
stop.id,
|
|
seq_nr+1,
|
|
STOP_TIMES_STOP_TYPE_NONE if stop.pickup_dropoff == PickupDropoffType.only_dropoff else STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER,
|
|
STOP_TIMES_STOP_TYPE_NONE if stop.pickup_dropoff == PickupDropoffType.only_pickup else STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER,
|
|
STOP_TIMES_TIMEPOINT_APPROXIMATE)
|
|
for seq_nr, stop in enumerate(carpool.stops)]
|
|
return stop_times
|
|
|
|
def _path_for_ride(self, carpool):
|
|
points = self._stop_coords(carpool.stops)
|
|
return self.router.path_for_stops(points)
|
|
|
|
def _stop_coords(self, stops):
|
|
# Retrieve coordinates of all officially announced stops (start, intermediate, target)
|
|
return [Point(stop.lon, stop.lat) for stop in stops]
|
|
|
|
def _estimate_times(self, path, distances_from_start):
|
|
cumulated_distance = 0
|
|
cumulated_time = 0
|
|
stop_times = []
|
|
instructions = path["instructions"]
|
|
|
|
cnt = 0
|
|
instr_distance = instructions[cnt]["distance"]
|
|
instr_time = instructions[cnt]["time"]
|
|
|
|
for distance in distances_from_start:
|
|
while cnt < len(instructions) and cumulated_distance + instructions[cnt]["distance"] < distance:
|
|
cumulated_distance = cumulated_distance + instructions[cnt]["distance"]
|
|
cumulated_time = cumulated_time + instructions[cnt]["time"]
|
|
cnt = cnt + 1
|
|
|
|
if cnt < len(instructions):
|
|
if instructions[cnt]["distance"] ==0:
|
|
raise Exception("Origin and destinaction too close")
|
|
# raise RoutingException("Origin and destinaction too close")
|
|
percent_dist = (distance - cumulated_distance) / instructions[cnt]["distance"]
|
|
stop_time = cumulated_time + percent_dist * instructions[cnt]["time"]
|
|
stop_times.append(stop_time)
|
|
else:
|
|
logger.debug("distance {} exceeds total length {}, using max arrival time {}".format(distance, cumulated_distance, cumulated_time))
|
|
stop_times.append(cumulated_time)
|
|
return stop_times
|
|
|
|
def _stops_and_stop_times(self, start_time, trip_id, stops_frame):
|
|
# Assumptions:
|
|
# arrival_time = departure_time
|
|
# pickup_type, drop_off_type for origin: = coordinate/none
|
|
# pickup_type, drop_off_type for destination: = none/coordinate
|
|
# timepoint = approximate for origin and destination (not sure what consequences this might have for trip planners)
|
|
number_of_stops = len(stops_frame.index)
|
|
total_distance = stops_frame.iloc[number_of_stops-1]["distance"]
|
|
|
|
first_stop_time = GtfsTimeDelta(hours = start_time.hour, minutes = start_time.minute, seconds = start_time.second)
|
|
stop_times = []
|
|
seq_nr = 0
|
|
for i in range(0, number_of_stops):
|
|
current_stop = stops_frame.iloc[i]
|
|
|
|
if not current_stop.id:
|
|
continue
|
|
elif i == 0:
|
|
if (stops_frame.iloc[1].time-current_stop.time) < 1000:
|
|
# skip custom stop if there is an official stop very close by
|
|
logger.debug("Skipped stop %s", current_stop.id)
|
|
continue
|
|
else:
|
|
if (current_stop.time-stops_frame.iloc[i-1].time) < 5000 and not i==1 and not is_carpooling_stop(current_stop.id, current_stop.stop_name):
|
|
# skip latter stop if it's very close (<5 seconds drive) by the preceding
|
|
logger.debug("Skipped stop %s", current_stop.id)
|
|
continue
|
|
trip_time = timedelta(milliseconds=int(current_stop.time))
|
|
is_dropoff = self._is_dropoff_stop(current_stop, total_distance)
|
|
is_pickup = self._is_pickup_stop(current_stop, total_distance)
|
|
# TODO would be nice if possible to publish a minimum shared distance
|
|
pickup_type = STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER if is_pickup else STOP_TIMES_STOP_TYPE_NONE
|
|
dropoff_type = STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER if is_dropoff else STOP_TIMES_STOP_TYPE_NONE
|
|
|
|
if is_pickup and not is_dropoff:
|
|
pickup_dropoff = PickupDropoffType.only_pickup
|
|
elif not is_pickup and is_dropoff:
|
|
pickup_dropoff = PickupDropoffType.only_dropoff
|
|
else:
|
|
pickup_dropoff = PickupDropoffType.pickup_and_dropoff
|
|
|
|
next_stop_time = first_stop_time + trip_time
|
|
seq_nr += 1
|
|
stop_times.append(StopTime(**{
|
|
'arrivalTime': str(next_stop_time),
|
|
'departureTime': str(next_stop_time),
|
|
'id': current_stop.id,
|
|
'pickup_dropoff': pickup_dropoff,
|
|
'name': str(current_stop.stop_name),
|
|
'lat': current_stop.y,
|
|
'lon': current_stop.x
|
|
}))
|
|
|
|
return stop_times
|
|
|
|
def _is_dropoff_stop(self, current_stop, total_distance):
|
|
return current_stop["distance"] >= 0.5 * total_distance
|
|
|
|
def _is_pickup_stop(self, current_stop, total_distance):
|
|
return current_stop["distance"] < 0.5 * total_distance
|
|
|
|
def load_carpool(agency_id: str, carpool_id: str, folder: str ='data/enhanced') -> Carpool:
|
|
with open(f'{folder}/{agency_id}/{carpool_id}.json', 'r', encoding='utf-8') as f:
|
|
dict = json.load(f)
|
|
carpool = Carpool(**dict)
|
|
return carpool
|
|
|
|
def carpool_exists(agency_id: str, carpool_id: str, folder: str ='data/enhanced'):
|
|
return os.path.exists(f"{folder}/{agency_id}/{carpool_id}.json")
|
|
|
|
def remove_carpool_file(agency_id: str, carpool_id: str, folder: str ='data/enhanced'):
|
|
return os.remove(f"{folder}/{agency_id}/{carpool_id}.json")
|