added GTFS export code from enhancer

This commit is contained in:
Csaba 2024-02-08 12:55:30 +01:00 committed by Holger Bruch
parent fbb3c0f6d8
commit 8148df41e8
11 changed files with 594 additions and 1 deletions

View file

View file

@ -0,0 +1,137 @@
import amarillo.plugins.gtfs_export.gtfsrt.gtfs_realtime_pb2 as gtfs_realtime_pb2
import amarillo.plugins.gtfs_export.gtfsrt.realtime_extension_pb2 as mfdzrte
from amarillo.plugins.gtfs_export.gtfs_constants import *
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import ParseDict
from datetime import datetime, timedelta
import json
import re
import time
class GtfsRtProducer():
    """Produces GTFS-RT TripUpdate feeds for recently added/deleted carpool trips.

    See https://developers.google.com/transit/gtfs-realtime/reference
    and https://github.com/mfdz/carpool-gtfs-rt/blob/master/src/main/java/de/mfdz/resource/CarpoolResource.java
    """

    def __init__(self, trip_store):
        # trip_store provides recently_added_trips() and recently_deleted_trips()
        self.trip_store = trip_store

    def generate_feed(self, time, format='protobuf', bbox=None):
        """Build a GTFS-RT feed covering all recent trip additions/deletions.

        :param time: feed creation timestamp (unix epoch seconds)
        :param format: 'protobuf' (serialized bytes), 'json' (plain dict)
            or 'message' (the FeedMessage object itself); case-insensitive
        :param bbox: optional bounding box — only trips intersecting it are included
        :return: the feed in the requested representation
        """
        gtfsrt_dict = {
            'header': {
                'gtfsRealtimeVersion': '1.0',
                'timestamp': int(time)
            },
            'entity': self._get_trip_updates(bbox)
        }
        feed = gtfs_realtime_pb2.FeedMessage()
        ParseDict(gtfsrt_dict, feed)
        # normalize once instead of lowercasing for every comparison
        requested_format = format.lower()
        if requested_format == "message":
            return feed
        if requested_format == "json":
            return MessageToDict(feed)
        return feed.SerializeToString()

    def export_feed(self, timestamp, file_path, bbox=None):
        """
        Exports gtfs-rt feed as .json and .pbf file to file_path
        """
        feed = self.generate_feed(timestamp, "message", bbox)
        with open(f"{file_path}.pbf", "wb") as f:
            f.write(feed.SerializeToString())
        with open(f"{file_path}.json", "w") as f:
            json.dump(MessageToDict(feed), f)

    def _get_trip_updates(self, bbox=None):
        """Collect added and deleted trips as GTFS-RT entity dicts with sequential ids."""
        trips = []
        trips.extend(self._get_added(bbox))
        trips.extend(self._get_deleted(bbox))
        return [
            {
                'id': f'carpool-update-{num}',
                'tripUpdate': trip
            }
            for num, trip in enumerate(trips)
        ]

    def _get_deleted(self, bbox=None):
        """TripUpdates (CANCELED) for recently deleted trips."""
        return self._get_updates(
            self.trip_store.recently_deleted_trips(),
            self._as_delete_updates,
            bbox)

    def _get_added(self, bbox=None):
        """TripUpdates (ADDED) for recently added trips."""
        return self._get_updates(
            self.trip_store.recently_added_trips(),
            self._as_added_updates,
            bbox)

    def _get_updates(self, trips, update_func, bbox=None):
        """Apply update_func to every trip (restricted to bbox, if given)."""
        updates = []
        today = datetime.today()
        for t in trips:
            # fix: compare against None with 'is', not '=='
            if bbox is None or t.intersects(bbox):
                updates.extend(update_func(t, today))
        return updates

    def _as_delete_updates(self, trip, fromdate):
        """One CANCELED trip descriptor per upcoming service date of the trip."""
        return [{
            'trip': {
                'tripId': trip.trip_id,
                'startTime': trip.start_time_str(),
                'startDate': trip_date,
                'scheduleRelationship': 'CANCELED',
                'routeId': trip.trip_id
            }
        } for trip_date in trip.next_trip_dates(fromdate)]

    def _to_seconds(self, fromdate, stop_time):
        """Convert a GTFS HH:MM:SS time on service day fromdate (YYYYMMDD)
        to unix epoch seconds (interpreted in the local timezone via mktime)."""
        startdate = datetime.strptime(fromdate, '%Y%m%d')
        m = re.search(r'(\d+):(\d+):(\d+)', stop_time)
        delta = timedelta(
            hours=int(m.group(1)),
            minutes=int(m.group(2)),
            seconds=int(m.group(3)))
        return time.mktime((startdate + delta).timetuple())

    def _to_stop_times(self, trip, fromdate):
        """StopTimeUpdates for all stops of the trip on service day fromdate."""
        return [{
            'stopSequence': stoptime.stop_sequence,
            'arrival': {
                'time': self._to_seconds(fromdate, stoptime.arrival_time),
                'uncertainty': MFDZ_DEFAULT_UNCERTAINITY
            },
            'departure': {
                'time': self._to_seconds(fromdate, stoptime.departure_time),
                'uncertainty': MFDZ_DEFAULT_UNCERTAINITY
            },
            'stopId': stoptime.stop_id,
            'scheduleRelationship': 'SCHEDULED',
            'stop_time_properties': {
                '[transit_realtime.stop_time_properties]': {
                    'dropoffType': 'COORDINATE_WITH_DRIVER' if stoptime.drop_off_type == STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER else 'NONE',
                    'pickupType': 'COORDINATE_WITH_DRIVER' if stoptime.pickup_type == STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER else 'NONE'
                }
            }
        }
        for stoptime in trip.stop_times]

    def _as_added_updates(self, trip, fromdate):
        """One ADDED trip (with stop time updates) per upcoming service date."""
        return [{
            'trip': {
                'tripId': trip.trip_id,
                'startTime': trip.start_time_str(),
                'startDate': trip_date,
                'scheduleRelationship': 'ADDED',
                'routeId': trip.trip_id,
                '[transit_realtime.trip_descriptor]': {
                    'routeUrl': trip.url,
                    'agencyId': trip.agency,
                    'route_long_name': trip.route_long_name(),
                    'route_type': RIDESHARING_ROUTE_TYPE
                }
            },
            'stopTimeUpdate': self._to_stop_times(trip, trip_date)
        } for trip_date in trip.next_trip_dates(fromdate)]

