Added services and models

This commit is contained in:
Csaba 2023-12-11 11:00:32 +01:00
parent 89f217aebd
commit f374b04f9c
17 changed files with 1436 additions and 0 deletions

View file

@ -0,0 +1,26 @@
from pydantic import ConfigDict, BaseModel, Field
class AgencyConf(BaseModel):
agency_id: str = Field(
description="ID of the agency that uses this token.",
min_length=1,
max_length=20,
pattern='^[a-zA-Z0-9]+$',
examples=["mfdz"])
api_key: str = Field(
description="The agency's API key for using the API",
min_length=20,
max_length=256,
pattern=r'^[a-zA-Z0-9]+$',
examples=["d8yLuY4DqMEUCLcfJASi"])
model_config = ConfigDict(json_schema_extra={
"title": "Agency Configuration",
"description": "Configuration for an agency.",
"example":
{
"agency_id": "mfdz",
"api_key": "d8yLuY4DqMEUCLcfJASi"
}
})

View file

@ -0,0 +1,284 @@
from datetime import time, date, datetime
from pydantic import ConfigDict, BaseModel, Field, HttpUrl, EmailStr
from typing import List, Union, Set, Optional, Tuple
from datetime import time
from pydantic import BaseModel, Field
from geojson_pydantic.geometries import LineString
from enum import Enum
NumType = Union[float, int]
MAX_STOPS_PER_TRIP = 100
class Weekday(str, Enum):
monday = "monday"
tuesday = "tuesday"
wednesday = "wednesday"
thursday = "thursday"
friday = "friday"
saturday = "saturday"
sunday = "sunday"
class PickupDropoffType(str, Enum):
pickup_and_dropoff = "pickup_and_dropoff"
only_pickup = "only_pickup"
only_dropoff = "only_dropoff"
class StopTime(BaseModel):
id: Optional[str] = Field(
None,
description="Optional Stop ID. If given, it should conform to the "
"IFOPT specification. For official transit stops, "
"it should be their official IFOPT. In Germany, this is "
"the DHID which is available via the 'zentrales "
"Haltestellenverzeichnis (zHV)', published by DELFI e.V. "
"Note, that currently carpooling location.",
pattern=r"^([a-zA-Z]{2,6}):\d+:\d+(:\d*(:\w+)?)?$|^osm:[nwr]\d+$",
examples=["de:12073:900340137::2"])
name: str = Field(
description="Name of the location. Use a name that people will "
"understand in the local and tourist vernacular.",
min_length=1,
max_length=256,
examples=["Angermünde, Breitscheidstr."])
departureTime: Optional[str] = Field(
None,
description="Departure time from a specific stop for a specific "
"carpool trip. For times occurring after midnight on the "
"service day, the time is given as a value greater than "
"24:00:00 in HH:MM:SS local time for the day on which the "
"trip schedule begins. If there are not separate times for "
"arrival and departure at a stop, the same value for arrivalTime "
"and departureTime. Note, that arrivalTime/departureTime of "
"the stops are not mandatory, and might then be estimated by "
"this service.",
pattern=r"^[0-9][0-9]:[0-5][0-9](:[0-5][0-9])?$",
examples=["17:00"]
)
arrivalTime: Optional[str] = Field(
None,
description="Arrival time at a specific stop for a specific trip on a "
"carpool route. If there are not separate times for arrival "
"and departure at a stop, enter the same value for arrivalTime "
"and departureTime. For times occurring after midnight on the "
"service day, the time as a value greater than 24:00:00 in "
"HH:MM:SS local time for the day on which the trip schedule "
"begins. Note, that arrivalTime/departureTime of the stops "
"are not mandatory, and might then be estimated by this "
"service.",
pattern=r"^[0-9][0-9]:[0-5][0-9](:[0-5][0-9])?$",
examples=["18:00"])
lat: float = Field(
description="Latitude of the location. Should describe the location "
"where a passenger may mount/dismount the vehicle.",
ge=-90,
lt=90,
examples=["53.0137311391"])
lon: float = Field(
description="Longitude of the location. Should describe the location "
"where a passenger may mount/dismount the vehicle.",
ge=-180,
lt=180,
examples=["13.9934706687"])
pickup_dropoff: Optional[PickupDropoffType] = Field(
None, description="If passengers may be picked up, dropped off or both at this stop. "
"If not specified, this service may assign this according to some custom rules. "
"E.g. Amarillo may allow pickup only for the first third of the distance travelled, "
"and dropoff only for the last third." ,
examples=["only_pickup"]
)
model_config = ConfigDict(json_schema_extra={
"example": "{'id': 'de:12073:900340137::2', 'name': "
"'Angermünde, Breitscheidstr.', 'lat': 53.0137311391, "
"'lon': 13.9934706687}"
})
class Region(BaseModel):
id: str = Field(
description="ID of the region.",
min_length=1,
max_length=20,
pattern='^[a-zA-Z0-9]+$',
examples=["bb"])
bbox: Tuple[NumType, NumType, NumType, NumType] = Field(
description="Bounding box of the region. Format is [minLon, minLat, maxLon, maxLat]",
examples=[[10.5,49.2,11.3,51.3]])
class Agency(BaseModel):
id: str = Field(
description="ID of the agency.",
min_length=1,
max_length=20,
pattern='^[a-zA-Z0-9]+$',
examples=["mfdz"])
name: str = Field(
description="Name",
min_length=1,
max_length=48,
pattern=r'^[\w -\.\|]+$',
examples=["MITFAHR|DE|ZENTRALE"])
url: HttpUrl = Field(
description="URL of the carpool agency.",
examples=["https://mfdz.de/"])
timezone: str = Field(
description="Timezone where the carpool agency is located.",
min_length=1,
max_length=48,
pattern=r'^[\w/]+$',
examples=["Europe/Berlin"])
lang: str = Field(
description="Primary language used by this carpool agency.",
min_length=1,
max_length=2,
pattern=r'^[a-zA-Z_]+$',
examples=["de"])
email: EmailStr = Field(
description="""Email address actively monitored by the agencys
customer service department. This email address should be a direct
contact point where carpool riders can reach a customer service
representative at the agency.""",
examples=["info@mfdz.de"])
terms_url: Optional[HttpUrl] = Field(
None, description="""A fully qualified URL pointing to the terms of service
(also often called "terms of use" or "terms and conditions")
for the service.""",
examples=["https://mfdz.de/nutzungsbedingungen"])
privacy_url: Optional[HttpUrl] = Field(
None, description="""A fully qualified URL pointing to the privacy policy for the service.""",
examples=["https://mfdz.de/datenschutz"])
model_config = ConfigDict(json_schema_extra={
"title": "Agency",
"description": "Carpool agency.",
"example":
#"""
{
"id": "mfdz",
"name": "MITFAHR|DE|ZENTRALE",
"url": "http://mfdz.de",
"timezone": "Europe/Berlin",
"lang": "de",
"email": "info@mfdz.de",
"terms_url": "https://mfdz.de/nutzungsbedingungen",
"privacy_url": "https://mfdz.de/datenschutz",
}
#"""
})
class Carpool(BaseModel):
id: str = Field(
description="ID of the carpool. Should be supplied and managed by the "
"carpooling platform which originally published this "
"offer.",
min_length=1,
max_length=256,
pattern='^[a-zA-Z0-9_-]+$',
examples=["103361"])
agency: str = Field(
description="Short one string name of the agency, used as a namespace "
"for ids.",
min_length=1,
max_length=20,
pattern='^[a-zA-Z0-9]+$',
examples=["mfdz"])
deeplink: HttpUrl = Field(
description="Link to an information page providing detail information "
"for this offer, and, especially, an option to book the "
"trip/contact the driver.",
examples=["https://mfdz.de/trip/103361"])
stops: List[StopTime] = Field(
...,
min_length=2,
max_length=MAX_STOPS_PER_TRIP,
description="Stops which this carpool passes by and offers to pick "
"up/drop off passengers. This list must at minimum "
"include two stops, the origin and destination of this "
"carpool trip. Note that for privacy reasons, the stops "
"usually should be official locations, like meeting "
"points, carpool parkings, ridesharing benches or "
"similar.",
examples=["""[
{
"id": "03",
"name": "drei",
"lat": 45,
"lon": 9
},
{
"id": "03b",
"name": "drei b",
"lat": 45,
"lon": 9
}
]"""])
# TODO can be removed, as first stop has departureTime as well
departureTime: time = Field(
description="Time when the carpool leaves at the first stop. Note, "
"that this API currently does not support flexible time "
"windows for departure, though drivers might be flexible."
"For recurring trips, the weekdays this trip will run. ",
examples=["17:00"])
# TODO think about using googlecal Format
departureDate: Union[date, Set[Weekday]] = Field(
description="Date when the trip will start, in case it is a one-time "
"trip. For recurring trips, specify weekdays. "
"Note, that when for different weekdays different "
"departureTimes apply, multiple carpool offers should be "
"published.",
examples=['A single date 2022-04-04 or a list of weekdays ["saturday", '
'"sunday"]'])
path: Optional[LineString] = Field(
None, description="Optional route geometry as json LineString.")
lastUpdated: Optional[datetime] = Field(
None,
description="LastUpdated should reflect the last time, the user "
"providing this offer, made an update or confirmed, "
"the offer is still valid. Note that this service might "
"purge outdated offers (e.g. older than 180 days). If not "
"passed, the service may assume 'now'",
examples=["2022-02-13T20:20:39+00:00"])
model_config = ConfigDict(json_schema_extra={
"title": "Carpool",
# description ...
"example":
"""
{
"id": "1234",
"agency": "mfdz",
"deeplink": "http://mfdz.de",
"stops": [
{
"id": "de:12073:900340137::2", "name": "ABC",
"lat": 45, "lon": 9
},
{
"id": "de:12073:900340137::3", "name": "XYZ",
"lat": 45, "lon": 9
}
],
"departureTime": "12:34",
"departureDate": "2022-03-30",
"lastUpdated": "2022-03-30T12:34:00+00:00"
}
"""
})

