8000 Track long outages by hamima-halim · Pull Request #61 · transitmatters/gobble · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Track long outages #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def reduce_update_event(update: dict) -> Tuple:


@tracer.wrap()
def process_event(update, current_stop_state: dict):
def process_event(update: dict, current_stop_state: dict):
"""Process a single event from the MBTA's realtime API."""
(
current_status,
Expand Down Expand Up @@ -101,6 +101,8 @@ def process_event(update, current_stop_state: dict):
prev["updated_at"] = datetime.fromisoformat(prev["updated_at"])

if stop_id is None:
# TODO (hamima): attempt to enrich update with stop information. if successful,
# continue. otherwise, return.
return

is_departure_event, is_arrival_event = arr_or_dep_event(
Expand Down
75 changes: 75 additions & 0 deletions src/stop_guessing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from datetime import datetime
import pandas as pd
from typing import Dict, List, Optional

# a data structure of negligent vehicles.
# purge this on a new service date. i dont like that this is a dict--should this be a redis cache? dynamodb? also route shapes

# TODO: should we track degenerate vehicles? the knowledge isnt actionable but it is neat
cache_key_fmt = "{vehicle_label}_{trip_id}"
OUTAGES_BY_VEHICLE_AND_TRIP: Dict[str, List[Dict]] = {}

SAME_OUTAGE_TIMEDELTA = datetime.timedelta(seconds=30)
LONG_OUTAGE_TIMEDELTA = datetime.timedelta(minutes=1)


def _add_update_to_sequence(cache_key: str, update: Dict) -> None:
if cache_key in OUTAGES_BY_VEHICLE_AND_TRIP:
OUTAGES_BY_VEHICLE_AND_TRIP[cache_key].append(update)
else:
OUTAGES_BY_VEHICLE_AND_TRIP[cache_key] = [update]


def _remove(cache_key: str) -> None:
OUTAGES_BY_VEHICLE_AND_TRIP[cache_key] = []


def purge_cache():
nonlocal OUTAGES_BY_VEHICLE_AND_TRIP
OUTAGES_BY_VEHICLE_AND_TRIP = {}


def attempt_enrich_update(update: Dict) -> Optional[Dict]:
# if direction is none, attempt pull from trips.txt
# if stop is null, interpolate along shapes.txt (trip-shape relationship in trips.txt)
# maybe fetch and cache shape?
# if there are a couple of options, tie-break with the schedule?
# if current_status is IN_TRANSIT...
# check previous location stamp. if its TOO CLOSE, it might be stopped.
# if its TOO CLOSE to a stop location, its stopped at a stop.
# otherwise, it might be bogged down in traffic, sitting at a red, whatever.
# update event_type accordingly
return None


def report_outage(update: Dict) -> Optional[pd.DataFrame]:
"""Given an outage event, cache it and potentially try fill the missing information.

If the outage duration is small (<1 minute,) it will return nothing.
For longer outages, attempt to fill the missing information using gtfs data
This might still fail and return nothing.
"""
cache_key = cache_key_fmt.format(vehicle_label=update["vehicle_label"], trip_id=update["trip_id"])
outage_sequence = OUTAGES_BY_VEHICLE_AND_TRIP.get(cache_key, [])
if len(outage_sequence) == 0:
_add_update_to_sequence(cache_key, update)
return None

# compare latest outage timestamp....
last_ts = outage_sequence[-1]["updated_at"]
first_ts = outage_sequence[0]["updated_at"]
current_ts = update["updated_at"]

# if timestamps are not close, remove entire cache entry. this is now the first instance of a new outage.
if current_ts - last_ts >= SAME_OUTAGE_TIMEDELTA:
_remove(cache_key)
_add_update_to_sequence(cache_key, update)
return None
else:
_add_update_to_sequence(cache_key, update)
# if the outage is <1 min, not long enough to bother with shape interpolation yet.
if current_ts - first_ts <= LONG_OUTAGE_TIMEDELTA:
return None
else:
# else, attempt to intuit the stop information via shapes and trip info
return attempt_enrich_update(update)
2 changes: 1 addition & 1 deletion src/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def output_dir_path(route_id: str, direction_id: str, stop_id: str, ts: datetime
delimiter = "_"
mode = "cr"
# rapid transit may rarely have dashes AND SPACES in stop id/route id!
# ex, Green_D_1-Union Square-02
# ex, Green-D_1_Union Square-02
elif route_id in ROUTES_RAPID:
delimiter = "_"
mode = "rapid"
Expand Down
0