View file

@ -0,0 +1,14 @@
# Constants used by the GTFS / GTFS-RT export.

# trips.txt bikes_allowed value: 2 = bicycles not allowed (GTFS reference)
NO_BIKES_ALLOWED = 2
# Extended GTFS route type used for ridesharing/carpool routes
RIDESHARING_ROUTE_TYPE = 1551
# calendar_dates.txt exception_type values (GTFS reference)
CALENDAR_DATES_EXCEPTION_TYPE_ADDED = 1
CALENDAR_DATES_EXCEPTION_TYPE_REMOVED = 2
# stop_times.txt pickup_type / drop_off_type values (GTFS reference)
STOP_TIMES_STOP_TYPE_REGULARLY = 0
STOP_TIMES_STOP_TYPE_NONE = 1
STOP_TIMES_STOP_TYPE_PHONE_AGENCY = 2
STOP_TIMES_STOP_TYPE_COORDINATE_DRIVER = 3
# stop_times.txt timepoint values (GTFS reference)
STOP_TIMES_TIMEPOINT_APPROXIMATE = 0
STOP_TIMES_TIMEPOINT_EXACT = 1
# Default uncertainty (seconds) reported in GTFS-RT StopTimeEvents
MFDZ_DEFAULT_UNCERTAINITY = 600
# Correctly spelled alias; prefer this in new code. The misspelled name
# above is kept for backward compatibility with existing imports.
MFDZ_DEFAULT_UNCERTAINTY = MFDZ_DEFAULT_UNCERTAINITY

View file