View file

View file

@ -0,0 +1,29 @@
from collections import namedtuple
from datetime import timedelta
GtfsFeedInfo = namedtuple('GtfsFeedInfo', 'feed_id feed_publisher_name feed_publisher_url feed_lang feed_version')
GtfsAgency = namedtuple('GtfsAgency', 'agency_id agency_name agency_url agency_timezone agency_lang agency_email')
GtfsRoute = namedtuple('GtfsRoute', 'agency_id route_id route_long_name route_type route_url route_short_name')
GtfsStop = namedtuple('GtfsStop', 'stop_id stop_lat stop_lon stop_name')
GtfsStopTime = namedtuple('GtfsStopTime', 'trip_id departure_time arrival_time stop_id stop_sequence pickup_type drop_off_type timepoint')
GtfsTrip = namedtuple('GtfsTrip', 'route_id trip_id service_id shape_id trip_headsign bikes_allowed')
GtfsCalendar = namedtuple('GtfsCalendar', 'service_id start_date end_date monday tuesday wednesday thursday friday saturday sunday')
GtfsCalendarDate = namedtuple('GtfsCalendarDate', 'service_id date exception_type')
GtfsShape = namedtuple('GtfsShape','shape_id shape_pt_lon shape_pt_lat shape_pt_sequence')
# TODO Move to utils
class GtfsTimeDelta(timedelta):
def __str__(self):
seconds = self.total_seconds()
hours = seconds // 3600
minutes = (seconds % 3600) // 60
seconds = seconds % 60
str = '{:02d}:{:02d}:{:02d}'.format(int(hours), int(minutes), int(seconds))
return (str)
def __add__(self, other):
if isinstance(other, timedelta):
return self.__class__(self.days + other.days,
self.seconds + other.seconds,
self.microseconds + other.microseconds)
return NotImplemented

View file

@ -0,0 +1,25 @@
import json
from glob import glob
from typing import Dict
from app.models.Carpool import Agency
# TODO FG HB this service should also listen to pyinotify
# because the (updated) agencies are needed in the enhancer
# as well.
class AgencyService:
def __init__(self):
self.agencies: Dict[str, Agency] = {}
for agency_file_name in glob('conf/agency/*.json'):
with open(agency_file_name) as agency_file:
dict = json.load(agency_file)
agency = Agency(**dict)
agency_id = agency.id
self.agencies[agency_id] = agency
def get_agency(self, agency_id: str) -> Agency:
agency = self.agencies.get(agency_id)
return agency

View file

