moved gtfs generation to export plugin

This commit is contained in:
Csaba 2024-02-08 14:32:54 +01:00
parent 6b1f18d5ce
commit c24c83a5b8
4 changed files with 6 additions and 441 deletions

View file

@ -27,7 +27,11 @@ def configure_container():
def configure_enhancer_services():
configure_services()
configure_container()
# configure_container may have been called already by an exporter, no need to run it again
if 'stops_store' not in container:
configure_container()
else: logger.info("Container already configured")
logger.info("Restore carpools...")
@ -47,5 +51,3 @@ def configure_enhancer_services():
container['carpools'].delete(carpool.agency, carpool.id)
logger.info("Restored carpools: %s", container['carpools'].get_all_ids())
logger.info("Starting scheduler")
gtfs_generator.start_schedule()

View file

@ -1,137 +0,0 @@
import amarillo.plugins.enhancer.services.gtfsrt.gtfs_realtime_pb2 as gtfs_realtime_pb2
import amarillo.plugins.enhancer.services.gtfsrt.realtime_extension_pb2 as mfdzrte
from amarillo.plugins.enhancer.services.gtfs_constants import *
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import ParseDict
from datetime import datetime, timedelta
import json
import re
import time
class GtfsRtProducer():
def __init__(self, trip_store):
self.trip_store = trip_store
def generate_feed(self, time, format='protobuf', bbox=None):
# See https://developers.google.com/transit/gtfs-realtime/reference
# https://github.com/mfdz/carpool-gtfs-rt/blob/master/src/main/java/de/mfdz/resource/CarpoolResource.java
gtfsrt_dict = {
'header': {
'gtfsRealtimeVersion': '1.0',
'timestamp': int(time)
},
'entity': self._get_trip_updates(bbox)
}
feed = gtfs_realtime_pb2.FeedMessage()
ParseDict(gtfsrt_dict, feed)
if "message" == format.lower():
return feed
elif "json" == format.lower():
return MessageToDict(feed)
else:
return feed.SerializeToString()
def export_feed(self, timestamp, file_path, bbox=None):
"""
Exports gtfs-rt feed as .json and .pbf file to file_path
"""
feed = self.generate_feed(timestamp, "message", bbox)
with open(f"{file_path}.pbf", "wb") as f:
f.write(feed.SerializeToString())
with open(f"{file_path}.json", "w") as f:
json.dump(MessageToDict(feed), f)
def _get_trip_updates(self, bbox = None):
trips = []
trips.extend(self._get_added(bbox))
trips.extend(self._get_deleted(bbox))
trip_updates = []
for num, trip in enumerate(trips):
trip_updates.append( {
'id': f'carpool-update-{num}',
'tripUpdate': trip
}
)
return trip_updates
def _get_deleted(self, bbox = None):
return self._get_updates(
self.trip_store.recently_deleted_trips(),
self._as_delete_updates,
bbox)
def _get_added(self, bbox = None):
return self._get_updates(
self.trip_store.recently_added_trips(),
self._as_added_updates,
bbox)
def _get_updates(self, trips, update_func, bbox = None):
updates = []
today = datetime.today()
for t in trips:
if bbox == None or t.intersects(bbox):
updates.extend(update_func(t, today))
return updates
def _as_delete_updates(self, trip, fromdate):
return [{
'trip': {
'tripId': trip.trip_id,
'startTime': trip.start_time_str(),
'startDate': trip_date,
'scheduleRelationship': 'CANCELED',
'routeId': trip.trip_id
}
} for trip_date in trip.next_trip_dates(fromdate)]
def _to_seconds(self, fromdate, stop_time):
startdate = datetime.strptime(fromdate, '%Y%m%d')
m = re.search(r'(\d+):(\d+):(\d+)', stop_time)
delta = timedelta(
hours=int(m.group(1)),
minutes=int(m.group(2)),
seconds=int(m.group(3)))
return time.mktime((startdate + delta).timetuple())
def _to_stop_times(self, trip, fromdate):
return [{
'stopSequence': stoptime.stop_sequence,
'arrival': {
'time': self._to_seconds(fromdate, stoptime.arrival_time),
'uncertainty': MFDZ_DEFAULT_UNCERTAINITY
},
'departure': {
'time': self._to_seconds(fromdate, stoptime.departure_time),
'uncertainty': MFDZ_DEFAULT_UNCERTAINITY
},
'stopId': stoptime.stop_id,
'scheduleRelationship': 'SCHEDULED',
'stop_time_properties': {
'[transit_realtime.stop_time_properties]': {
'dropoffType': 'COORDINATE_WITH_DRIVER' if stoptime.drop_off_type == STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER else 'NONE',
'pickupType': 'COORDINATE_WITH_DRIVER' if stoptime.pickup_type == STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER else 'NONE'
}
}
}
for stoptime in trip.stop_times]
def _as_added_updates(self, trip, fromdate):
return [{
'trip': {
'tripId': trip.trip_id,
'startTime': trip.start_time_str(),
'startDate': trip_date,
'scheduleRelationship': 'ADDED',
'routeId': trip.trip_id,
'[transit_realtime.trip_descriptor]': {
'routeUrl' : trip.url,
'agencyId' : trip.agency,
'route_long_name' : trip.route_long_name(),
'route_type': RIDESHARING_ROUTE_TYPE
}
},
'stopTimeUpdate': self._to_stop_times(trip, trip_date)
} for trip_date in trip.next_trip_dates(fromdate)]

