8000 sdk/python: add workaround for Streaming-Cold-GET limitation in Objec… · NVIDIA/aistore@1016bda · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit 1016bda

Browse files
committed
sdk/python: add workaround for Streaming-Cold-GET limitation in ObjectFileReader
Signed-off-by: Ryan Koo <rbk65@cornell.edu>
1 parent 8a4eea5 commit 1016bda

File tree

6 files changed

+184
-44
lines changed

6 files changed

+184
-44
lines changed

python/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ We structure this changelog in accordance with [Keep a Changelog](https://keepac
66

77
---
88

9-
## Unreleased
9+
## [1.13.9] - 2025-05-16
1010

1111
### Changed
1212

1313
- Improve `ObjectFileReader` logging to include the full exception details and traceback when retrying and resuming.
14+
- Update `ObjectFileReader` resume logic to accommodate the `Streaming-Cold-GET` limitation with byte-range reads, so that resuming does not cause a timeout.
15+
- If the remote object is not cached, resuming via stream is not possible and reading must restart from the beginning.
1416

1517
## [1.13.8] - 2025-05-15
1618

python/aistore/sdk/obj/obj_file/object_file.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from aistore.sdk.utils import get_logger
1212
from aistore.sdk.obj.obj_file.utils import (
1313
handle_broken_stream,
14+
compute_loop_size,
1415
)
1516

1617
logger = get_logger(__name__)
@@ -39,12 +40,13 @@ def __init__(self, content_iterator: ContentIterator, max_resume: int):
3940
self._max_resume = max_resume # Maximum number of resume attempts allowed
4041
self._reset()
4142

42-
def _reset(self):
43+
def _reset(self, retain_resumes: bool = False) -> None:
4344
self._iterable = self._content_iterator.iter()
4445
self._remainder = None
4546
self._resume_position = 0
46-
self._resume_total = 0
4747
self._closed = False
48+
if not retain_resumes:
49+
self._resume_total = 0
4850

4951
@property
5052
def content_iterator(self) -> ContentIterator:
@@ -82,9 +84,13 @@ def read(self, size: Optional[int] = -1) -> bytes:
8284
raise ValueError("I/O operation on closed file.")
8385
if size == 0:
8486
return b""
87+
if size is None:
88+
size = -1
8589

86-
# If size is -1, set it to infinity to read until the end of the stream
87-
size = float("inf") if size == -1 else size
90+
# Cache original requested size in case of reset
91+
original_size = size
92+
93+
size = compute_loop_size(size)
8894
result = []
8995

9096
try:
@@ -119,7 +125,16 @@ def read(self, size: Optional[int] = -1) -> bytes:
119125
self._max_resume,
120126
err,
121127
)
128+
129+
# If the object is remote and not cached, reset the iterator and restart the read operation
130+
# to avoid timeouts when streaming non-cached remote objects with byte ranges (must wait for
131+
# entire object to be cached in-cluster).
132+
if not self._iterable:
133+
self._reset(retain_resumes=True)
134+
size = compute_loop_size(original_size)
135+
122136
continue
137+
123138
except StopIteration:
124139
# End of stream, exit loop
125140
break

python/aistore/sdk/obj/obj_file/utils.py

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
33
#
44

5-
from typing import Iterator, Tuple
5+
from sys import maxsize
6+
from typing import Iterator, Tuple, Optional
67

78
from aistore.sdk.obj.content_iterator import ContentIterator
89
from aistore.sdk.obj.obj_file.errors import ObjectFileReaderMaxResumeError
@@ -11,6 +12,44 @@
1112
logger = get_logger(__name__)
1213

1314

15+
def compute_loop_size(size: Optional[int]) -> int:
    """
    Compute the number of bytes the read loop should try to consume.

    Follows the standard file-object convention: a size of `None` or any negative
    value (including the default -1) means "read until the end of the stream", which
    is expressed as `sys.maxsize` so the loop runs until `StopIteration`.

    Args:
        size (Optional[int]): The requested size to be read.

    Returns:
        int: `sys.maxsize` if the request is unbounded, otherwise `size` unchanged.
    """
    # Any negative value (not only -1) and None request an unbounded read
    if size is None or size < 0:
        return maxsize
    return size
28+
29+
30+
def get_iterator(
    content_iterator: ContentIterator, resume_position: int
) -> Optional[Iterator[bytes]]:
    """
    Build the iterator used to continue reading after a broken stream.

    Issues a HEAD request through the content iterator's client and inspects the
    `present` flag of the result. When the object is present, a new iterator is
    opened at `resume_position`; when it is not, no iterator is created and the
    caller is expected to start over.

    Args:
        content_iterator (ContentIterator): The content iterator used to read the data.
        resume_position (int): The byte offset at which reading should resume.

    Returns:
        Optional[Iterator[bytes]]: An iterator starting at `resume_position`, or
            None if the object is not reported as present (i.e. not cached).
    """
    head_result = content_iterator.client.head()

    # Object is cached: safe to resume from the last known position
    if head_result.present:
        return content_iterator.iter(offset=resume_position)

    # Remote object is not cached: signal the caller to restart from the beginning
    return None
51+
52+
1453
def increment_resume(resume_total: int, max_resume: int, err: Exception) -> int:
1554
"""
1655
Increment the number of resume attempts and raise an error if the maximum allowed is exceeded.
@@ -38,7 +77,7 @@ def handle_broken_stream(
3877
resume_total: int,
3978
max_resume: int,
4079
err: Exception,
41-
) -> Tuple[Iterator[bytes], int]:
80+
) -> Tuple[Optional[Iterator[bytes]], int]:
4281
"""
4382
Handle the broken stream/iterator by incrementing the resume count, logging a warning,
4483
and returning a newly instanatiated iterator from the last known position.
@@ -51,7 +90,8 @@ def handle_broken_stream(
5190
err (Exception): The error that caused the resume attempt.
5291
5392
Returns:
54-
Tuple[Iterator[bytes], int]: The new iterator and the updated resume total.
93+
Optional[Iterator[bytes]]: The new iterator. None if the object is not cached.
94+
int: The updated number of resume attempts.
5595
5696
Raises:
5797
ObjectFileReaderMaxResumeError: If the maximum number of resume attempts is exceeded.
@@ -66,6 +106,8 @@ def handle_broken_stream(
66106
exc_info=err,
67107
)
68108

69-
# Create a new iterator from the last read position
70-
new_iter = content_iterator.iter(offset=resume_position)
109+
new_iter = get_iterator(
110+
content_iterator=content_iterator, resume_position=resume_position
111+
)
112+
71113
return new_iter, resume_total

python/aistore/sdk/obj/object_attributes.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
AIS_ACCESS_TIME,
1212
AIS_VERSION,
1313
AIS_CUSTOM_MD,
14+
AIS_PRESENT,
1415
)
1516

1617

@@ -71,6 +72,13 @@ def custom_metadata(self) -> Dict[str, str]:
7172
return self._parse_custom(custom_md_header)
7273
return {}
7374

75+
@property
def present(self) -> bool:
    """
    Report whether the object is present/cached.

    Returns:
        bool: True only when the `AIS_PRESENT` response header equals the string
            "true"; False when the header is missing or holds any other value.
    """
    # A missing header falls back to "" and therefore compares as not present
    presence_flag = self._response_headers.get(AIS_PRESENT, "")
    return presence_flag == "true"
81+
7482
@staticmethod
7583
def _parse_custom(custom_md_header) -> Dict[str, str]:
7684
"""

python/aistore/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "1.13.8"
1+
__version__ = "1.13.9"

0 commit comments

Comments
 (0)
0