@ -0,0 +1,111 @@
import json
import os
from glob import glob
from typing import Dict, List
import logging
from fastapi import HTTPException, status
from app.models.AgencyConf import AgencyConf
from app.services.config import config
logger = logging.getLogger(__name__)
agency_conf_directory = 'data/agencyconf'
class AgencyConfService:
def __init__(self):
# Both Dicts to be kept in sync always. The second api_key_to_agency_id is like a reverse
# cache for the first for fast lookup of valid api keys, which happens on *every* request.
self.agency_id_to_agency_conf: Dict[str, AgencyConf] = {}
self.api_key_to_agency_id: Dict[str, str] = {}
for agency_conf_file_name in glob(f'{agency_conf_directory}/*.json'):
with open(agency_conf_file_name) as agency_conf_file:
dictionary = json.load(agency_conf_file)
agency_conf = AgencyConf(**dictionary)
agency_id = agency_conf.agency_id
api_key = agency_conf.api_key
self.agency_id_to_agency_conf[agency_id] = agency_conf
self.api_key_to_agency_id[api_key] = agency_conf.agency_id
def get_agency_conf(self, agency_id: str) -> AgencyConf:
agency_conf = self.agency_id_to_agency_conf.get(agency_id)
return agency_conf
def check_api_key(self, api_key: str) -> str:
"""Check if the API key is valid
The agencies' api keys are checked first, and the admin's key.
The agency_id or "admin" is returned for further checks in the caller if the
request is permitted, like {agency_id} == agency_id.
"""
agency_id = self.api_key_to_agency_id.get(api_key)
is_agency = agency_id is not None
if is_agency:
return agency_id
is_admin = api_key == config.admin_token
if is_admin:
return "admin"
message = "X-API-Key header invalid"
logger.error(message)
raise HTTPException(status_code=400, detail=message)
def add(self, agency_conf: AgencyConf):
agency_id = agency_conf.agency_id
api_key = agency_conf.api_key
agency_id_exists_already = self.agency_id_to_agency_conf.get(agency_id) is not None
if agency_id_exists_already:
message = f"Agency {agency_id} exists already. To update, delete it first."
logger.error(message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
agency_using_this_api_key_already = self.api_key_to_agency_id.get(api_key)
a_different_agency_is_using_this_api_key_already = \
agency_using_this_api_key_already is not None and \
agency_using_this_api_key_already != agency_id
if a_different_agency_is_using_this_api_key_already:
message = f"Duplicate API Key for {agency_id} not permitted. Use a different key."
logger.error(message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
with open(f'{agency_conf_directory}/{agency_id}.json', 'w', encoding='utf-8') as f:
f.write(agency_conf.json())
self.agency_id_to_agency_conf[agency_id] = agency_conf
self.api_key_to_agency_id[api_key] = agency_id
logger.info(f"Added configuration for agency {agency_id}.")
def get_agency_ids(self) -> List[str]:
return list(self.agency_id_to_agency_conf.keys())
def delete(self, agency_id):
agency_conf = self.agency_id_to_agency_conf.get(agency_id)
api_key = agency_conf.api_key
del self.api_key_to_agency_id[api_key]
del self.agency_id_to_agency_conf[agency_id]
os.remove(f'{agency_conf_directory}/{agency_id}.json')
logger.info(f"Deleted configuration for agency {agency_id}.")

View file

@ -0,0 +1,61 @@
import json
import logging
from datetime import datetime
from typing import Dict
from app.models.Carpool import Carpool
from app.services.trips import TripStore
from app.utils.utils import yesterday, is_older_than_days
logger = logging.getLogger(__name__)
class CarpoolService():
MAX_OFFER_AGE_IN_DAYS = 180
def __init__(self, trip_store):
self.trip_store = trip_store
self.carpools: Dict[str, Carpool] = {}
def is_outdated(self, carpool):
"""
A carpool offer is outdated, if
* it's completly in the past (if it's a single date offer).
As we know the start time but not latest arrival, we deem
offers starting the day before yesterday as outdated
* it's last update occured before MAX_OFFER_AGE_IN_DAYS
"""
runs_once = not isinstance(carpool.departureDate, set)
return (is_older_than_days(carpool.lastUpdated.date(), self.MAX_OFFER_AGE_IN_DAYS) or
(runs_once and carpool.departureDate < yesterday()))
def purge_outdated_offers(self):
"""
Iterates over all carpools and deletes those which are outdated
"""
for key in list(self.carpools.keys()):
cp = self.carpools.get(key)
if cp and self.is_outdated(cp):
logger.info("Purge outdated offer %s", key)
self.delete(cp.agency, cp.id)
def get(self, agency_id: str, carpool_id: str):
return self.carpools.get(f"{agency_id}:{carpool_id}")
def get_all_ids(self):
return list(self.carpools)
def put(self, agency_id: str, carpool_id: str, carpool):
self.carpools[f"{agency_id}:{carpool_id}"] = carpool
# Outdated trips (which might have been in the store)
# will be deleted
if self.is_outdated(carpool):
logger.info('Deleting outdated carpool %s:%s', agency_id, carpool_id)
self.delete(agency_id, carpool_id)
else:
self.trip_store.put_carpool(carpool)
def delete(self, agency_id: str, carpool_id: str):
id = f"{agency_id}:{carpool_id}"
if id in self.carpools:
del self.carpools[id]
self.trip_store.delete_carpool(agency_id, carpool_id)

View file

@ -0,0 +1,11 @@
from typing import List
from pydantic_settings import BaseSettings
class Config(BaseSettings):
admin_token: str
ride2go_query_data: str
env: str = 'DEV'
config = Config(_env_file='config', _env_file_encoding='utf-8')

View file

@ -0,0 +1,137 @@
import app.services.gtfsrt.gtfs_realtime_pb2 as gtfs_realtime_pb2
import app.services.gtfsrt.realtime_extension_pb2 as mfdzrte
from app.services.gtfs_constants import *
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import ParseDict
from datetime import datetime, timedelta
import json
import re
import time
class GtfsRtProducer():
def __init__(self, trip_store):
self.trip_store = trip_store
def generate_feed(self, time, format='protobuf', bbox=None):
# See https://developers.google.com/transit/gtfs-realtime/reference
# https://github.com/mfdz/carpool-gtfs-rt/blob/master/src/main/java/de/mfdz/resource/CarpoolResource.java
gtfsrt_dict = {
'header': {
'gtfsRealtimeVersion': '1.0',
'timestamp': int(time)
},
'entity': self._get_trip_updates(bbox)
}
feed = gtfs_realtime_pb2.FeedMessage()
ParseDict(gtfsrt_dict, feed)
if "message" == format.lower():
return feed
elif "json" == format.lower():
return MessageToDict(feed)
else:
return feed.SerializeToString()
def export_feed(self, timestamp, file_path, bbox=None):
"""
Exports gtfs-rt feed as .json and .pbf file to file_path
"""
feed = self.generate_feed(timestamp, "message", bbox)
with open(f"{file_path}.pbf", "wb") as f:
f.write(feed.SerializeToString())
with open(f"{file_path}.json", "w") as f:
json.dump(MessageToDict(feed), f)
def _get_trip_updates(self, bbox = None):
trips = []
trips.extend(self._get_added(bbox))
trips.extend(self._get_deleted(bbox))
trip_updates = []
for num, trip in enumerate(trips):
trip_updates.append( {
'id': f'carpool-update-{num}',
'tripUpdate': trip
}
)
return trip_updates
def _get_deleted(self, bbox = None):
return self._get_updates(
self.trip_store.recently_deleted_trips(),
self._as_delete_updates,
bbox)
def _get_added(self, bbox = None):
return self._get_updates(
self.trip_store.recently_added_trips(),
self._as_added_updates,
bbox)
def _get_updates(self, trips, update_func, bbox = None):
updates = []
today = datetime.today()
for t in trips:
if bbox == None or t.intersects(bbox):
updates.extend(update_func(t, today))
return updates
def _as_delete_updates(self, trip, fromdate):
return [{
'trip': {
'tripId': trip.trip_id,
'startTime': trip.start_time_str(),
'startDate': trip_date,
'scheduleRelationship': 'CANCELED',
'routeId': trip.trip_id
}
} for trip_date in trip.next_trip_dates(fromdate)]
def _to_seconds(self, fromdate, stop_time):
startdate = datetime.strptime(fromdate, '%Y%m%d')
m = re.search(r'(\d+):(\d+):(\d+)', stop_time)
delta = timedelta(
hours=int(m.group(1)),
minutes=int(m.group(2)),
seconds=int(m.group(3)))
return time.mktime((startdate + delta).timetuple())
def _to_stop_times(self, trip, fromdate):
return [{
'stopSequence': stoptime.stop_sequence,
'arrival': {
'time': self._to_seconds(fromdate, stoptime.arrival_time),
'uncertainty': MFDZ_DEFAULT_UNCERTAINITY
},
'departure': {
'time': self._to_seconds(fromdate, stoptime.departure_time),
'uncertainty': MFDZ_DEFAULT_UNCERTAINITY
},
'stopId': stoptime.stop_id,
'scheduleRelationship': 'SCHEDULED',
'stop_time_properties': {
'[transit_realtime.stop_time_properties]': {
'dropoffType': 'COORDINATE_WITH_DRIVER' if stoptime.drop_off_type == STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER else 'NONE',
'pickupType': 'COORDINATE_WITH_DRIVER' if stoptime.pickup_type == STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER else 'NONE'
}
}
}
for stoptime in trip.stop_times]
def _as_added_updates(self, trip, fromdate):
return [{
'trip': {
'tripId': trip.trip_id,
'startTime': trip.start_time_str(),
'startDate': trip_date,
'scheduleRelationship': 'ADDED',
'routeId': trip.trip_id,
'[transit_realtime.trip_descriptor]': {
'routeUrl' : trip.url,
'agencyId' : trip.agency,
'route_long_name' : trip.route_long_name(),
'route_type': RIDESHARING_ROUTE_TYPE
}
},
'stopTimeUpdate': self._to_stop_times(trip, trip_date)
} for trip_date in trip.next_trip_dates(fromdate)]

View file

@ -0,0 +1,14 @@
# Constants
NO_BIKES_ALLOWED = 2
RIDESHARING_ROUTE_TYPE = 1551
CALENDAR_DATES_EXCEPTION_TYPE_ADDED = 1
CALENDAR_DATES_EXCEPTION_TYPE_REMOVED = 2
STOP_TIMES_STOP_TYPE_REGULARLY = 0
STOP_TIMES_STOP_TYPE_NONE = 1
STOP_TIMES_STOP_TYPE_PHONE_AGENCY = 2
STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER = 3
STOP_TIMES_TIMEPOINT_APPROXIMATE = 0
STOP_TIMES_TIMEPOINT_EXACT = 1
MFDZ_DEFAULT_UNCERTAINITY = 600

View file

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: realtime_extension.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
import app.services.gtfsrt.gtfs_realtime_pb2 as gtfs__realtime__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x18realtime_extension.proto\x12\x10transit_realtime\x1a\x13gtfs-realtime.proto\"p\n\x1bMfdzTripDescriptorExtension\x12\x11\n\troute_url\x18\x01 \x01(\t\x12\x11\n\tagency_id\x18\x02 \x01(\t\x12\x17\n\x0froute_long_name\x18\x03 \x01(\t\x12\x12\n\nroute_type\x18\x04 \x01(\r\"\xb0\x02\n\x1fMfdzStopTimePropertiesExtension\x12X\n\x0bpickup_type\x18\x01 \x01(\x0e\x32\x43.transit_realtime.MfdzStopTimePropertiesExtension.DropOffPickupType\x12Y\n\x0c\x64ropoff_type\x18\x02 \x01(\x0e\x32\x43.transit_realtime.MfdzStopTimePropertiesExtension.DropOffPickupType\"X\n\x11\x44ropOffPickupType\x12\x0b\n\x07REGULAR\x10\x00\x12\x08\n\x04NONE\x10\x01\x12\x10\n\x0cPHONE_AGENCY\x10\x02\x12\x1a\n\x16\x43OORDINATE_WITH_DRIVER\x10\x03:i\n\x0ftrip_descriptor\x12 .transit_realtime.TripDescriptor\x18\xf5\x07 \x01(\x0b\x32-.transit_realtime.MfdzTripDescriptorExtension:\x90\x01\n\x14stop_time_properties\x12>.transit_realtime.TripUpdate.StopTimeUpdate.StopTimeProperties\x18\xf5\x07 \x01(\x0b\x32\x31.transit_realtime.MfdzStopTimePropertiesExtensionB\t\n\x07\x64\x65.mfdz')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'realtime_extension_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
gtfs__realtime__pb2.TripDescriptor.RegisterExtension(trip_descriptor)
gtfs__realtime__pb2.TripUpdate.StopTimeUpdate.StopTimeProperties.RegisterExtension(stop_time_properties)
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\007de.mfdz'
_MFDZTRIPDESCRIPTOREXTENSION._serialized_start=67
_MFDZTRIPDESCRIPTOREXTENSION._serialized_end=179
_MFDZSTOPTIMEPROPERTIESEXTENSION._serialized_start=182
_MFDZSTOPTIMEPROPERTIESEXTENSION._serialized_end=486
_MFDZSTOPTIMEPROPERTIESEXTENSION_DROPOFFPICKUPTYPE._serialized_start=398
_MFDZSTOPTIMEPROPERTIESEXTENSION_DROPOFFPICKUPTYPE._serialized_end=486
# @@protoc_insertion_point(module_scope)

View file

@ -0,0 +1,22 @@
import json
from glob import glob
from typing import Dict
from app.models.Carpool import Region
class RegionService:
def __init__(self):
self.regions: Dict[str, Region] = {}
for region_file_name in glob('conf/region/*.json'):
with open(region_file_name) as region_file:
dict = json.load(region_file)
region = Region(**dict)
region_id = region.id
self.regions[region_id] = region
def get_region(self, region_id: str) -> Region:
region = self.regions.get(region_id)
return region

View file

@ -0,0 +1,47 @@
import requests
import logging
logger = logging.getLogger(__name__)
class RoutingException(Exception):
def __init__(self, message):
# Call Exception.__init__(message)
# to use the same Message header as the parent class
super().__init__(message)
class RoutingService():
def __init__(self, gh_url = 'https://api.mfdz.de/gh'):
self.gh_service_url = gh_url
def path_for_stops(self, points):
# Retrieve graphhopper route traversing given points
directions = self._get_directions(points)
if directions and len(directions.get("paths"))>0:
return directions.get("paths")[0]
else:
return {}
def _get_directions(self, points):
req_url = self._create_url(points, True, True)
logger.debug("Get directions via: {}".format(req_url))
response = requests.get(req_url)
status = response.status_code
if status == 200:
# Found route between points
return response.json()
else:
try:
message = response.json().get('message')
except:
raise RoutingException("Get directions failed with status code {}".format(status))
else:
raise RoutingException(message)
def _create_url(self, points, calc_points = False, instructions = False):
""" Creates GH request URL """
locations = ""
for point in points:
locations += "point={0}%2C{1}&".format(point.y, point.x)
return "{0}/route?{1}instructions={2}&calc_points={3}&points_encoded=false".format(
self.gh_service_url, locations, instructions, calc_points)

View file

@ -0,0 +1,182 @@
import csv
import geopandas as gpd
import pandas as pd
from app.models.Carpool import StopTime
from contextlib import closing
from shapely.geometry import Point, LineString
from shapely.ops import transform
from pyproj import Proj, Transformer
import re
import requests
from io import TextIOWrapper
import codecs
import logging
logger = logging.getLogger(__name__)
class StopsStore():
def __init__(self, stop_sources = [], internal_projection = "EPSG:32632"):
self.internal_projection = internal_projection
self.projection = Transformer.from_crs("EPSG:4326", internal_projection, always_xy=True).transform
self.stopsDataFrames = []
self.stop_sources = stop_sources
def load_stop_sources(self):
"""Imports stops from stop_sources and registers them with
the distance they are still associated with a trip.
E.g. bus stops should be registered with a distance of e.g. 30m,
while larger carpool parkings might be registered with e.g. 500m.
Subsequent calls of load_stop_sources will reload all stop_sources
but replace the current stops only if all stops could be loaded successfully.
"""
stopsDataFrames = []
error_occured = False
for stops_source in self.stop_sources:
try:
stopsDataFrame =self._load_stops(stops_source["url"])
stopsDataFrames.append({'distanceInMeter': stops_source["vicinity"],
'stops': stopsDataFrame})
except Exception as err:
error_occured = True
logger.error("Failed to load stops from %s to StopsStore.", stops_source["url"], exc_info=True)
if not error_occured:
self.stopsDataFrames = stopsDataFrames
def find_additional_stops_around(self, line, stops = None):
"""Returns a GeoDataFrame with all stops in vicinity of the
given line, sorted by distance from origin of the line.
Note: for internal projection/distance calculations, the
lat/lon geometries of line and stops are converted to
"""
stops_frames = []
if stops:
stops_frames.append(self._convert_to_dataframe(stops))
transformedLine = transform(self.projection, LineString(line.coordinates))
for stops_to_match in self.stopsDataFrames:
stops_frames.append(self._find_stops_around_transformed(stops_to_match['stops'], transformedLine, stops_to_match['distanceInMeter']))
stops = gpd.GeoDataFrame( pd.concat(stops_frames, ignore_index=True, sort=True))
if not stops.empty:
self._sort_by_distance(stops, transformedLine)
return stops
def find_closest_stop(self, carpool_stop, max_search_distance):
transformedCoord = Point(self.projection(carpool_stop.lon, carpool_stop.lat))
best_dist = max_search_distance + 1
best_stop = None
for stops_with_dist in self.stopsDataFrames:
stops = stops_with_dist['stops']
s, d = stops.sindex.nearest(transformedCoord, return_all= True, return_distance=True, max_distance=max_search_distance)
if len(d) > 0 and d[0] < best_dist:
best_dist = d[0]
row = s[1][0]
best_stop = StopTime(name=stops.at[row, 'stop_name'], lat=stops.at[row, 'y'], lon=stops.at[row, 'x'])
return best_stop if best_stop else carpool_stop
def _normalize_stop_name(self, stop_name):
default_name = 'P+R-Parkplatz'
if stop_name in ('', 'Park&Ride'):
return default_name
normalized_stop_name = re.sub(r"P(ark)?\s?[\+&]\s?R(ail|ide)?",'P+R', stop_name)
return normalized_stop_name
def _load_stops(self, source : str):
"""Loads stops from given source and registers them with
the distance they are still associated with a trip.
E.g. bus stops should be registered with a distance of e.g. 30m,
while larger carpool parkings might be registered with e.g. 500m
"""
logger.info("Load stops from %s", source)
if source.startswith('http'):
if source.endswith('json'):
with requests.get(source) as json_source:
stopsDataFrame = self._load_stops_geojson(json_source.json())
else:
with requests.get(source) as csv_source:
stopsDataFrame = self._load_stops_csv(codecs.iterdecode(csv_source.iter_lines(), 'utf-8'))
else:
with open(source, encoding='utf-8') as csv_source:
stopsDataFrame = self._load_stops_csv(csv_source)
return stopsDataFrame
def _load_stops_csv(self, csv_source):
id = []
lat = []
lon = []
stop_name = []
reader = csv.DictReader(csv_source, delimiter=';')
columns = ['stop_id', 'stop_lat', 'stop_lon', 'stop_name']
lists = [id, lat, lon, stop_name]
for row in reader:
for col, lst in zip(columns, lists):
if col == "stop_lat" or col == "stop_lon":
lst.append(float(row[col].replace(",",".")))
elif col == "stop_name":
row_stop_name = self._normalize_stop_name(row[col])
lst.append(row_stop_name)
else:
lst.append(row[col])
return self._as_dataframe(id, lat, lon, stop_name)
def _load_stops_geojson(self, geojson_source):
id = []
lat = []
lon = []
stop_name = []
columns = ['stop_id', 'stop_lat', 'stop_lon', 'stop_name']
lists = [id, lat, lon, stop_name]
for row in geojson_source['features']:
coord = row['geometry']['coordinates']
if not coord or not row['properties'].get('name'):
logger.error('Stop feature {} has null coord or name'.format(row['id']))
continue
for col, lst in zip(columns, lists):
if col == "stop_lat":
lst.append(coord[1])
elif col == "stop_lon":
lst.append(coord[0])
elif col == "stop_name":
row_stop_name = self._normalize_stop_name(row['properties']['name'])
lst.append(row_stop_name)
elif col == "stop_id":
lst.append(row['id'])
return self._as_dataframe(id, lat, lon, stop_name)
def _as_dataframe(self, id, lat, lon, stop_name):
df = gpd.GeoDataFrame(data={'x':lon, 'y':lat, 'stop_name':stop_name, 'id':id})
stopsGeoDataFrame = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.x, df.y, crs='EPSG:4326'))
stopsGeoDataFrame.to_crs(crs=self.internal_projection, inplace=True)
return stopsGeoDataFrame
def _find_stops_around_transformed(self, stopsDataFrame, transformedLine, distance):
bufferedLine = transformedLine.buffer(distance)
sindex = stopsDataFrame.sindex
possible_matches_index = list(sindex.intersection(bufferedLine.bounds))
possible_matches = stopsDataFrame.iloc[possible_matches_index]
exact_matches = possible_matches[possible_matches.intersects(bufferedLine)]
return exact_matches
def _convert_to_dataframe(self, stops):
return gpd.GeoDataFrame([[stop.name, stop.lon, stop.lat,
stop.id, Point(self.projection(stop.lon, stop.lat))] for stop in stops], columns = ['stop_name','x','y','id','geometry'], crs=self.internal_projection)
def _sort_by_distance(self, stops, transformedLine):
stops['distance']=stops.apply(lambda row: transformedLine.project(row['geometry']), axis=1)
stops.sort_values('distance', inplace=True)
def is_carpooling_stop(stop_id, name):
stop_name = name.lower()
# mfdz: or bbnavi: prefixed stops are custom stops which are explicitly meant to be carpooling stops
return stop_id.startswith('mfdz:') or stop_id.startswith('bbnavi:') or 'mitfahr' in stop_name or 'p&m' in stop_name