@ -0,0 +1,229 @@
from collections.abc import Iterable
from datetime import datetime, timedelta
from zipfile import ZipFile
import csv
import gettext
import logging
import re
from amarillo.utils.utils import assert_folder_exists
from amarillo.plugins.gtfs_export.models.gtfs import GtfsTimeDelta, GtfsFeedInfo, GtfsAgency, GtfsRoute, GtfsStop, GtfsStopTime, GtfsTrip, GtfsCalendar, GtfsCalendarDate, GtfsShape
from amarillo.plugins.enhancer.services.stops import is_carpooling_stop
from amarillo.plugins.gtfs_export.gtfs_constants import *
logger = logging.getLogger(__name__)
class GtfsExport:
    """Exports carpool offers and their stops as a static GTFS feed.

    Routes, trips, calendar entries and shapes are derived from the trips of
    the given ridestore; stops come from the given stopstore. An optional
    bbox restricts the export to trips intersecting it.
    """

    def __init__(self, agencies, feed_info, ridestore, stopstore, bbox=None):
        # NOTE(fix): the counters and stop caches below used to be CLASS
        # attributes. The mutable stored_stops dict was therefore shared
        # between instances, so consecutive exports (e.g. one per region)
        # accumulated each other's stops. They are instance state now.
        self.stops_counter = 0
        self.trips_counter = 0
        self.routes_counter = 0
        # all stops loaded from the stop store, by stop id
        self.stored_stops = {}
        # the subset of stops that will actually be written to stops.txt
        self.stops = {}
        self.routes = []
        self.calendar_dates = []
        self.calendar = []
        self.trips = []
        self.stop_times = []
        self.shapes = []
        self.agencies = agencies
        self.feed_info = feed_info
        self.localized_to = " nach "
        self.localized_short_name = "Mitfahrgelegenheit"
        self.stopstore = stopstore
        self.ridestore = ridestore
        self.bbox = bbox

    def export(self, gtfszip_filename, gtfsfolder):
        """Write all GTFS text files into gtfsfolder, then zip them into gtfszip_filename."""
        assert_folder_exists(gtfsfolder)
        self._prepare_gtfs_feed(self.ridestore, self.stopstore)
        self._write_csvfile(gtfsfolder, 'agency.txt', self.agencies)
        self._write_csvfile(gtfsfolder, 'feed_info.txt', self.feed_info)
        self._write_csvfile(gtfsfolder, 'routes.txt', self.routes)
        self._write_csvfile(gtfsfolder, 'trips.txt', self.trips)
        self._write_csvfile(gtfsfolder, 'calendar.txt', self.calendar)
        self._write_csvfile(gtfsfolder, 'calendar_dates.txt', self.calendar_dates)
        self._write_csvfile(gtfsfolder, 'stops.txt', self.stops.values())
        self._write_csvfile(gtfsfolder, 'stop_times.txt', self.stop_times)
        self._write_csvfile(gtfsfolder, 'shapes.txt', self.shapes)
        self._zip_files(gtfszip_filename, gtfsfolder)

    def _zip_files(self, gtfszip_filename, gtfsfolder):
        """Bundle the previously written GTFS text files into a zip archive."""
        gtfsfiles = ['agency.txt', 'feed_info.txt', 'routes.txt', 'trips.txt',
            'calendar.txt', 'calendar_dates.txt', 'stops.txt', 'stop_times.txt', 'shapes.txt']
        with ZipFile(gtfszip_filename, 'w') as gtfszip:
            for gtfsfile in gtfsfiles:
                gtfszip.write(gtfsfolder + '/' + gtfsfile, gtfsfile)

    def _prepare_gtfs_feed(self, ridestore, stopstore):
        """
        Prepares all gtfs objects in memory before they are written
        to their respective streams.

        For all wellknown stops a GTFS stop is created and
        afterwards all ride offers are transformed into their
        gtfs equivalents.
        """
        for stop_set in stopstore.stopsDataFrames:
            for stop in stop_set["stops"].itertuples():
                self._load_stored_stop(stop)
        # iterate over a snapshot, as the store may change while we export
        cloned_trips = dict(ridestore.trips)
        for url, trip in cloned_trips.items():
            if self.bbox is None or trip.intersects(self.bbox):
                self._convert_trip(trip)

    def _convert_trip(self, trip):
        """Append route, calendar (date), trip, stop_times and shape for one carpool trip."""
        self.routes_counter += 1
        self.routes.append(self._create_route(trip))
        self.calendar.append(self._create_calendar(trip))
        if not trip.runs_regularly:
            self.calendar_dates.append(self._create_calendar_date(trip))
        self.trips.append(self._create_trip(trip, self.routes_counter))
        self._append_stops_and_stop_times(trip)
        self._append_shapes(trip, self.routes_counter)

    def _trip_headsign(self, destination):
        """Derive a short, human-readable headsign from a destination address.

        Strips German/Swiss country suffixes (re-appending ', Schweiz' for
        Swiss destinations), postal codes and a leading locality part.
        Falls back to the unmodified destination on any error.
        """
        destination = destination.replace('(Deutschland)', '')
        destination = destination.replace(', Deutschland', '')
        appendix = ''
        if 'Schweiz' in destination or 'Switzerland' in destination:
            appendix = ', Schweiz'
            destination = destination.replace('(Schweiz)', '')
            destination = destination.replace(', Schweiz', '')
            destination = destination.replace('(Switzerland)', '')
        try:
            matches = re.match(r"(.*,)? ?(\d{4,5})? ?(.*)", destination)
            match = matches.group(3).strip() if matches is not None else destination.strip()
            # drop an unbalanced trailing closing parenthesis
            if match[-1] == ')' and '(' not in match:
                match = match[0:-1]
            return match + appendix
        except Exception as ex:
            logger.error("error for " + destination)
            logger.exception(ex)
            return destination

    def _create_route(self, trip):
        """A GTFS route per trip; the trip id doubles as route id."""
        return GtfsRoute(trip.agency, trip.trip_id, trip.route_long_name(), RIDESHARING_ROUTE_TYPE, trip.url, "")

    def _create_calendar(self, trip):
        # TODO currently, calendar is not provided by Fahrgemeinschaft.de interface.
        # We could apply some heuristics like requesting multiple days and extrapolate
        # if multiple trips are found, but better would be to have these provided by the
        # offical interface. Then validity periods should be provided as well (not
        # sure if these are available)
        # For fahrgemeinschaft.de, regurlar trips are recognizable via their url
        # which contains "regelmaessig". However, we don't know on which days of the week,
        # nor until when. As a first guess, if datetime is a mo-fr, we assume each workday,
        # if it's sa/su, only this...
        feed_start_date = datetime.today()
        stop_date = self._convert_stop_date(feed_start_date)
        return GtfsCalendar(trip.trip_id, stop_date, self._convert_stop_date(feed_start_date + timedelta(days=31)), *(trip.weekdays))

    def _create_calendar_date(self, trip):
        """A single-service-day exception for trips that don't run regularly."""
        return GtfsCalendarDate(trip.trip_id, self._convert_stop_date(trip.start), CALENDAR_DATES_EXCEPTION_TYPE_ADDED)

    def _create_trip(self, trip, shape_id):
        """A GTFS trip; route, trip and service id are all the carpool's trip id."""
        return GtfsTrip(trip.trip_id, trip.trip_id, trip.trip_id, shape_id, trip.trip_headsign, NO_BIKES_ALLOWED)

    def _convert_stop(self, stop):
        """
        Converts a stop represented as pandas row to a gtfs stop.
        Expected attributes of stop: id, stop_name, x, y (in wgs84)
        """
        if stop.id:
            stop_id = stop.id
        else:
            # stops without a well-known id get a synthetic, per-export id
            self.stops_counter += 1
            stop_id = "tmp-{}".format(self.stops_counter)
        stop_name = "k.A." if stop.stop_name is None else stop.stop_name
        return GtfsStop(stop_id, stop.y, stop.x, stop_name)

    def _append_stops_and_stop_times(self, trip):
        # Assumptions:
        # arrival_time = departure_time
        # pickup_type, drop_off_type for origin: = coordinate/none
        # pickup_type, drop_off_type for destination: = none/coordinate
        # timepoint = approximate for origin and destination (not sure what consequences this might have for trip planners)
        for stop_time in trip.stop_times:
            # retrieve stop from stored_stops and mark it to be exported
            wkn_stop = self.stored_stops.get(stop_time.stop_id)
            if not wkn_stop:
                logger.warning("No stop found in stop_store for %s. Will skip stop_time %s of trip %s", stop_time.stop_id, stop_time.stop_sequence, trip.trip_id)
            else:
                self.stops[stop_time.stop_id] = wkn_stop
                # Append stop_time
                self.stop_times.append(stop_time)

    def _append_shapes(self, trip, shape_id):
        """Append one shape point per path coordinate, numbered from 1."""
        counter = 0
        for point in trip.path.coordinates:
            counter += 1
            self.shapes.append(GtfsShape(shape_id, point[0], point[1], counter))

    def _stop_hash(self, stop):
        """Identity hash of a stop by name and wgs84 coordinates."""
        return "{}#{}#{}".format(stop.stop_name, stop.x, stop.y)

    def _should_always_export(self, stop):
        """
        Returns true, if the given stop shall be exported to GTFS,
        regardless, if it's part of a trip or not.
        This is necessary, as potential stops are required
        to be part of the GTFS to be referenced later on
        by dynamicly added trips.
        """
        if self.bbox:
            return (self.bbox[0] <= stop.stop_lon <= self.bbox[2] and
                self.bbox[1] <= stop.stop_lat <= self.bbox[3])
        else:
            return is_carpooling_stop(stop.stop_id, stop.stop_name)

    def _load_stored_stop(self, stop):
        """Cache a well-known stop; export it directly if it always belongs in the feed."""
        gtfsstop = self._convert_stop(stop)
        stop_hash = self._stop_hash(stop)
        self.stored_stops[gtfsstop.stop_id] = gtfsstop
        if self._should_always_export(gtfsstop):
            self.stops[gtfsstop.stop_id] = gtfsstop

    def _get_stop_by_hash(self, stop_hash):
        """Look up a stop by hash, preferring already-exported stops."""
        return self.stops.get(stop_hash, self.stored_stops.get(stop_hash))

    def _get_or_create_stop(self, stop):
        """Return the exported stop for this stop, creating/registering it if needed."""
        stop_hash = self._stop_hash(stop)
        gtfsstop = self.stops.get(stop_hash)
        if gtfsstop is None:
            gtfsstop = self.stored_stops.get(stop_hash, self._convert_stop(stop))
            self.stops[stop_hash] = gtfsstop
        return gtfsstop

    def _convert_stop_date(self, date_time):
        """Format a datetime as a GTFS date (YYYYMMDD)."""
        return date_time.strftime("%Y%m%d")

    def _write_csvfile(self, gtfsfolder, filename, content):
        """Write one GTFS file; content is a record (namedtuple) or an iterable of records."""
        with open(gtfsfolder + "/" + filename, 'w', newline="\n", encoding="utf-8") as csvfile:
            self._write_csv(csvfile, content)

    def _write_csv(self, csvfile, content):
        # a single namedtuple (e.g. feed_info) is written as one row,
        # otherwise content is an iterable of namedtuples
        if hasattr(content, '_fields'):
            writer = csv.DictWriter(csvfile, content._fields)
            writer.writeheader()
            writer.writerow(content._asdict())
        else:
            if content:
                writer = csv.DictWriter(csvfile, next(iter(content))._fields)
                writer.writeheader()
                for record in content:
                    writer.writerow(record._asdict())

