moved gtfs generation to export plugin

This commit is contained in:
Csaba 2024-02-08 14:32:54 +01:00
parent 3656f15599
commit ef4f0a03e2
4 changed files with 6 additions and 524 deletions

View file

@ -27,7 +27,11 @@ def configure_container():
def configure_enhancer_services():
configure_services()
configure_container()
# configure_container may have been called already by an exporter, no need to run it again
if 'stops_store' not in container:
configure_container()
else: logger.info("Container already configured")
logger.info("Restore carpools...")
@ -47,5 +51,3 @@ def configure_enhancer_services():
container['carpools'].delete(carpool.agency, carpool.id)
logger.info("Restored carpools: %s", container['carpools'].get_all_ids())
logger.info("Starting scheduler")
gtfs_generator.start_schedule()

View file

@ -1,137 +0,0 @@
import amarillo.plugins.enhancer.services.gtfsrt.gtfs_realtime_pb2 as gtfs_realtime_pb2
import amarillo.plugins.enhancer.services.gtfsrt.realtime_extension_pb2 as mfdzrte
from amarillo.plugins.enhancer.services.gtfs_constants import *
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import ParseDict
from datetime import datetime, timedelta
import json
import re
import time
class GtfsRtProducer():
def __init__(self, trip_store):
self.trip_store = trip_store
def generate_feed(self, time, format='protobuf', bbox=None):
# See https://developers.google.com/transit/gtfs-realtime/reference
# https://github.com/mfdz/carpool-gtfs-rt/blob/master/src/main/java/de/mfdz/resource/CarpoolResource.java
gtfsrt_dict = {
'header': {
'gtfsRealtimeVersion': '1.0',
'timestamp': int(time)
},
'entity': self._get_trip_updates(bbox)
}
feed = gtfs_realtime_pb2.FeedMessage()
ParseDict(gtfsrt_dict, feed)
if "message" == format.lower():
return feed
elif "json" == format.lower():
return MessageToDict(feed)
else:
return feed.SerializeToString()
def export_feed(self, timestamp, file_path, bbox=None):
"""
Exports gtfs-rt feed as .json and .pbf file to file_path
"""
feed = self.generate_feed(timestamp, "message", bbox)
with open(f"{file_path}.pbf", "wb") as f:
f.write(feed.SerializeToString())
with open(f"{file_path}.json", "w") as f:
json.dump(MessageToDict(feed), f)
def _get_trip_updates(self, bbox = None):
trips = []
trips.extend(self._get_added(bbox))
trips.extend(self._get_deleted(bbox))
trip_updates = []
for num, trip in enumerate(trips):
trip_updates.append( {
'id': f'carpool-update-{num}',
'tripUpdate': trip
}
)
return trip_updates
def _get_deleted(self, bbox = None):
return self._get_updates(
self.trip_store.recently_deleted_trips(),
self._as_delete_updates,
bbox)
def _get_added(self, bbox = None):
return self._get_updates(
self.trip_store.recently_added_trips(),
self._as_added_updates,
bbox)
def _get_updates(self, trips, update_func, bbox = None):
updates = []
today = datetime.today()
for t in trips:
if bbox == None or t.intersects(bbox):
updates.extend(update_func(t, today))
return updates
def _as_delete_updates(self, trip, fromdate):
return [{
'trip': {
'tripId': trip.trip_id,
'startTime': trip.start_time_str(),
'startDate': trip_date,
'scheduleRelationship': 'CANCELED',
'routeId': trip.trip_id
}
} for trip_date in trip.next_trip_dates(fromdate)]
def _to_seconds(self, fromdate, stop_time):
startdate = datetime.strptime(fromdate, '%Y%m%d')
m = re.search(r'(\d+):(\d+):(\d+)', stop_time)
delta = timedelta(
hours=int(m.group(1)),
minutes=int(m.group(2)),
seconds=int(m.group(3)))
return time.mktime((startdate + delta).timetuple())
def _to_stop_times(self, trip, fromdate):
return [{
'stopSequence': stoptime.stop_sequence,
'arrival': {
'time': self._to_seconds(fromdate, stoptime.arrival_time),
'uncertainty': MFDZ_DEFAULT_UNCERTAINITY
},
'departure': {
'time': self._to_seconds(fromdate, stoptime.departure_time),
'uncertainty': MFDZ_DEFAULT_UNCERTAINITY
},
'stopId': stoptime.stop_id,
'scheduleRelationship': 'SCHEDULED',
'stop_time_properties': {
'[transit_realtime.stop_time_properties]': {
'dropoffType': 'COORDINATE_WITH_DRIVER' if stoptime.drop_off_type == STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER else 'NONE',
'pickupType': 'COORDINATE_WITH_DRIVER' if stoptime.pickup_type == STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER else 'NONE'
}
}
}
for stoptime in trip.stop_times]
def _as_added_updates(self, trip, fromdate):
return [{
'trip': {
'tripId': trip.trip_id,
'startTime': trip.start_time_str(),
'startDate': trip_date,
'scheduleRelationship': 'ADDED',
'routeId': trip.trip_id,
'[transit_realtime.trip_descriptor]': {
'routeUrl' : trip.url,
'agencyId' : trip.agency,
'route_long_name' : trip.route_long_name(),
'route_type': RIDESHARING_ROUTE_TYPE
}
},
'stopTimeUpdate': self._to_stop_times(trip, trip_date)
} for trip_date in trip.next_trip_dates(fromdate)]

