[feat] Mark stalled runs as finished by alberttorosyan · Pull Request #3314 · aimhubio/aim · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[feat] Mark stalled runs as finished #3314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions aim/cli/up/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from aim.sdk.index_manager import RepoIndexManager
from aim.sdk.repo import Repo
from aim.sdk.run_status_manager import RunStatusManager
from aim.sdk.utils import clean_repo_path
from aim.web.configs import (
AIM_ENV_MODE_KEY,
Expand Down Expand Up @@ -124,6 +125,8 @@ def up(
os.environ[AIM_PROFILER_KEY] = '1'

RepoIndexManager.get_index_manager(repo_inst)
run_status_mng = RunStatusManager(repo_inst)
run_status_mng.start()
try:
server_cmd = build_uvicorn_command(
'aim.web.run:app',
Expand Down
23 changes: 16 additions & 7 deletions aim/sdk/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,9 @@ def get_version(cls, path: str):
def is_remote_path(cls, path: str):
return path.startswith('aim://')

def _get_container(self, name: str, read_only: bool, from_union: bool = False, skip_read_optimization: bool = False) -> Container:
def _get_container(
self, name: str, read_only: bool, from_union: bool = False, skip_read_optimization: bool = False
) -> Container:
# TODO [AT]: refactor get container/tree logic to make it more simple
if self.read_only and not read_only:
raise ValueError('Repo is read-only')
Expand Down Expand Up @@ -317,11 +319,17 @@ def request_tree(
read_only: bool,
from_union: bool = False, # TODO maybe = True by default
no_cache: bool = False,
skip_read_optimization: bool = False
skip_read_optimization: bool = False,
):
if not self.is_remote_repo:
return self.request(name, sub, read_only=read_only, from_union=from_union, no_cache=no_cache,
skip_read_optimization=skip_read_optimization).tree()
return self.request(
name,
sub,
read_only=read_only,
from_union=from_union,
no_cache=no_cache,
skip_read_optimization=skip_read_optimization,
).tree()
else:
return ProxyTree(self._client, name, sub, read_only=read_only, from_union=from_union, no_cache=no_cache)

Expand All @@ -333,7 +341,7 @@ def request(
read_only: bool,
from_union: bool = False, # TODO maybe = True by default
no_cache: bool = False,
skip_read_optimization: bool = False
skip_read_optimization: bool = False,
):
container_config = ContainerConfig(name, sub, read_only)
container_view = self.container_view_pool.get(container_config)
Expand All @@ -344,8 +352,9 @@ def request(
else:
assert sub is not None
path = os.path.join(name, 'chunks', sub)
container = self._get_container(path, read_only=True, from_union=from_union,
skip_read_optimization=skip_read_optimization)
container = self._get_container(
path, read_only=True, from_union=from_union, skip_read_optimization=skip_read_optimization
)
else:
assert sub is not None
path = os.path.join(name, 'chunks', sub)
Expand Down
6 changes: 4 additions & 2 deletions aim/sdk/reporter/file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@

class FileManager(object):
@abstractmethod
def poll(self, pattern: str) -> Optional[str]: ...
def poll(self, pattern: str) -> Optional[str]:
...

@abstractmethod
def touch(self, filename: str, cleanup_file_pattern: Optional[str] = None): ...
def touch(self, filename: str, cleanup_file_pattern: Optional[str] = None):
...


class LocalFileManager(FileManager):
Expand Down
95 changes: 95 additions & 0 deletions aim/sdk/run_status_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import time
import os
import datetime
import pytz
import threading
from pathlib import Path

from typing import Iterable

import aimrocks.errors

from aim import Repo
from aim.sdk.run_status_watcher import Event


class RunStatusManager:
    """Background monitor that marks stalled runs as finished.

    A run is considered "in progress" while a file named after its hash exists
    in ``<repo>/meta/progress``. On every scan the manager looks at the run's
    heartbeat files in ``<repo>/check_ins``; when no newer heartbeat shows up
    in time, the run's ``end_time`` is set and its progress file is removed.

    Scanning happens on a daemon thread started with :meth:`start` and stopped
    with :meth:`stop`.
    """

    # Extra slack, in seconds, added on top of a heartbeat's ``next_event_in``
    # before a silent run is declared stalled.
    INDEXING_GRACE_PERIOD = 10

    def __init__(self, repo: 'Repo', scan_interval: int = 60):
        """Prepare the manager; no thread is started until :meth:`start`.

        Args:
            repo: Repository whose runs are monitored. Its ``path`` locates the
                progress and heartbeat directories.
            scan_interval: Pause between two consecutive scans, in seconds.
        """
        self.repo = repo
        self.scan_interval = scan_interval

        self.progress_dir = Path(self.repo.path) / 'meta' / 'progress'
        self.progress_dir.mkdir(parents=True, exist_ok=True)

        self.heartbeat_dir = Path(self.repo.path) / 'check_ins'
        # run hash -> most recent heartbeat ``Event`` observed for that run
        self.run_heartbeat_cache = {}

        self._stop_event = threading.Event()
        self._monitor_thread = None
        # Runs whose storage raised RocksIOError/Corruption; skipped on later scans
        self._corrupted_runs = set()

    def start(self):
        """Start the monitoring thread unless one is already alive."""
        if not self._monitor_thread or not self._monitor_thread.is_alive():
            self._stop_event.clear()
            self._monitor_thread = threading.Thread(target=self._run_forever, daemon=True)
            self._monitor_thread.start()

    def stop(self):
        """Signal the monitoring thread to exit and wait until it has finished."""
        self._stop_event.set()
        if self._monitor_thread:
            self._monitor_thread.join()

    def _run_forever(self):
        """Scan for stalled runs every ``scan_interval`` seconds until stopped."""
        while not self._stop_event.is_set():
            self.check_and_terminate_stalled_runs()
            # Wait on the stop event rather than sleeping, so that stop() wakes
            # the thread immediately instead of blocking for a full interval.
            self._stop_event.wait(self.scan_interval)

    def _runs_with_progress(self) -> Iterable[str]:
        """Return hashes of runs that have a progress file, oldest file first.

        Runs previously recorded as corrupted are left out. A progress file
        that disappears between listing and stat-ing is skipped, because the
        file being gone means the run is no longer in progress.
        """
        timestamped = []
        for run_hash in os.listdir(self.progress_dir):
            if run_hash in self._corrupted_runs:
                continue
            try:
                modified_at = os.path.getmtime(os.path.join(self.progress_dir, run_hash))
            except FileNotFoundError:
                # Removed concurrently (e.g. its container was closed).
                continue
            timestamped.append((modified_at, run_hash))

        # Sort on the timestamp only; the sort is stable, so ties keep listing order.
        timestamped.sort(key=lambda item: item[0])
        return [run_hash for _, run_hash in timestamped]

    def check_and_terminate_stalled_runs(self):
        """Run one scan: mark every in-progress run found stalled as terminated."""
        for run_hash in self._runs_with_progress():
            if self._is_run_stalled(run_hash):
                self._mark_run_as_terminated(run_hash)

    def _is_run_stalled(self, run_hash: str) -> bool:
        """Decide whether ``run_hash`` has stopped sending heartbeats.

        Returns:
            ``True`` when the run has no heartbeat files at all, or when its
            latest heartbeat is not newer than the cached one and more than
            ``next_event_in + INDEXING_GRACE_PERIOD`` seconds have passed since
            the cached heartbeat's ``detected_epoch_time``. ``False`` otherwise,
            including the first time a heartbeat is seen for the run.
        """
        # The greatest path is the same element the original took from a
        # descending sort; max() avoids sorting the whole listing.
        latest_path = max(self.heartbeat_dir.glob(f'{run_hash}-*-progress-*-*'), default=None)
        if latest_path is None:
            return True

        last_heartbeat = Event(latest_path.name)
        last_recorded_heartbeat = self.run_heartbeat_cache.get(run_hash)

        if last_recorded_heartbeat is None:
            # First time seeing a heartbeat for this run; store and move on
            self.run_heartbeat_cache[run_hash] = last_heartbeat
            return False

        if last_heartbeat.idx > last_recorded_heartbeat.idx:
            # Newer heartbeat arrived, so the run isn't stalled
            self.run_heartbeat_cache[run_hash] = last_heartbeat
            return False

        # No new heartbeat event since last time; check if enough time passed
        time_passed = time.time() - last_recorded_heartbeat.detected_epoch_time
        return (last_recorded_heartbeat.next_event_in + RunStatusManager.INDEXING_GRACE_PERIOD) < time_passed

    def _mark_run_as_terminated(self, run_hash: str):
        """Set the run's ``end_time`` if missing and remove its progress file.

        If the run's storage raises ``RocksIOError`` or ``Corruption``, the run
        is remembered as corrupted so later scans skip it.
        """
        # TODO [AT]: Add run state handling once decided on terms (finished, terminated, aborted, etc.)
        try:
            meta_run_tree = self.repo.request_tree('meta', run_hash, read_only=False).subtree(
                ('meta', 'chunks', run_hash)
            )
            if meta_run_tree.get('end_time') is None:
                meta_run_tree['end_time'] = datetime.datetime.now(pytz.utc).timestamp()
            progress_path = self.progress_dir / run_hash
            progress_path.unlink(missing_ok=True)
        except (aimrocks.errors.RocksIOError, aimrocks.errors.Corruption):
            self._corrupted_runs.add(run_hash)
9 changes: 6 additions & 3 deletions aim/sdk/run_status_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,16 @@ def __init__(self, *, obj_idx: Optional[str] = None, rank: Optional[int] = None,
self.message = message

@abstractmethod
def is_sent(self): ...
def is_sent(self):
...

@abstractmethod
def update_last_sent(self): ...
def update_last_sent(self):
...

@abstractmethod
def get_msg_details(self): ...
def get_msg_details(self):
...


class StatusNotification(Notification):
Expand Down
12 changes: 8 additions & 4 deletions aim/storage/arrayview.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ class ArrayView:
when index values are not important.
"""

def __iter__(self) -> Iterator[Any]: ...
def __iter__(self) -> Iterator[Any]:
...

def keys(self) -> Iterator[int]:
"""Return sparse indices iterator.
Expand Down Expand Up @@ -43,13 +44,16 @@ def items(self) -> Iterator[Tuple[int, Any]]:
"""
...

def __len__(self) -> int: ...
def __len__(self) -> int:
...

def __getitem__(self, idx: Union[int, slice]): ...
def __getitem__(self, idx: Union[int, slice]):
...

# TODO implement append

def __setitem__(self, idx: int, val: Any): ...
def __setitem__(self, idx: int, val: Any):
...

def sparse_list(self) -> Tuple[List[int], List[Any]]:
"""Get sparse indices and values as :obj:`list`s."""
Expand Down
9 changes: 6 additions & 3 deletions aim/storage/artifacts/artifact_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ def __init__(self, url: str):
self.url = url

@abstractmethod
def upload_artifact(self, file_path: str, artifact_path: str, block: bool = False): ...
def upload_artifact(self, file_path: str, artifact_path: str, block: bool = False):
...

@abstractmethod
def download_artifact(self, artifact_path: str, dest_dir: Optional[str] = None) -> str: ...
def download_artifact(self, artifact_path: str, dest_dir: Optional[str] = None) -> str:
...

@abstractmethod
def delete_artifact(self, artifact_path: str): ...
def delete_artifact(self, artifact_path: str):
...
6 changes: 4 additions & 2 deletions aim/storage/inmemorytreeview.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ def iterlevel(
def array(self, path: Union[AimObjectKey, AimObjectPath] = (), dtype: Any = None) -> TreeArrayView:
return TreeArrayView(self.subtree(path), dtype=dtype)

def first_key(self, path: Union[AimObjectKey, AimObjectPath] = ()) -> AimObjectKey: ...
def first_key(self, path: Union[AimObjectKey, AimObjectPath] = ()) -> AimObjectKey:
...

def last_key(self, path: Union[AimObjectKey, AimObjectPath] = ()) -> AimObjectKey: ...
def last_key(self, path: Union[AimObjectKey, AimObjectPath] = ()) -> AimObjectKey:
...
3 changes: 2 additions & 1 deletion aim/storage/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ def __init__(self, expr: str):
self.expr = expr

@abstractmethod
def check(self, **params) -> bool: ...
def check(self, **params) -> bool:
...

def __call__(self, **params):
return self.check(**params)
Expand Down
12 changes: 5 additions & 7 deletions aim/storage/rockscontainer.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class RocksAutoClean(AutoClean):
super().__init__(instance)
self._lock = None
self._db = None
self._progress_path = None

def _close(self):
"""
Expand All @@ -48,6 +49,9 @@ class RocksAutoClean(AutoClean):
self._db = None
self._lock.release()
self._lock = None
if self._progress_path is not None:
self._progress_path.unlink(missing_ok=True)
self._progress_path = None
if self._db is not None:
self._db = None

Expand Down Expand Up @@ -104,6 +108,7 @@ class RocksContainer(Container):
if not self.read_only:
progress_dir.mkdir(parents=True, exist_ok=True)
self._progress_path.touch(exist_ok=True)
self._resources._progress_path = self._progress_path

self.db
# TODO check if Containers are reopenable
Expand Down Expand Up @@ -159,16 +164,9 @@ class RocksContainer(Container):
Store the collection of `(key, value)` records in the :obj:`Container`
`index` for fast reads.
"""
if not self._progress_path:
return

for k, v in self.items():
index[k] = v

if self._progress_path.exists():
self._progress_path.unlink()
self._progress_path = None

def close(self):
"""Close all the resources."""
if self._resources is None:
Expand Down
Loading
Loading
0