[fix] Fallback to union db if index is missing by alberttorosyan · Pull Request #3317 · aimhubio/aim · GitHub

[fix] Fallback to union db if index is missing #3317


Merged · 11 commits · Apr 30, 2025

Changes from all commits
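What the PR does, in short: callers no longer pass from_union when requesting the 'meta' tree; the repo decides internally, preferring the pre-built index database and falling back to the union view over the per-run chunk databases when the index is missing. A minimal sketch of that fallback idea, for orientation only; this toy function just reports which backend would be opened, and the real logic lives inside Repo.request_tree:

import os

def request_meta_tree(repo_path):
    # Fast path: consolidated index DB maintained by RepoIndexManager.
    index_path = os.path.join(repo_path, 'meta', 'index')
    if os.path.exists(index_path):
        return ('index', index_path)
    # Fallback: the union view merges the per-run chunk DBs on the fly
    # (slower, but always available).
    return ('union', os.path.join(repo_path, 'meta', 'chunks'))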
4 changes: 2 additions & 2 deletions aim/cli/runs/commands.py
@@ -192,10 +192,10 @@ def update_metrics(ctx, yes):
if not confirmed:
return

- index_manager = RepoIndexManager.get_index_manager(repo, disable_monitoring=True)
+ index_manager = RepoIndexManager.get_index_manager(repo)
hashes = repo.list_all_runs()
for run_hash in tqdm.tqdm(hashes, desc='Updating runs', total=len(hashes)):
- meta_tree = repo.request_tree('meta', run_hash, read_only=False, from_union=False)
+ meta_tree = repo.request_tree('meta', run_hash, read_only=False)
meta_run_tree = meta_tree.subtree(('meta', 'chunks', run_hash))
try:
# check if the Run has already been updated.
4 changes: 2 additions & 2 deletions aim/cli/storage/commands.py
@@ -51,7 +51,7 @@ def to_3_11(ctx, hashes, yes):
if not confirmed:
return

- index_manager = RepoIndexManager.get_index_manager(repo, disable_monitoring=True)
+ index_manager = RepoIndexManager.get_index_manager(repo)
for run_hash in tqdm(matched_hashes):
try:
run = Run(run_hash, repo=repo)
@@ -97,7 +97,7 @@ def restore_runs(ctx, hashes, yes):
return

remaining_runs = []
- index_manager = RepoIndexManager.get_index_manager(repo, disable_monitoring=True)
+ index_manager = RepoIndexManager.get_index_manager(repo)
for run_hash in tqdm(matched_hashes):
try:
restore_run_backup(repo, run_hash)
4 changes: 3 additions & 1 deletion aim/cli/up/commands.py
@@ -124,7 +124,9 @@ def up(
if profiler:
os.environ[AIM_PROFILER_KEY] = '1'

- RepoIndexManager.get_index_manager(repo_inst)
+ index_mng = RepoIndexManager.get_index_manager(repo_inst)
+ index_mng.start()

run_status_mng = RunStatusManager(repo_inst)
run_status_mng.start()
try:
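The manager no longer self-starts in its constructor; aim up owns the lifecycle explicitly. A hedged usage sketch (Repo.from_path is aim's standard entry point; serve_forever is a placeholder, and the try/finally pairing is an assumption rather than code from this PR):

from aim.sdk.repo import Repo
from aim.sdk.index_manager import RepoIndexManager

def serve_forever():
    pass  # placeholder for the blocking web-server loop

repo = Repo.from_path('~/.aim')
manager = RepoIndexManager.get_index_manager(repo)
manager.start()  # starts both observers plus the monitor and indexing threads
try:
    serve_forever()
finally:
    manager.stop()  # sets the stop event and joins the worker threads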
4 changes: 1 addition & 3 deletions aim/ext/transport/handlers.py
@@ -51,14 +51,12 @@ def get_tree(**kwargs):
name = kwargs['name']
sub = kwargs['sub']
read_only = kwargs['read_only']
- from_union = kwargs['from_union']
index = kwargs['index']
timeout = kwargs['timeout']
- no_cache = kwargs.get('no_cache', False)
if index:
return ResourceRef(repo._get_index_tree(name, timeout))
else:
- return ResourceRef(repo.request_tree(name, sub, read_only=read_only, from_union=from_union, no_cache=no_cache))
+ return ResourceRef(repo.request_tree(name, sub, read_only=read_only))


def get_structured_run(hash_, read_only, created_at, **kwargs):
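With from_union and no_cache gone, get_tree's contract narrows to name/sub/read_only plus the index/timeout pair. A hypothetical invocation, with the RPC plumbing omitted and the run hash made up:

ref = get_tree(
    name='meta',
    sub='d3c4a1f09b82',  # made-up run hash identifying the chunk to open
    read_only=True,
    index=False,  # True would return a ref to the consolidated index tree instead
    timeout=10,
)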
5 changes: 2 additions & 3 deletions aim/sdk/base_run.py
@@ -39,6 +39,7 @@ def __init__(
if self.read_only:
assert run_hash is not None
self.hash = run_hash
+ self.meta_tree: TreeView = self.repo.request_tree('meta', read_only=True).subtree('meta')
else:
if run_hash is None:
self.hash = generate_run_hash()
@@ -48,10 +49,8 @@
raise MissingRunError(f'Cannot find Run {run_hash} in aim Repo {self.repo.path}.')
self._lock = self.repo.request_run_lock(self.hash)
self._lock.lock(force=force_resume)
+ self.meta_tree: TreeView = self.repo.request_tree('meta', self.hash, read_only=False).subtree('meta')

- self.meta_tree: TreeView = self.repo.request_tree(
-     'meta', self.hash, read_only=read_only, from_union=True
- ).subtree('meta')
self.meta_run_tree: TreeView = self.meta_tree.subtree('chunks').subtree(self.hash)

self._series_run_trees: Dict[int, TreeView] = None
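Net effect on Run construction: read-only runs are now served from the repo-level 'meta' tree (the index, with the union fallback), while writers still take the run lock and open their own chunk DB. A short sketch of the two paths as a user sees them; this is ordinary aim usage, though writer-to-reader flush timing is glossed over here:

from aim import Run

run = Run(experiment='demo')  # writer: acquires the run lock, opens its own chunk DB
run['foo'] = 1

ro_run = Run(run.hash, read_only=True)  # reader: index / union view, no lock
print(ro_run['foo'])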
140 changes: 107 additions & 33 deletions aim/sdk/index_manager.py
@@ -1,15 +1,19 @@
+ import hashlib
import logging
import os
import queue
import threading
+ import time

from pathlib import Path
+ from typing import Dict

import aimrocks.errors

from aim.sdk.repo import Repo
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
+ from watchdog.observers.api import ObservedWatch
from watchdog.observers.polling import PollingObserver


@@ -19,12 +23,17 @@
class NewChunkCreatedHandler(FileSystemEventHandler):
def __init__(self, manager):
self.manager = manager
+ self.known_chunks = set(p.name for p in self.manager.chunks_dir.iterdir() if p.is_dir())

- def on_created(self, event):
-     if event.is_directory and Path(event.src_path).parent == self.manager.chunks_dir:
-         chunk_name = os.path.basename(event.src_path)
-         logger.debug(f'Detected new chunk directory: {chunk_name}')
-         self.manager.monitor_chunk_directory(event.src_path)
+ def on_modified(self, event):
+     if event.is_directory and Path(event.src_path) == self.manager.chunks_dir:
+         current_chunks = set(p.name for p in self.manager.chunks_dir.iterdir() if p.is_dir())
+         new_chunks = current_chunks - self.known_chunks
+         for chunk_name in new_chunks:
+             chunk_path = self.manager.chunks_dir / chunk_name
+             logger.debug(f'Detected new chunk directory: {chunk_name}')
+             self.manager.monitor_chunk_directory(chunk_path)
+         self.known_chunks = current_chunks


class ChunkChangedHandler(FileSystemEventHandler):
@@ -71,50 +80,86 @@ class RepoIndexManager:
index_manager_pool = {}

@classmethod
- def get_index_manager(cls, repo: Repo, disable_monitoring: bool = False):
+ def get_index_manager(cls, repo: Repo):
mng = cls.index_manager_pool.get(repo.path, None)
if mng is None:
- mng = RepoIndexManager(repo, disable_monitoring)
+ mng = RepoIndexManager(repo)
cls.index_manager_pool[repo.path] = mng
return mng

- def __init__(self, repo: Repo, disable_monitoring: bool):
+ def __init__(self, repo: Repo):
self.repo_path = repo.path
self.repo = repo
self.chunks_dir = Path(self.repo_path) / 'meta' / 'chunks'
self.chunks_dir.mkdir(parents=True, exist_ok=True)

self._corrupted_runs = set()

- if not disable_monitoring:
-     self.indexing_queue = queue.PriorityQueue()
-     self.lock = threading.Lock()
+ self.indexing_queue = queue.PriorityQueue()
+ self.lock = threading.Lock()

+ self.new_chunk_observer = Observer()
+ self.chunk_change_observer = PollingObserver()

+ self.new_chunk_handler = NewChunkCreatedHandler(self)
+ self.chunk_change_handler = ChunkChangedHandler(self)
+ self._watches: Dict[str, ObservedWatch] = dict()
+ self.new_chunk_observer.schedule(self.new_chunk_handler, self.chunks_dir, recursive=False)

- self.new_chunk_observer = Observer()
- self.chunk_change_observer = PollingObserver()
+ self._stop_event = threading.Event()
+ self._index_thread = None
+ self._monitor_thread = None

- self.new_chunk_handler = NewChunkCreatedHandler(self)
- self.chunk_change_handler = ChunkChangedHandler(self)
+ def start(self):
+     self._stop_event.clear()
+     self.new_chunk_observer.start()
+     self.chunk_change_observer.start()

- self.new_chunk_observer.schedule(self.new_chunk_handler, self.chunks_dir, recursive=True)
- self.new_chunk_observer.start()
+     if not self._index_thread or not self._index_thread.is_alive():
+         self._index_thread = threading.Thread(target=self._process_indexing_queue, daemon=True)
+         self._index_thread.start()

- self._monitor_existing_chunks()
- self.chunk_change_observer.start()
+     if not self._monitor_thread or not self._monitor_thread.is_alive():
+         self._monitor_thread = threading.Thread(target=self._monitor_existing_chunks, daemon=True)
+         self._monitor_thread.start()

- self._reindex_thread = threading.Thread(target=self._process_queue, daemon=True)
- self._reindex_thread.start()
+ def stop(self):
+     self._stop_event.set()
+     self.new_chunk_observer.stop()
+     self.chunk_change_observer.stop()
+     if self._monitor_thread:
+         self._monitor_thread.join()
+     if self._index_thread:
+         self._index_thread.join()
def _monitor_existing_chunks(self):
- for chunk_path in self.chunks_dir.iterdir():
-     if chunk_path.is_dir():
-         logger.debug(f'Monitoring existing chunk: {chunk_path}')
-         self.monitor_chunk_directory(chunk_path)
+ while not self._stop_event.is_set():
+     index_db = self.repo.request_tree('meta', read_only=True)
+     monitored_chunks = set(self._watches.keys())
+     for chunk_path in self.chunks_dir.iterdir():
+         if (
+             chunk_path.is_dir()
+             and chunk_path.name not in monitored_chunks
+             and self._is_run_index_outdated(chunk_path.name, index_db)
+         ):
+             logger.debug(f'Monitoring existing chunk: {chunk_path}')
+             self.monitor_chunk_directory(chunk_path)
+             logger.debug(f'Triggering indexing for run {chunk_path.name}')
+             self.add_run_to_queue(chunk_path.name)
+     self.repo.container_pool.clear()
+     time.sleep(5)

+ def _stop_monitoring_chunk(self, run_hash):
+     watch = self._watches.pop(run_hash, None)
+     if watch:
+         self.chunk_change_observer.unschedule(watch)
+         logger.debug(f'Stopped monitoring chunk: {run_hash}')

def monitor_chunk_directory(self, chunk_path):
"""Ensure chunk directory is monitored using a single handler."""
- if str(chunk_path) not in self.chunk_change_observer._watches:
-     self.chunk_change_observer.schedule(self.chunk_change_handler, chunk_path, recursive=True)
+ if chunk_path.name not in self._watches:
+     watch = self.chunk_change_observer.schedule(self.chunk_change_handler, chunk_path, recursive=True)
+     self._watches[chunk_path.name] = watch
logger.debug(f'Started monitoring chunk directory: {chunk_path}')
else:
logger.debug(f'Chunk directory already monitored: {chunk_path}')
@@ -127,8 +172,8 @@ def add_run_to_queue(self, run_hash):
self.indexing_queue.put((timestamp, run_hash))
logger.debug(f'Run {run_hash} added to indexing queue with timestamp {timestamp}')

- def _process_queue(self):
-     while True:
+ def _process_indexing_queue(self):
+     while not self._stop_event.is_set():
_, run_hash = self.indexing_queue.get()
logger.debug(f'Indexing run {run_hash}...')
self.index(run_hash)
@@ -137,12 +182,41 @@ def _process_queue(self):
def index(self, run_hash):
index = self.repo._get_index_tree('meta', 0).view(())
try:
- meta_tree = self.repo.request_tree(
-     'meta', run_hash, read_only=True, from_union=False, no_cache=True, skip_read_optimization=True
- ).subtree('meta')
+ run_checksum = self._get_run_checksum(run_hash)
+ meta_tree = self.repo.request_tree('meta', run_hash, read_only=True, skip_read_optimization=True).subtree(
+     'meta'
+ )
meta_run_tree = meta_tree.subtree('chunks').subtree(run_hash)
meta_run_tree.finalize(index=index)
+ index['index_cache', run_hash] = run_checksum

+ if meta_run_tree.get('end_time') is not None:
+     logger.debug(f'Indexing thread detected finished run: {run_hash}. Stopping monitoring...')
+     self._stop_monitoring_chunk(run_hash)

except (aimrocks.errors.RocksIOError, aimrocks.errors.Corruption):
logger.warning(f"Indexing thread detected corrupted run '{run_hash}'. Skipping.")
logger.warning(f'Indexing thread detected corrupted run: {run_hash}. Skipping.')
self._corrupted_runs.add(run_hash)
return True

+ def _is_run_index_outdated(self, run_hash, index_db):
+     return self._get_run_checksum(run_hash) != index_db.get(('index_cache', run_hash))

+ def _get_run_checksum(self, run_hash):
+     hash_obj = hashlib.md5()
+     for root, dirs, files in os.walk(os.path.join(self.chunks_dir, run_hash)):
+         for name in sorted(files):  # sort to ensure consistent order
+             if name.startswith('LOG'):  # skip access logs
+                 continue
+             filepath = os.path.join(root, name)
+             try:
+                 stat = os.stat(filepath)
+                 hash_obj.update(filepath.encode('utf-8'))
+                 hash_obj.update(str(stat.st_mtime).encode('utf-8'))
+                 hash_obj.update(str(stat.st_size).encode('utf-8'))
+             except FileNotFoundError:
+                 # File might have been deleted between os.walk and os.stat
+                 continue
+     return hash_obj.hexdigest()
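Note that _get_run_checksum hashes file paths, mtimes, and sizes rather than file contents, so staleness detection stays cheap for large runs (at the cost of missing same-size, same-mtime rewrites, the usual trade-off for this kind of fingerprint). Below is a standalone illustration of how the fingerprint pairs with the ('index_cache', run_hash) entries that index() writes; the index_cache dict here is a stand-in for the real index tree:

import hashlib
import os

def dir_fingerprint(path):
    h = hashlib.md5()
    for root, _dirs, files in os.walk(path):
        for name in sorted(files):  # stable order, as in _get_run_checksum
            fp = os.path.join(root, name)
            try:
                st = os.stat(fp)
            except FileNotFoundError:
                continue  # file vanished between walk and stat
            h.update(fp.encode('utf-8'))
            h.update(str(st.st_mtime).encode('utf-8'))
            h.update(str(st.st_size).encode('utf-8'))
    return h.hexdigest()

index_cache = {}  # stand-in for index[('index_cache', run_hash)]

def needs_reindex(chunks_dir, run_hash):
    current = dir_fingerprint(os.path.join(chunks_dir, run_hash))
    return index_cache.get(run_hash) != current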