View file

@ -1,312 +0,0 @@
from collections.abc import Iterable
from datetime import datetime, timedelta
from zipfile import ZipFile
import csv
import gettext
import logging
import re
from amarillo.utils.utils import assert_folder_exists
from amarillo.plugins.enhancer.models.gtfs import GtfsTimeDelta, GtfsFeedInfo, GtfsAgency, GtfsRoute, GtfsStop, GtfsTrip, GtfsCalendar, GtfsCalendarDate, GtfsShape, GtfsDriver, GtfsAdditionalRidesharingInfo
from amarillo.models.Carpool import Driver, RidesharingInfo
from amarillo.plugins.enhancer.services.stops import is_carpooling_stop
from amarillo.plugins.enhancer.services.gtfs_constants import *
from amarillo.utils.utils import geodesic_distance_in_m
from amarillo.plugins.enhancer.services.trips import Trip
logger = logging.getLogger(__name__)
class GtfsExport:
stops_counter = 0
trips_counter = 0
trip_counter = 0
stored_stops = {}
def __init__(self, agencies, feed_info, ridestore, stopstore, bbox = None):
self.stops = {}
self.routes = []
self.calendar_dates = []
self.calendar = []
self.trips = []
self.stop_times = []
self.calendar = []
self.shapes = []
self.drivers = {} #use a dictionary to avoid duplicate ids
self.additional_ridesharing_infos = []
self.agencies = agencies
self.feed_info = feed_info
self.localized_to = " nach "
self.localized_short_name = "Mitfahrgelegenheit"
self.stopstore = stopstore
self.ridestore = ridestore
self.bbox = bbox
def export(self, gtfszip_filename, gtfsfolder):
assert_folder_exists(gtfsfolder)
self._prepare_gtfs_feed(self.ridestore, self.stopstore)
self._write_csvfile(gtfsfolder, 'agency.txt', self.agencies)
self._write_csvfile(gtfsfolder, 'feed_info.txt', self.feed_info)
self._write_csvfile(gtfsfolder, 'routes.txt', self.routes)
self._write_csvfile(gtfsfolder, 'trips.txt', self.trips)
self._write_csvfile(gtfsfolder, 'calendar.txt', self.calendar)
self._write_csvfile(gtfsfolder, 'calendar_dates.txt', self.calendar_dates)
self._write_csvfile(gtfsfolder, 'stops.txt', self.stops.values())
self._write_csvfile(gtfsfolder, 'stop_times.txt', self.stop_times)
self._write_csvfile(gtfsfolder, 'shapes.txt', self.shapes)
self._write_csvfile(gtfsfolder, 'driver.txt', self.drivers.values())
self._write_csvfile(gtfsfolder, 'additional_ridesharing_info.txt', self.additional_ridesharing_infos)
self._zip_files(gtfszip_filename, gtfsfolder)
def _zip_files(self, gtfszip_filename, gtfsfolder):
gtfsfiles = ['agency.txt', 'feed_info.txt', 'routes.txt', 'trips.txt',
'calendar.txt', 'calendar_dates.txt', 'stops.txt', 'stop_times.txt',
'shapes.txt', 'driver.txt', 'additional_ridesharing_info.txt']
with ZipFile(gtfszip_filename, 'w') as gtfszip:
for gtfsfile in gtfsfiles:
gtfszip.write(gtfsfolder+'/'+gtfsfile, gtfsfile)
def _prepare_gtfs_feed(self, ridestore, stopstore):
"""
Prepares all gtfs objects in memory before they are written
to their respective streams.
For all wellknown stops a GTFS stop is created and
afterwards all ride offers are transformed into their
gtfs equivalents.
"""
for stopSet in stopstore.stopsDataFrames:
for stop in stopSet["stops"].itertuples():
self._load_stored_stop(stop)
cloned_trips = dict(ridestore.trips)
groups, cloned_trips = self.group_trips_into_routes(cloned_trips)
for group in groups:
if self.bbox is None or any(trip.intersects(self.bbox) for trip in group.values()):
self.convert_route(group)
for url, trip in cloned_trips.items():
# TODO: convert ridesharing info and driver data
if self.bbox is None or trip.intersects(self.bbox):
self._convert_trip(trip)
def group_trips_into_routes(self, trips: dict):
ungrouped_trips = dict(trips)
route_groups = list()
current_route_id = 1
while len(ungrouped_trips) > 0:
trip_id, current_trip = ungrouped_trips.popitem()
current_group = {trip_id: current_trip}
current_trip.route_id = current_route_id
for other_id, other_trip in list(ungrouped_trips.items()):
# if an ungrouped trip is close to any of the grouped trips, add it to the route group
if (any(self.trips_are_close(other_trip, grouped_trip) for grouped_trip in current_group.values())):
current_group[other_id] = ungrouped_trips.pop(other_id)
current_group[other_id].route_id = current_route_id
route_groups.append(current_group)
current_route_id += 1
return route_groups, trips
def trips_are_close(self, trip1, trip2):
trip1_start = trip1.path.coordinates[0]
trip1_end = trip1.path.coordinates[-1]
trip2_start = trip2.path.coordinates[0]
trip2_end = trip2.path.coordinates[-1]
res = self.within_range(trip1_start, trip2_start) and self.within_range(trip1_end, trip2_end)
return res
def within_range(self, stop1, stop2):
MERGE_RANGE_M = 500
return geodesic_distance_in_m(stop1, stop2) <= MERGE_RANGE_M
def convert_route(self, route_group):
agency = "multiple"
#if there is only one agency, use that
agencies = set(trip.agency for id, trip in route_group.items())
if len(agencies) == 1: agency = agencies.pop()
trip = next(iter(route_group.values())) # grab any trip, relevant values should be the same
self.routes.append(self._create_route(agency, trip.route_id, trip.route_name))
def _convert_trip(self, trip: Trip):
self.trip_counter += 1
self.calendar.append(self._create_calendar(trip))
if not trip.runs_regularly:
self.calendar_dates.append(self._create_calendar_date(trip))
self.trips.append(self._create_trip(trip, self.trip_counter))
self._append_stops_and_stop_times(trip)
self._append_shapes(trip, self.trip_counter)
if(trip.driver is not None):
self.drivers[trip.driver.driver_id] = self._convert_driver(trip.driver)
if(trip.additional_ridesharing_info is not None):
self.additional_ridesharing_infos.append(
self._convert_additional_ridesharing_info(trip.trip_id, trip.additional_ridesharing_info))
def _convert_driver(self, driver: Driver):
return GtfsDriver(driver.driver_id, driver.profile_picture, driver.rating)
def _convert_additional_ridesharing_info(self, trip_id, info: RidesharingInfo):
# if we don't specify .value, the enum will appear in the export as e.g. LuggageSize.large
# and missing optional values get None
def get_enum_value(enum):
return enum.value if enum is not None else None
def format_date(date: datetime):
return date.strftime("%Y%m%d %H:%M:%S")
return GtfsAdditionalRidesharingInfo(
trip_id, info.number_free_seats, get_enum_value(info.same_gender), get_enum_value(info.luggage_size), get_enum_value(info.animal_car),
info.car_model, info.car_brand, format_date(info.creation_date), get_enum_value(info.smoking), info.payment_method)
def _trip_headsign(self, destination):
destination = destination.replace('(Deutschland)', '')
destination = destination.replace(', Deutschland', '')
appendix = ''
if 'Schweiz' in destination or 'Switzerland' in destination:
appendix = ', Schweiz'
destination = destination.replace('(Schweiz)', '')
destination = destination.replace(', Schweiz', '')
destination = destination.replace('(Switzerland)', '')
try:
matches = re.match(r"(.*,)? ?(\d{4,5})? ?(.*)", destination)
match = matches.group(3).strip() if matches != None else destination.strip()
if match[-1]==')' and not '(' in match:
match = match[0:-1]
return match + appendix
except Exception as ex:
logger.error("error for "+destination )
logger.exception(ex)
return destination
def _create_route(self, agency, route_id, long_name):
return GtfsRoute(agency, route_id, long_name, RIDESHARING_ROUTE_TYPE, "")
def _create_calendar(self, trip):
# TODO currently, calendar is not provided by Fahrgemeinschaft.de interface.
# We could apply some heuristics like requesting multiple days and extrapolate
# if multiple trips are found, but better would be to have these provided by the
# offical interface. Then validity periods should be provided as well (not
# sure if these are available)
# For fahrgemeinschaft.de, regurlar trips are recognizable via their url
# which contains "regelmaessig". However, we don't know on which days of the week,
# nor until when. As a first guess, if datetime is a mo-fr, we assume each workday,
# if it's sa/su, only this...
feed_start_date = datetime.today()
stop_date = self._convert_stop_date(feed_start_date)
return GtfsCalendar(trip.trip_id, stop_date, self._convert_stop_date(feed_start_date + timedelta(days=31)), *(trip.weekdays))
def _create_calendar_date(self, trip):
return GtfsCalendarDate(trip.trip_id, self._convert_stop_date(trip.start), CALENDAR_DATES_EXCEPTION_TYPE_ADDED)
def _create_trip(self, trip : Trip, shape_id):
driver_id = None if trip.driver is None else trip.driver.driver_id
return GtfsTrip(trip.route_id, trip.trip_id, driver_id, trip.trip_id, shape_id, trip.trip_headsign, NO_BIKES_ALLOWED, trip.url)
def _convert_stop(self, stop):
"""
Converts a stop represented as pandas row to a gtfs stop.
Expected attributes of stop: id, stop_name, x, y (in wgs84)
"""
if stop.id:
id = stop.id
else:
self.stops_counter += 1
id = "tmp-{}".format(self.stops_counter)
stop_name = "k.A." if stop.stop_name is None else stop.stop_name
return GtfsStop(id, stop.y, stop.x, stop_name)
def _append_stops_and_stop_times(self, trip):
# Assumptions:
# arrival_time = departure_time
# pickup_type, drop_off_type for origin: = coordinate/none
# pickup_type, drop_off_type for destination: = none/coordinate
# timepoint = approximate for origin and destination (not sure what consequences this might have for trip planners)
for stop_time in trip.stop_times:
# retrieve stop from stored_stops and mark it to be exported
wkn_stop = self.stored_stops.get(stop_time.stop_id)
if not wkn_stop:
logger.warning("No stop found in stop_store for %s. Will skip stop_time %s of trip %s", stop_time.stop_id, stop_time.stop_sequence, trip.trip_id)
else:
self.stops[stop_time.stop_id] = wkn_stop
# Append stop_time
self.stop_times.append(stop_time)
def _append_shapes(self, trip, shape_id):
counter = 0
for point in trip.path.coordinates:
counter += 1
self.shapes.append(GtfsShape(shape_id, point[0], point[1], counter))
def _stop_hash(self, stop):
return "{}#{}#{}".format(stop.stop_name,stop.x,stop.y)
def _should_always_export(self, stop):
"""
Returns true, if the given stop shall be exported to GTFS,
regardless, if it's part of a trip or not.
This is necessary, as potential stops are required
to be part of the GTFS to be referenced later on
by dynamicly added trips.
"""
if self.bbox:
return (self.bbox[0] <= stop.stop_lon <= self.bbox[2] and
self.bbox[1] <= stop.stop_lat <= self.bbox[3])
else:
return is_carpooling_stop(stop.stop_id, stop.stop_name)
def _load_stored_stop(self, stop):
gtfsstop = self._convert_stop(stop)
stop_hash = self._stop_hash(stop)
self.stored_stops[gtfsstop.stop_id] = gtfsstop
if self._should_always_export(gtfsstop):
self.stops[gtfsstop.stop_id] = gtfsstop
def _get_stop_by_hash(self, stop_hash):
return self.stops.get(stop_hash, self.stored_stops.get(stop_hash))
def _get_or_create_stop(self, stop):
stop_hash = self._stop_hash(stop)
gtfsstop = self.stops.get(stop_hash)
if gtfsstop is None:
gtfsstop = self.stored_stops.get(stop_hash, self._convert_stop(stop))
self.stops[stop_hash] = gtfsstop
return gtfsstop
def _convert_stop_date(self, date_time):
return date_time.strftime("%Y%m%d")
def _write_csvfile(self, gtfsfolder, filename, content):
with open(gtfsfolder+"/"+filename, 'w', newline="\n", encoding="utf-8") as csvfile:
self._write_csv(csvfile, content)
def _write_csv(self, csvfile, content):
if hasattr(content, '_fields'):
writer = csv.DictWriter(csvfile, content._fields)
writer.writeheader()
writer.writerow(content._asdict())
else:
if content:
writer = csv.DictWriter(csvfile, next(iter(content))._fields)
writer.writeheader()
for record in content:
writer.writerow(record._asdict())