View file

@ -0,0 +1,374 @@
from app.models.gtfs import GtfsTimeDelta, GtfsStopTime
from app.models.Carpool import MAX_STOPS_PER_TRIP, Carpool, Weekday, StopTime, PickupDropoffType
from app.services.gtfs_constants import *
from app.services.routing import RoutingService, RoutingException
from app.services.stops import is_carpooling_stop
from app.utils.utils import assert_folder_exists, is_older_than_days, yesterday, geodesic_distance_in_m
from shapely.geometry import Point, LineString, box
from geojson_pydantic.geometries import LineString as GeoJSONLineString
from datetime import datetime, timedelta
import numpy as np
import os
import json
import logging
logger = logging.getLogger(__name__)
class Trip:
def __init__(self, trip_id, route_name, headsign, url, calendar, departureTime, path, agency, lastUpdated, stop_times, bbox):
if isinstance(calendar, set):
self.runs_regularly = True
self.weekdays = [
1 if Weekday.monday in calendar else 0,
1 if Weekday.tuesday in calendar else 0,
1 if Weekday.wednesday in calendar else 0,
1 if Weekday.thursday in calendar else 0,
1 if Weekday.friday in calendar else 0,
1 if Weekday.saturday in calendar else 0,
1 if Weekday.sunday in calendar else 0,
]
start_in_day = self._total_seconds(departureTime)
else:
self.start = datetime.combine(calendar, departureTime)
self.runs_regularly = False
self.weekdays = [0,0,0,0,0,0,0]
self.start_time = departureTime
self.path = path
self.trip_id = trip_id
self.url = url
self.agency = agency
self.stops = []
self.lastUpdated = lastUpdated
self.stop_times = stop_times
self.bbox = bbox
self.route_name = route_name
self.trip_headsign = headsign
def path_as_line_string(self):
return path
def _total_seconds(self, instant):
return instant.hour * 3600 + instant.minute * 60 + instant.second
def start_time_str(self):
return self.start_time.strftime("%H:%M:%S")
def next_trip_dates(self, start_date, day_count=14):
if self.runs_regularly:
for single_date in (start_date + timedelta(n) for n in range(day_count)):
if self.weekdays[single_date.weekday()]==1:
yield single_date.strftime("%Y%m%d")
else:
yield self.start.strftime("%Y%m%d")
def route_long_name(self):
return self.route_name
def intersects(self, bbox):
return self.bbox.intersects(box(*bbox))
class TripStore():
"""
TripStore maintains the currently valid trips. A trip is a
carpool offer enhanced with all stops this
Attributes:
trips Dict of currently valid trips.
deleted_trips Dict of recently deleted trips.
"""
def __init__(self, stops_store):
self.transformer = TripTransformer(stops_store)
self.stops_store = stops_store
self.trips = {}
self.deleted_trips = {}
self.recent_trips = {}
def put_carpool(self, carpool: Carpool):
"""
Adds carpool to the TripStore.
"""
id = "{}:{}".format(carpool.agency, carpool.id)
filename = f'data/enhanced/{carpool.agency}/{carpool.id}.json'
try:
existing_carpool = self._load_carpool_if_exists(carpool.agency, carpool.id)
if existing_carpool and existing_carpool.lastUpdated == carpool.lastUpdated:
enhanced_carpool = existing_carpool
else:
if len(carpool.stops) < 2 or self.distance_in_m(carpool) < 1000:
logger.warning("Failed to add carpool %s:%s to TripStore, distance too low", carpool.agency, carpool.id)
self.handle_failed_carpool_enhancement(carpool)
return
enhanced_carpool = self.transformer.enhance_carpool(carpool)
# TODO should only store enhanced_carpool, if it has 2 or more stops
assert_folder_exists(f'data/enhanced/{carpool.agency}/')
with open(filename, 'w', encoding='utf-8') as f:
f.write(enhanced_carpool.json())
logger.info("Added enhanced carpool %s:%s", carpool.agency, carpool.id)
return self._load_as_trip(enhanced_carpool)
except RoutingException as err:
logger.warning("Failed to add carpool %s:%s to TripStore due to RoutingException %s", carpool.agency, carpool.id, getattr(err, 'message', repr(err)))
self.handle_failed_carpool_enhancement(carpool)
except Exception as err:
logger.error("Failed to add carpool %s:%s to TripStore.", carpool.agency, carpool.id, exc_info=True)
self.handle_failed_carpool_enhancement(carpool)
def handle_failed_carpool_enhancement(sellf, carpool: Carpool):
assert_folder_exists(f'data/failed/{carpool.agency}/')
with open(f'data/failed/{carpool.agency}/{carpool.id}.json', 'w', encoding='utf-8') as f:
f.write(carpool.json())
def distance_in_m(self, carpool):
if len(carpool.stops) < 2:
return 0
s1 = carpool.stops[0]
s2 = carpool.stops[-1]
return geodesic_distance_in_m((s1.lon, s1.lat),(s2.lon, s2.lat))
def recently_added_trips(self):
return list(self.recent_trips.values())
def recently_deleted_trips(self):
return list(self.deleted_trips.values())
def _load_carpool_if_exists(self, agency_id: str, carpool_id: str):
if carpool_exists(agency_id, carpool_id, 'data/enhanced'):
try:
return load_carpool(agency_id, carpool_id, 'data/enhanced')
except Exception as e:
# An error on restore could be caused by model changes,
# in such a case, it need's to be recreated
logger.warning("Could not restore enhanced trip %s:%s, reason: %s", agency_id, carpool_id, repr(e))
return None
def _load_as_trip(self, carpool: Carpool):
trip = self.transformer.transform_to_trip(carpool)
id = trip.trip_id
self.trips[id] = trip
if not is_older_than_days(carpool.lastUpdated, 1):
self.recent_trips[id] = trip
logger.debug("Added trip %s", id)
return trip
def delete_carpool(self, agency_id: str, carpool_id: str):
"""
Deletes carpool from the TripStore.
"""
agencyScopedCarpoolId = f"{agency_id}:{carpool_id}"
trip_to_be_deleted = self.trips.get(agencyScopedCarpoolId)
if trip_to_be_deleted:
self.deleted_trips[agencyScopedCarpoolId] = trip_to_be_deleted
del self.trips[agencyScopedCarpoolId]
if self.recent_trips.get(agencyScopedCarpoolId):
del self.recent_trips[agencyScopedCarpoolId]
if carpool_exists(agency_id, carpool_id):
remove_carpool_file(agency_id, carpool_id)
logger.debug("Deleted trip %s", id)
def unflag_unrecent_updates(self):
"""
Trips that were last updated before yesterday, are not recent
any longer. As no updates need to be sent for them any longer,
they will be removed from recent recent_trips and deleted_trips.
"""
for key in list(self.recent_trips):
t = self.recent_trips.get(key)
if t and t.lastUpdated.date() < yesterday():
del self.recent_trips[key]
for key in list(self.deleted_trips):
t = self.deleted_trips.get(key)
if t and t.lastUpdated.date() < yesterday():
del self.deleted_trips[key]
class TripTransformer:
REPLACE_CARPOOL_STOPS_BY_CLOSEST_TRANSIT_STOPS = True
REPLACEMENT_STOPS_SERACH_RADIUS_IN_M = 1000
SIMPLIFY_TOLERANCE = 0.0001
router = RoutingService()
def __init__(self, stops_store):
self.stops_store = stops_store
def transform_to_trip(self, carpool):
stop_times = self._convert_stop_times(carpool)
route_name = carpool.stops[0].name + " nach " + carpool.stops[-1].name
headsign= carpool.stops[-1].name
trip_id = self._trip_id(carpool)
path = carpool.path
bbox = box(
min([pt[0] for pt in path.coordinates]),
min([pt[1] for pt in path.coordinates]),
max([pt[0] for pt in path.coordinates]),
max([pt[1] for pt in path.coordinates]))
trip = Trip(trip_id, route_name, headsign, str(carpool.deeplink), carpool.departureDate, carpool.departureTime, carpool.path, carpool.agency, carpool.lastUpdated, stop_times, bbox)
return trip
def _trip_id(self, carpool):
return f"{carpool.agency}:{carpool.id}"
def _replace_stops_by_transit_stops(self, carpool, max_search_distance):
new_stops = []
for carpool_stop in carpool.stops:
new_stops.append(self.stops_store.find_closest_stop(carpool_stop, max_search_distance))
return new_stops
def enhance_carpool(self, carpool):
if self.REPLACE_CARPOOL_STOPS_BY_CLOSEST_TRANSIT_STOPS:
carpool.stops = self._replace_stops_by_transit_stops(carpool, self.REPLACEMENT_STOPS_SERACH_RADIUS_IN_M)
path = self._path_for_ride(carpool)
lineString_shapely_wgs84 = LineString(coordinates = path["points"]["coordinates"]).simplify(0.0001)
lineString_wgs84 = GeoJSONLineString(type="LineString", coordinates=list(lineString_shapely_wgs84.coords))
virtual_stops = self.stops_store.find_additional_stops_around(lineString_wgs84, carpool.stops)
if not virtual_stops.empty:
virtual_stops["time"] = self._estimate_times(path, virtual_stops['distance'])
logger.debug("Virtual stops found: {}".format(virtual_stops))
if len(virtual_stops) > MAX_STOPS_PER_TRIP:
# in case we found more than MAX_STOPS_PER_TRIP, we retain first and last
# half of MAX_STOPS_PER_TRIP
virtual_stops = virtual_stops.iloc[np.r_[0:int(MAX_STOPS_PER_TRIP/2), int(MAX_STOPS_PER_TRIP/2):]]
trip_id = f"{carpool.agency}:{carpool.id}"
stop_times = self._stops_and_stop_times(carpool.departureTime, trip_id, virtual_stops)
enhanced_carpool = carpool.copy()
enhanced_carpool.stops = stop_times
enhanced_carpool.path = lineString_wgs84
return enhanced_carpool
def _convert_stop_times(self, carpool):
stop_times = [GtfsStopTime(
self._trip_id(carpool),
stop.arrivalTime,
stop.departureTime,
stop.id,
seq_nr+1,
STOP_TIMES_STOP_TYPE_NONE if stop.pickup_dropoff == PickupDropoffType.only_dropoff else STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER,
STOP_TIMES_STOP_TYPE_NONE if stop.pickup_dropoff == PickupDropoffType.only_pickup else STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER,
STOP_TIMES_TIMEPOINT_APPROXIMATE)
for seq_nr, stop in enumerate(carpool.stops)]
return stop_times
def _path_for_ride(self, carpool):
points = self._stop_coords(carpool.stops)
return self.router.path_for_stops(points)
def _stop_coords(self, stops):
# Retrieve coordinates of all officially announced stops (start, intermediate, target)
return [Point(stop.lon, stop.lat) for stop in stops]
def _estimate_times(self, path, distances_from_start):
cumulated_distance = 0
cumulated_time = 0
stop_times = []
instructions = path["instructions"]
cnt = 0
instr_distance = instructions[cnt]["distance"]
instr_time = instructions[cnt]["time"]
for distance in distances_from_start:
while cnt < len(instructions) and cumulated_distance + instructions[cnt]["distance"] < distance:
cumulated_distance = cumulated_distance + instructions[cnt]["distance"]
cumulated_time = cumulated_time + instructions[cnt]["time"]
cnt = cnt + 1
if cnt < len(instructions):
if instructions[cnt]["distance"] ==0:
raise RoutingException("Origin and destinaction too close")
percent_dist = (distance - cumulated_distance) / instructions[cnt]["distance"]
stop_time = cumulated_time + percent_dist * instructions[cnt]["time"]
stop_times.append(stop_time)
else:
logger.debug("distance {} exceeds total length {}, using max arrival time {}".format(distance, cumulated_distance, cumulated_time))
stop_times.append(cumulated_time)
return stop_times
def _stops_and_stop_times(self, start_time, trip_id, stops_frame):
# Assumptions:
# arrival_time = departure_time
# pickup_type, drop_off_type for origin: = coordinate/none
# pickup_type, drop_off_type for destination: = none/coordinate
# timepoint = approximate for origin and destination (not sure what consequences this might have for trip planners)
number_of_stops = len(stops_frame.index)
total_distance = stops_frame.iloc[number_of_stops-1]["distance"]
first_stop_time = GtfsTimeDelta(hours = start_time.hour, minutes = start_time.minute, seconds = start_time.second)
stop_times = []
seq_nr = 0
for i in range(0, number_of_stops):
current_stop = stops_frame.iloc[i]
if not current_stop.id:
continue
elif i == 0:
if (stops_frame.iloc[1].time-current_stop.time) < 1000:
# skip custom stop if there is an official stop very close by
logger.debug("Skipped stop %s", current_stop.id)
continue
else:
if (current_stop.time-stops_frame.iloc[i-1].time) < 5000 and not i==1 and not is_carpooling_stop(current_stop.id, current_stop.stop_name):
# skip latter stop if it's very close (<5 seconds drive) by the preceding
logger.debug("Skipped stop %s", current_stop.id)
continue
trip_time = timedelta(milliseconds=int(current_stop.time))
is_dropoff = self._is_dropoff_stop(current_stop, total_distance)
is_pickup = self._is_pickup_stop(current_stop, total_distance)
# TODO would be nice if possible to publish a minimum shared distance
pickup_type = STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER if is_pickup else STOP_TIMES_STOP_TYPE_NONE
dropoff_type = STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER if is_dropoff else STOP_TIMES_STOP_TYPE_NONE
if is_pickup and not is_dropoff:
pickup_dropoff = PickupDropoffType.only_pickup
elif not is_pickup and is_dropoff:
pickup_dropoff = PickupDropoffType.only_dropoff
else:
pickup_dropoff = PickupDropoffType.pickup_and_dropoff
next_stop_time = first_stop_time + trip_time
seq_nr += 1
stop_times.append(StopTime(**{
'arrivalTime': str(next_stop_time),
'departureTime': str(next_stop_time),
'id': current_stop.id,
'pickup_dropoff': pickup_dropoff,
'name': str(current_stop.stop_name),
'lat': current_stop.y,
'lon': current_stop.x
}))
return stop_times
def _is_dropoff_stop(self, current_stop, total_distance):
return current_stop["distance"] >= 0.5 * total_distance
def _is_pickup_stop(self, current_stop, total_distance):
return current_stop["distance"] < 0.5 * total_distance
def load_carpool(agency_id: str, carpool_id: str, folder: str ='data/enhanced') -> Carpool:
with open(f'{folder}/{agency_id}/{carpool_id}.json', 'r', encoding='utf-8') as f:
dict = json.load(f)
carpool = Carpool(**dict)
return carpool
def carpool_exists(agency_id: str, carpool_id: str, folder: str ='data/enhanced'):
return os.path.exists(f"{folder}/{agency_id}/{carpool_id}.json")
def remove_carpool_file(agency_id: str, carpool_id: str, folder: str ='data/enhanced'):
return os.remove(f"{folder}/{agency_id}/{carpool_id}.json")