View file

@ -0,0 +1,71 @@
from amarillo.models.Carpool import Region
from amarillo.plugins.gtfs_export.gtfs_export import GtfsExport, GtfsFeedInfo, GtfsAgency
from amarillo.plugins.gtfs_export.gtfs import GtfsRtProducer
from amarillo.utils.container import container
from glob import glob
import json
import schedule
import threading
import time
import logging
from datetime import date, timedelta
logger = logging.getLogger(__name__)

# Regions configured in conf/region/*.json, keyed by region id.
regions = {}
for region_file_name in glob('conf/region/*.json'):
    with open(region_file_name) as region_file:
        # renamed local: the original bound json.load's result to 'dict',
        # shadowing the builtin
        region_config = json.load(region_file)
        region = Region(**region_config)
        regions[region.id] = region

# GTFS agencies configured in conf/agency/*.json.
agencies = []
for agency_file_name in glob('conf/agency/*.json'):
    with open(agency_file_name) as agency_file:
        agency_config = json.load(agency_file)
        agencies.append(GtfsAgency(
            agency_config["id"],
            agency_config["name"],
            agency_config["url"],
            agency_config["timezone"],
            agency_config["lang"],
            agency_config["email"]))


def run_schedule():
    """Drive the scheduler forever; a failing job must not kill the loop."""
    while True:
        try:
            schedule.run_pending()
        except Exception as e:
            logger.exception(e)
        time.sleep(1)