View file

@ -1,71 +0,0 @@
from amarillo.models.Carpool import Region
from amarillo.plugins.enhancer.services.gtfs_export import GtfsExport, GtfsFeedInfo, GtfsAgency
from amarillo.plugins.enhancer.services.gtfs import GtfsRtProducer
from amarillo.utils.container import container
from glob import glob
import json
import schedule
import threading
import time
import logging
from datetime import date, timedelta
logger = logging.getLogger(__name__)
regions = {}
for region_file_name in glob('conf/region/*.json'):
with open(region_file_name) as region_file:
dict = json.load(region_file)
region = Region(**dict)
region_id = region.id
regions[region_id] = region
agencies = []
for agency_file_name in glob('conf/agency/*.json'):
with open(agency_file_name) as agency_file:
dict = json.load(agency_file)
agency = GtfsAgency(dict["id"], dict["name"], dict["url"], dict["timezone"], dict["lang"], dict["email"])
agency_id = agency.agency_id
agencies.append(agency)
def run_schedule():
while 1:
try:
schedule.run_pending()
except Exception as e:
logger.exception(e)
time.sleep(1)
def midnight():
container['stops_store'].load_stop_sources()
container['trips_store'].unflag_unrecent_updates()
container['carpools'].purge_outdated_offers()
generate_gtfs()
def generate_gtfs():
logger.info("Generate GTFS")
for region in regions.values():
# TODO make feed producer infos configurable
feed_info = GtfsFeedInfo('mfdz', 'MITFAHR|DE|ZENTRALE', 'http://www.mitfahrdezentrale.de', 'de', 1)
exporter = GtfsExport(
agencies,
feed_info,
container['trips_store'],
container['stops_store'],
region.bbox)
exporter.export(f"data/gtfs/amarillo.{region.id}.gtfs.zip", "data/tmp/")
def generate_gtfs_rt():
logger.info("Generate GTFS-RT")
producer = GtfsRtProducer(container['trips_store'])
for region in regions.values():
rt = producer.export_feed(time.time(), f"data/gtfs/amarillo.{region.id}.gtfsrt", bbox=region.bbox)
def start_schedule():
schedule.every().day.at("00:00").do(midnight)
schedule.every(60).seconds.do(generate_gtfs_rt)
# Create all feeds once at startup
schedule.run_all()
job_thread = threading.Thread(target=run_schedule, daemon=True)
job_thread.start()