from collections.abc import Iterable
from datetime import datetime, timedelta
from zipfile import ZipFile
import csv
import gettext
import logging
import re

from app.utils.utils import assert_folder_exists
from app.models.gtfs import GtfsTimeDelta, GtfsFeedInfo, GtfsAgency, GtfsRoute, GtfsStop, GtfsStopTime, GtfsTrip, GtfsCalendar, GtfsCalendarDate, GtfsShape
from app.services.stops import is_carpooling_stop
from app.services.gtfs_constants import *


logger = logging.getLogger(__name__)


class GtfsExport:
    """
    Exports a static GTFS feed built from the trips of a ride store and
    the well-known stops of a stop store.

    Usage: construct with the agencies/feed_info records and the two
    stores, then call :meth:`export` once. All feed content is prepared
    in memory before any file is written.
    """

    def __init__(self, agencies, feed_info, ridestore, stopstore, bbox=None):
        """
        :param agencies: iterable of GtfsAgency records written to agency.txt
        :param feed_info: single GtfsFeedInfo record written to feed_info.txt
        :param ridestore: store providing the trips to export (``.trips`` dict)
        :param stopstore: store providing well-known stops (``.stopsDataFrames``)
        :param bbox: optional (min_lon, min_lat, max_lon, max_lat) filter;
            when given, only trips intersecting it and stops inside it are
            exported unconditionally.
        """
        # Counters and caches are instance attributes on purpose: the
        # original kept them at class level, where the mutable stored_stops
        # dict was shared (and mutated) across all GtfsExport instances.
        self.stops_counter = 0
        self.trips_counter = 0
        self.routes_counter = 0
        self.stored_stops = {}   # stop_id -> GtfsStop: every stop known to the stop store
        self.stops = {}          # stop_id -> GtfsStop: only stops that will be exported
        self.routes = []
        self.calendar_dates = []
        self.calendar = []
        self.trips = []
        self.stop_times = []
        self.shapes = []
        self.agencies = agencies
        self.feed_info = feed_info
        self.localized_to = " nach "
        self.localized_short_name = "Mitfahrgelegenheit"
        self.stopstore = stopstore
        self.ridestore = ridestore
        self.bbox = bbox

    def export(self, gtfszip_filename, gtfsfolder):
        """Prepare the feed, write all GTFS text files into *gtfsfolder*
        and bundle them into the zip file *gtfszip_filename*."""
        assert_folder_exists(gtfsfolder)
        self._prepare_gtfs_feed(self.ridestore, self.stopstore)
        self._write_csvfile(gtfsfolder, 'agency.txt', self.agencies)
        self._write_csvfile(gtfsfolder, 'feed_info.txt', self.feed_info)
        self._write_csvfile(gtfsfolder, 'routes.txt', self.routes)
        self._write_csvfile(gtfsfolder, 'trips.txt', self.trips)
        self._write_csvfile(gtfsfolder, 'calendar.txt', self.calendar)
        self._write_csvfile(gtfsfolder, 'calendar_dates.txt', self.calendar_dates)
        self._write_csvfile(gtfsfolder, 'stops.txt', self.stops.values())
        self._write_csvfile(gtfsfolder, 'stop_times.txt', self.stop_times)
        self._write_csvfile(gtfsfolder, 'shapes.txt', self.shapes)
        self._zip_files(gtfszip_filename, gtfsfolder)

    def _zip_files(self, gtfszip_filename, gtfsfolder):
        """Zip the nine GTFS text files written by export()."""
        gtfsfiles = ['agency.txt', 'feed_info.txt', 'routes.txt', 'trips.txt',
                     'calendar.txt', 'calendar_dates.txt', 'stops.txt',
                     'stop_times.txt', 'shapes.txt']
        with ZipFile(gtfszip_filename, 'w') as gtfszip:
            for gtfsfile in gtfsfiles:
                gtfszip.write(gtfsfolder + '/' + gtfsfile, gtfsfile)

    def _prepare_gtfs_feed(self, ridestore, stopstore):
        """
        Prepares all gtfs objects in memory before they are written
        to their respective streams.

        For all wellknown stops a GTFS stop is created and
        afterwards all ride offers are transformed into their
        gtfs equivalents.
        """
        for stopSet in stopstore.stopsDataFrames:
            for stop in stopSet["stops"].itertuples():
                self._load_stored_stop(stop)
        # iterate over a snapshot: the store may be updated concurrently
        # while the export runs
        cloned_trips = dict(ridestore.trips)
        for url, trip in cloned_trips.items():
            if self.bbox is None or trip.intersects(self.bbox):
                self._convert_trip(trip)

    def _convert_trip(self, trip):
        """Create route, calendar(_date), trip, stop_times and shape
        records for a single ride offer."""
        self.routes_counter += 1
        self.routes.append(self._create_route(trip))
        self.calendar.append(self._create_calendar(trip))
        if not trip.runs_regularly:
            self.calendar_dates.append(self._create_calendar_date(trip))
        self.trips.append(self._create_trip(trip, self.routes_counter))
        self._append_stops_and_stop_times(trip)
        self._append_shapes(trip, self.routes_counter)

    def _trip_headsign(self, destination):
        """
        Derive a GTFS trip_headsign from a free-form destination string.

        Country decorations ("(Deutschland)" etc.) and a leading
        "<place>, <postcode>" prefix are stripped; Swiss destinations keep
        a ", Schweiz" suffix so they stay distinguishable.
        """
        destination = destination.replace('(Deutschland)', '')
        destination = destination.replace(', Deutschland', '')
        appendix = ''
        if 'Schweiz' in destination or 'Switzerland' in destination:
            appendix = ', Schweiz'
            destination = destination.replace('(Schweiz)', '')
            destination = destination.replace(', Schweiz', '')
            destination = destination.replace('(Switzerland)', '')

        try:
            matches = re.match(r"(.*,)? ?(\d{4,5})? ?(.*)", destination)
            match = matches.group(3).strip() if matches is not None else destination.strip()
            # drop an unbalanced trailing ")" left over from removing "(...)";
            # guard against an empty match so we don't raise IndexError here
            if match and match[-1] == ')' and '(' not in match:
                match = match[:-1]

            return match + appendix
        except Exception as ex:
            logger.error("error for %s", destination)
            logger.exception(ex)
            return destination

    def _create_route(self, trip):
        """Map a trip to a GtfsRoute; route_id is the trip_id."""
        return GtfsRoute(trip.agency, trip.trip_id, trip.route_long_name(), RIDESHARING_ROUTE_TYPE, trip.url, "")

    def _create_calendar(self, trip):
        # TODO currently, calendar is not provided by Fahrgemeinschaft.de interface.
        # We could apply some heuristics like requesting multiple days and extrapolate
        # if multiple trips are found, but better would be to have these provided by the
        # offical interface. Then validity periods should be provided as well (not
        # sure if these are available)
        # For fahrgemeinschaft.de, regurlar trips are recognizable via their url
        # which contains "regelmaessig". However, we don't know on which days of the week,
        # nor until when. As a first guess, if datetime is a mo-fr, we assume each workday,
        # if it's sa/su, only this...
        feed_start_date = datetime.today()
        start_date = self._convert_stop_date(feed_start_date)
        # validity window is a heuristic 31 days from today
        return GtfsCalendar(trip.trip_id, start_date, self._convert_stop_date(feed_start_date + timedelta(days=31)), *(trip.weekdays))

    def _create_calendar_date(self, trip):
        """One-off (non-regular) trips get a single added service date."""
        return GtfsCalendarDate(trip.trip_id, self._convert_stop_date(trip.start), CALENDAR_DATES_EXCEPTION_TYPE_ADDED)

    def _create_trip(self, trip, shape_id):
        # route_id, service_id and trip_id are all the carpool's trip_id
        return GtfsTrip(trip.trip_id, trip.trip_id, trip.trip_id, shape_id, trip.trip_headsign, NO_BIKES_ALLOWED)

    def _convert_stop(self, stop):
        """
        Converts a stop represented as pandas row to a gtfs stop.
        Expected attributes of stop: id, stop_name, x, y (in wgs84)
        """
        if stop.id:
            stop_id = stop.id
        else:
            # stops without an id get a synthetic, export-local one
            self.stops_counter += 1
            stop_id = "tmp-{}".format(self.stops_counter)

        # "k.A." = "keine Angabe" (no name given)
        stop_name = "k.A." if stop.stop_name is None else stop.stop_name
        return GtfsStop(stop_id, stop.y, stop.x, stop_name)

    def _append_stops_and_stop_times(self, trip):
        # Assumptions:
        # arrival_time = departure_time
        # pickup_type, drop_off_type for origin: = coordinate/none
        # pickup_type, drop_off_type for destination: = none/coordinate
        # timepoint = approximate for origin and destination (not sure what consequences this might have for trip planners)
        for stop_time in trip.stop_times:
            # retrieve stop from stored_stops and mark it to be exported
            wkn_stop = self.stored_stops.get(stop_time.stop_id)
            if not wkn_stop:
                logger.warning("No stop found in stop_store for %s. Will skip stop_time %s of trip %s", stop_time.stop_id, stop_time.stop_sequence, trip.trip_id)
            else:
                self.stops[stop_time.stop_id] = wkn_stop
                # Append stop_time only when its stop is exported, so
                # stop_times.txt never references a missing stop
                self.stop_times.append(stop_time)

    def _append_shapes(self, trip, shape_id):
        """Append one GtfsShape point per path coordinate, 1-based sequence."""
        counter = 0
        for point in trip.path.coordinates:
            counter += 1
            self.shapes.append(GtfsShape(shape_id, point[0], point[1], counter))

    def _stop_hash(self, stop):
        # name + coordinates identify a stop independent of its (possibly
        # missing) id
        return "{}#{}#{}".format(stop.stop_name, stop.x, stop.y)

    def _should_always_export(self, stop):
        """
        Returns true, if the given stop shall be exported to GTFS,
        regardless, if it's part of a trip or not.

        This is necessary, as potential stops are required
        to be part of the GTFS to be referenced later on
        by dynamicly added trips.
        """
        if self.bbox:
            # bbox is (min_lon, min_lat, max_lon, max_lat)
            return (self.bbox[0] <= stop.stop_lon <= self.bbox[2] and
                    self.bbox[1] <= stop.stop_lat <= self.bbox[3])
        else:
            return is_carpooling_stop(stop.stop_id, stop.stop_name)

    def _load_stored_stop(self, stop):
        """Cache a well-known stop and pre-select it for export if required."""
        gtfsstop = self._convert_stop(stop)
        self.stored_stops[gtfsstop.stop_id] = gtfsstop
        if self._should_always_export(gtfsstop):
            self.stops[gtfsstop.stop_id] = gtfsstop

    def _get_stop_by_hash(self, stop_hash):
        # NOTE(review): stops/stored_stops are keyed by stop_id in
        # _load_stored_stop but looked up by hash here and in
        # _get_or_create_stop — confirm which key scheme callers rely on.
        return self.stops.get(stop_hash, self.stored_stops.get(stop_hash))

    def _get_or_create_stop(self, stop):
        stop_hash = self._stop_hash(stop)
        gtfsstop = self.stops.get(stop_hash)
        if gtfsstop is None:
            gtfsstop = self.stored_stops.get(stop_hash, self._convert_stop(stop))
            self.stops[stop_hash] = gtfsstop
        return gtfsstop

    def _convert_stop_date(self, date_time):
        """Format a datetime as a GTFS date (YYYYMMDD)."""
        return date_time.strftime("%Y%m%d")

    def _write_csvfile(self, gtfsfolder, filename, content):
        with open(gtfsfolder + "/" + filename, 'w', newline="\n", encoding="utf-8") as csvfile:
            self._write_csv(csvfile, content)

    def _write_csv(self, csvfile, content):
        """Write a single namedtuple record or an iterable of them as CSV
        with a header row derived from the record's _fields."""
        if hasattr(content, '_fields'):
            # a single record, e.g. feed_info
            writer = csv.DictWriter(csvfile, content._fields)
            writer.writeheader()
            writer.writerow(content._asdict())
        else:
            # materialize first: peeking the first record for the header
            # must not consume it from a one-shot iterator
            records = list(content)
            if records:
                writer = csv.DictWriter(csvfile, records[0]._fields)
                writer.writeheader()
                for record in records:
                    writer.writerow(record._asdict())