def midnight():
    """Daily housekeeping: reload stops, expire stale trip updates/offers, rebuild GTFS."""
    container['stops_store'].load_stop_sources()
    container['trips_store'].unflag_unrecent_updates()
    container['carpools'].purge_outdated_offers()
    generate_gtfs()


def generate_gtfs():
    """Export a static GTFS feed for every configured region."""
    logger.info("Generate GTFS")
    for region in regions.values():
        # TODO make feed producer infos configurable
        feed_info = GtfsFeedInfo('mfdz', 'MITFAHR|DE|ZENTRALE', 'http://www.mitfahrdezentrale.de', 'de', 1)
        exporter = GtfsExport(
            agencies,
            feed_info,
            container['trips_store'],
            container['stops_store'],
            region.bbox)
        exporter.export(f"data/gtfs/amarillo.{region.id}.gtfs.zip", "data/tmp/")


def generate_gtfs_rt():
    """Export a GTFS-RT feed (pbf + json) for every configured region."""
    logger.info("Generate GTFS-RT")
    producer = GtfsRtProducer(container['trips_store'])
    for region in regions.values():
        # export_feed writes the files itself; its return value was unused
        producer.export_feed(time.time(), f"data/gtfs/amarillo.{region.id}.gtfsrt", bbox=region.bbox)