View file

@ -1,229 +0,0 @@
from collections.abc import Iterable
from datetime import datetime, timedelta
from zipfile import ZipFile
import csv
import gettext
import logging
import re
from amarillo.utils.utils import assert_folder_exists
from amarillo.plugins.enhancer.models.gtfs import GtfsTimeDelta, GtfsFeedInfo, GtfsAgency, GtfsRoute, GtfsStop, GtfsStopTime, GtfsTrip, GtfsCalendar, GtfsCalendarDate, GtfsShape
from amarillo.plugins.enhancer.services.stops import is_carpooling_stop
from amarillo.plugins.enhancer.services.gtfs_constants import *
logger = logging.getLogger(__name__)
class GtfsExport:
stops_counter = 0
trips_counter = 0
routes_counter = 0
stored_stops = {}
def __init__(self, agencies, feed_info, ridestore, stopstore, bbox = None):
self.stops = {}
self.routes = []
self.calendar_dates = []
self.calendar = []
self.trips = []
self.stop_times = []
self.calendar = []
self.shapes = []
self.agencies = agencies
self.feed_info = feed_info
self.localized_to = " nach "
self.localized_short_name = "Mitfahrgelegenheit"
self.stopstore = stopstore
self.ridestore = ridestore
self.bbox = bbox
def export(self, gtfszip_filename, gtfsfolder):
assert_folder_exists(gtfsfolder)
self._prepare_gtfs_feed(self.ridestore, self.stopstore)
self._write_csvfile(gtfsfolder, 'agency.txt', self.agencies)
self._write_csvfile(gtfsfolder, 'feed_info.txt', self.feed_info)
self._write_csvfile(gtfsfolder, 'routes.txt', self.routes)
self._write_csvfile(gtfsfolder, 'trips.txt', self.trips)
self._write_csvfile(gtfsfolder, 'calendar.txt', self.calendar)
self._write_csvfile(gtfsfolder, 'calendar_dates.txt', self.calendar_dates)
self._write_csvfile(gtfsfolder, 'stops.txt', self.stops.values())
self._write_csvfile(gtfsfolder, 'stop_times.txt', self.stop_times)
self._write_csvfile(gtfsfolder, 'shapes.txt', self.shapes)
self._zip_files(gtfszip_filename, gtfsfolder)
def _zip_files(self, gtfszip_filename, gtfsfolder):
gtfsfiles = ['agency.txt', 'feed_info.txt', 'routes.txt', 'trips.txt',
'calendar.txt', 'calendar_dates.txt', 'stops.txt', 'stop_times.txt', 'shapes.txt']
with ZipFile(gtfszip_filename, 'w') as gtfszip:
for gtfsfile in gtfsfiles:
gtfszip.write(gtfsfolder+'/'+gtfsfile, gtfsfile)
def _prepare_gtfs_feed(self, ridestore, stopstore):
"""
Prepares all gtfs objects in memory before they are written
to their respective streams.
For all wellknown stops a GTFS stop is created and
afterwards all ride offers are transformed into their
gtfs equivalents.
"""
for stopSet in stopstore.stopsDataFrames:
for stop in stopSet["stops"].itertuples():
self._load_stored_stop(stop)
cloned_trips = dict(ridestore.trips)
for url, trip in cloned_trips.items():
if self.bbox is None or trip.intersects(self.bbox):
self._convert_trip(trip)
def _convert_trip(self, trip):
self.routes_counter += 1
self.routes.append(self._create_route(trip))
self.calendar.append(self._create_calendar(trip))
if not trip.runs_regularly:
self.calendar_dates.append(self._create_calendar_date(trip))
self.trips.append(self._create_trip(trip, self.routes_counter))
self._append_stops_and_stop_times(trip)
self._append_shapes(trip, self.routes_counter)
def _trip_headsign(self, destination):
destination = destination.replace('(Deutschland)', '')
destination = destination.replace(', Deutschland', '')
appendix = ''
if 'Schweiz' in destination or 'Switzerland' in destination:
appendix = ', Schweiz'
destination = destination.replace('(Schweiz)', '')
destination = destination.replace(', Schweiz', '')
destination = destination.replace('(Switzerland)', '')
try:
matches = re.match(r"(.*,)? ?(\d{4,5})? ?(.*)", destination)
match = matches.group(3).strip() if matches != None else destination.strip()
if match[-1]==')' and not '(' in match:
match = match[0:-1]
return match + appendix
except Exception as ex:
logger.error("error for "+destination )
logger.exception(ex)
return destination
def _create_route(self, trip):
return GtfsRoute(trip.agency, trip.trip_id, trip.route_long_name(), RIDESHARING_ROUTE_TYPE, trip.url, "")
def _create_calendar(self, trip):
# TODO currently, calendar is not provided by Fahrgemeinschaft.de interface.
# We could apply some heuristics like requesting multiple days and extrapolate
# if multiple trips are found, but better would be to have these provided by the
# offical interface. Then validity periods should be provided as well (not
# sure if these are available)
# For fahrgemeinschaft.de, regurlar trips are recognizable via their url
# which contains "regelmaessig". However, we don't know on which days of the week,
# nor until when. As a first guess, if datetime is a mo-fr, we assume each workday,
# if it's sa/su, only this...
feed_start_date = datetime.today()
stop_date = self._convert_stop_date(feed_start_date)
return GtfsCalendar(trip.trip_id, stop_date, self._convert_stop_date(feed_start_date + timedelta(days=31)), *(trip.weekdays))
def _create_calendar_date(self, trip):
return GtfsCalendarDate(trip.trip_id, self._convert_stop_date(trip.start), CALENDAR_DATES_EXCEPTION_TYPE_ADDED)
def _create_trip(self, trip, shape_id):
return GtfsTrip(trip.trip_id, trip.trip_id, trip.trip_id, shape_id, trip.trip_headsign, NO_BIKES_ALLOWED)
def _convert_stop(self, stop):
"""
Converts a stop represented as pandas row to a gtfs stop.
Expected attributes of stop: id, stop_name, x, y (in wgs84)
"""
if stop.id:
id = stop.id
else:
self.stops_counter += 1
id = "tmp-{}".format(self.stops_counter)
stop_name = "k.A." if stop.stop_name is None else stop.stop_name
return GtfsStop(id, stop.y, stop.x, stop_name)
def _append_stops_and_stop_times(self, trip):
# Assumptions:
# arrival_time = departure_time
# pickup_type, drop_off_type for origin: = coordinate/none
# pickup_type, drop_off_type for destination: = none/coordinate
# timepoint = approximate for origin and destination (not sure what consequences this might have for trip planners)
for stop_time in trip.stop_times:
# retrieve stop from stored_stops and mark it to be exported
wkn_stop = self.stored_stops.get(stop_time.stop_id)
if not wkn_stop:
logger.warning("No stop found in stop_store for %s. Will skip stop_time %s of trip %s", stop_time.stop_id, stop_time.stop_sequence, trip.trip_id)
else:
self.stops[stop_time.stop_id] = wkn_stop
# Append stop_time
self.stop_times.append(stop_time)
def _append_shapes(self, trip, shape_id):
counter = 0
for point in trip.path.coordinates:
counter += 1
self.shapes.append(GtfsShape(shape_id, point[0], point[1], counter))
def _stop_hash(self, stop):
return "{}#{}#{}".format(stop.stop_name,stop.x,stop.y)
def _should_always_export(self, stop):
"""
Returns true, if the given stop shall be exported to GTFS,
regardless, if it's part of a trip or not.
This is necessary, as potential stops are required
to be part of the GTFS to be referenced later on
by dynamicly added trips.
"""
if self.bbox:
return (self.bbox[0] <= stop.stop_lon <= self.bbox[2] and
self.bbox[1] <= stop.stop_lat <= self.bbox[3])
else:
return is_carpooling_stop(stop.stop_id, stop.stop_name)
def _load_stored_stop(self, stop):
gtfsstop = self._convert_stop(stop)
stop_hash = self._stop_hash(stop)
self.stored_stops[gtfsstop.stop_id] = gtfsstop
if self._should_always_export(gtfsstop):
self.stops[gtfsstop.stop_id] = gtfsstop
def _get_stop_by_hash(self, stop_hash):
return self.stops.get(stop_hash, self.stored_stops.get(stop_hash))
def _get_or_create_stop(self, stop):
stop_hash = self._stop_hash(stop)
gtfsstop = self.stops.get(stop_hash)
if gtfsstop is None:
gtfsstop = self.stored_stops.get(stop_hash, self._convert_stop(stop))
self.stops[stop_hash] = gtfsstop
return gtfsstop
def _convert_stop_date(self, date_time):
return date_time.strftime("%Y%m%d")
def _write_csvfile(self, gtfsfolder, filename, content):
with open(gtfsfolder+"/"+filename, 'w', newline="\n", encoding="utf-8") as csvfile:
self._write_csv(csvfile, content)
def _write_csv(self, csvfile, content):
if hasattr(content, '_fields'):
writer = csv.DictWriter(csvfile, content._fields)
writer.writeheader()
writer.writerow(content._asdict())
else:
if content:
writer = csv.DictWriter(csvfile, next(iter(content))._fields)
writer.writeheader()
for record in content:
writer.writerow(record._asdict())