from app.models.Carpool import Region
from app.services.gtfs_export import GtfsExport, GtfsFeedInfo, GtfsAgency
from app.services.gtfs import GtfsRtProducer
from app.utils.container import container
from glob import glob
import json
import schedule
import threading
import time
import logging
from datetime import date, timedelta

logger = logging.getLogger(__name__)

# Region and agency configuration is loaded once at import time from
# conf/region/*.json and conf/agency/*.json.
regions = {}
for region_file_name in glob('conf/region/*.json'):
    with open(region_file_name) as region_file:
        # renamed from "dict": the original shadowed the builtin
        region_config = json.load(region_file)
    region = Region(**region_config)
    regions[region.id] = region

agencies = []
for agency_file_name in glob('conf/agency/*.json'):
    with open(agency_file_name) as agency_file:
        agency_config = json.load(agency_file)
    agencies.append(GtfsAgency(
        agency_config["id"], agency_config["name"], agency_config["url"],
        agency_config["timezone"], agency_config["lang"], agency_config["email"]))


def run_schedule():
    """Run pending scheduled jobs forever; a failing job is logged and
    must never kill the scheduler thread."""
    while True:
        try:
            schedule.run_pending()
        except Exception as e:
            logger.exception(e)
        time.sleep(1)


def midnight():
    """Daily housekeeping: refresh stops, expire stale trips/offers and
    regenerate the static GTFS feeds."""
    container['stops_store'].load_stop_sources()
    container['trips_store'].unflag_unrecent_updates()
    container['carpools'].purge_outdated_offers()
    generate_gtfs()