def start_schedule():
    """Register the periodic jobs and run them on a daemon thread."""
    schedule.every().day.at("00:00").do(midnight)
    schedule.every(60).seconds.do(generate_gtfs_rt)
    # Create all feeds once at startup
    schedule.run_all()
    job_thread = threading.Thread(target=run_schedule, daemon=True)
    job_thread.start()

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: realtime_extension.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
# NOTE(review): this module appears to live under amarillo.plugins.gtfs_export,
# yet imports the realtime pb2 from amarillo.plugins.enhancer (the sibling
# gtfs.py imports amarillo.plugins.gtfs_export.gtfsrt.gtfs_realtime_pb2) —
# presumably a leftover from the move out of the enhancer; verify the package.
import amarillo.plugins.enhancer.services.gtfsrt.gtfs_realtime_pb2 as gtfs__realtime__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x18realtime_extension.proto\x12\x10transit_realtime\x1a\x13gtfs-realtime.proto\"p\n\x1bMfdzTripDescriptorExtension\x12\x11\n\troute_url\x18\x01 \x01(\t\x12\x11\n\tagency_id\x18\x02 \x01(\t\x12\x17\n\x0froute_long_name\x18\x03 \x01(\t\x12\x12\n\nroute_type\x18\x04 \x01(\r\"\xb0\x02\n\x1fMfdzStopTimePropertiesExtension\x12X\n\x0bpickup_type\x18\x01 \x01(\x0e\x32\x43.transit_realtime.MfdzStopTimePropertiesExtension.DropOffPickupType\x12Y\n\x0c\x64ropoff_type\x18\x02 \x01(\x0e\x32\x43.transit_realtime.MfdzStopTimePropertiesExtension.DropOffPickupType\"X\n\x11\x44ropOffPickupType\x12\x0b\n\x07REGULAR\x10\x00\x12\x08\n\x04NONE\x10\x01\x12\x10\n\x0cPHONE_AGENCY\x10\x02\x12\x1a\n\x16\x43OORDINATE_WITH_DRIVER\x10\x03:i\n\x0ftrip_descriptor\x12 .transit_realtime.TripDescriptor\x18\xf5\x07 \x01(\x0b\x32-.transit_realtime.MfdzTripDescriptorExtension:\x90\x01\n\x14stop_time_properties\x12>.transit_realtime.TripUpdate.StopTimeUpdate.StopTimeProperties\x18\xf5\x07 \x01(\x0b\x32\x31.transit_realtime.MfdzStopTimePropertiesExtensionB\t\n\x07\x64\x65.mfdz')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'realtime_extension_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
  # trip_descriptor, stop_time_properties and the _MFDZ* descriptor names
  # below are injected into globals() by the _builder calls above, hence
  # no visible definitions in this file.
  gtfs__realtime__pb2.TripDescriptor.RegisterExtension(trip_descriptor)
  gtfs__realtime__pb2.TripUpdate.StopTimeUpdate.StopTimeProperties.RegisterExtension(stop_time_properties)
  DESCRIPTOR._options = None
  DESCRIPTOR._serialized_options = b'\n\007de.mfdz'
  _MFDZTRIPDESCRIPTOREXTENSION._serialized_start=67
  _MFDZTRIPDESCRIPTOREXTENSION._serialized_end=179
  _MFDZSTOPTIMEPROPERTIESEXTENSION._serialized_start=182
  _MFDZSTOPTIMEPROPERTIESEXTENSION._serialized_end=486
  _MFDZSTOPTIMEPROPERTIESEXTENSION_DROPOFFPICKUPTYPE._serialized_start=398
  _MFDZSTOPTIMEPROPERTIESEXTENSION_DROPOFFPICKUPTYPE._serialized_end=486