View file

@ -1,71 +0,0 @@
from amarillo.models.Carpool import Region
from amarillo.plugins.enhancer.services.gtfs_export import GtfsExport, GtfsFeedInfo, GtfsAgency
from amarillo.plugins.enhancer.services.gtfs import GtfsRtProducer
from amarillo.utils.container import container
from glob import glob
import json
import schedule
import threading
import time
import logging
from datetime import date, timedelta
logger = logging.getLogger(__name__)
regions = {}
for region_file_name in glob('conf/region/*.json'):
with open(region_file_name) as region_file:
dict = json.load(region_file)
region = Region(**dict)
region_id = region.id
regions[region_id] = region
agencies = []
for agency_file_name in glob('conf/agency/*.json'):
with open(agency_file_name) as agency_file:
dict = json.load(agency_file)
agency = GtfsAgency(dict["id"], dict["name"], dict["url"], dict["timezone"], dict["lang"], dict["email"])
agency_id = agency.agency_id
agencies.append(agency)
def run_schedule():
while 1:
try:
schedule.run_pending()
except Exception as e:
logger.exception(e)
time.sleep(1)
def midnight():
container['stops_store'].load_stop_sources()
container['trips_store'].unflag_unrecent_updates()
container['carpools'].purge_outdated_offers()
generate_gtfs()
def generate_gtfs():
logger.info("Generate GTFS")
for region in regions.values():
# TODO make feed producer infos configurable
feed_info = GtfsFeedInfo('mfdz', 'MITFAHR|DE|ZENTRALE', 'http://www.mitfahrdezentrale.de', 'de', 1)
exporter = GtfsExport(
agencies,
feed_info,
container['trips_store'],
container['stops_store'],
region.bbox)
exporter.export(f"data/gtfs/amarillo.{region.id}.gtfs.zip", "data/tmp/")
def generate_gtfs_rt():
logger.info("Generate GTFS-RT")
producer = GtfsRtProducer(container['trips_store'])
for region in regions.values():
rt = producer.export_feed(time.time(), f"data/gtfs/amarillo.{region.id}.gtfsrt", bbox=region.bbox)
def start_schedule():
schedule.every().day.at("00:00").do(midnight)
schedule.every(60).seconds.do(generate_gtfs_rt)
# Create all feeds once at startup
schedule.run_all()
job_thread = threading.Thread(target=run_schedule, daemon=True)
job_thread.start()