def generate_gtfs():
    """Export one static GTFS zip per configured region."""
    logger.info("Generate GTFS")

    # TODO make feed producer infos configurable
    # feed_info is region-independent, so build it once outside the loop
    feed_info = GtfsFeedInfo('mfdz', 'MITFAHR|DE|ZENTRALE', 'http://www.mitfahrdezentrale.de', 'de', 1)
    for region in regions.values():
        exporter = GtfsExport(
            agencies,
            feed_info,
            container['trips_store'],
            container['stops_store'],
            region.bbox)
        exporter.export(f"data/gtfs/amarillo.{region.id}.gtfs.zip", "data/tmp/")


def generate_gtfs_rt():
    """Export one GTFS-RT feed per configured region."""
    logger.info("Generate GTFS-RT")
    producer = GtfsRtProducer(container['trips_store'])
    for region in regions.values():
        producer.export_feed(time.time(), f"data/gtfs/amarillo.{region.id}.gtfsrt", bbox=region.bbox)


def start_schedule():
    """Register the periodic jobs, run them all once at startup and start
    the background scheduler thread (daemon, so it won't block shutdown)."""
    schedule.every().day.at("00:00").do(midnight)
    schedule.every(60).seconds.do(generate_gtfs_rt)
    # Create all feeds once at startup
    schedule.run_all()
    job_thread = threading.Thread(target=run_schedule, daemon=True)
    job_thread.start()