# @@protoc_insertion_point(module_scope)

View file

@ -0,0 +1,30 @@
# TODO: move to enhancer
from collections import namedtuple
from datetime import timedelta
# Plain row records for the GTFS files; field order matches the csv column order
# expected by the exporter's DictWriter.
GtfsFeedInfo = namedtuple('GtfsFeedInfo', 'feed_id feed_publisher_name feed_publisher_url feed_lang feed_version')
GtfsAgency = namedtuple('GtfsAgency', 'agency_id agency_name agency_url agency_timezone agency_lang agency_email')
GtfsRoute = namedtuple('GtfsRoute', 'agency_id route_id route_long_name route_type route_url route_short_name')
GtfsStop = namedtuple('GtfsStop', 'stop_id stop_lat stop_lon stop_name')
GtfsStopTime = namedtuple('GtfsStopTime', 'trip_id departure_time arrival_time stop_id stop_sequence pickup_type drop_off_type timepoint')
GtfsTrip = namedtuple('GtfsTrip', 'route_id trip_id service_id shape_id trip_headsign bikes_allowed')
GtfsCalendar = namedtuple('GtfsCalendar', 'service_id start_date end_date monday tuesday wednesday thursday friday saturday sunday')
GtfsCalendarDate = namedtuple('GtfsCalendarDate', 'service_id date exception_type')
GtfsShape = namedtuple('GtfsShape', 'shape_id shape_pt_lon shape_pt_lat shape_pt_sequence')


# TODO Move to utils
class GtfsTimeDelta(timedelta):
    """A timedelta that renders as a GTFS-style HH:MM:SS string.

    Hours are not wrapped at 24 (seconds // 3600), matching GTFS times
    for trips running past midnight.
    """

    def __str__(self):
        seconds = self.total_seconds()
        hours = seconds // 3600
        minutes = (seconds % 3600) // 60
        seconds = seconds % 60
        # fix: the formatted result was previously bound to a local named
        # 'str', shadowing the builtin
        formatted = '{:02d}:{:02d}:{:02d}'.format(int(hours), int(minutes), int(seconds))
        return formatted

    def __add__(self, other):
        """Add a timedelta, keeping the GtfsTimeDelta subclass for the result."""
        if isinstance(other, timedelta):
            return self.__class__(self.days + other.days,
                self.seconds + other.seconds,
                self.microseconds + other.microseconds)
        return NotImplemented

View file

@ -1,2 +1 @@
watchdog==3.0.0
schedule==1.2.1