8000 python: add integration tests for 'Streaming-Cold-GET' feature · NVIDIA/aistore@ff78492 · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit ff78492

Browse files
python: add integration tests for 'Streaming-Cold-GET' feature
Signed-off-by: Abhishek Gaikwad <gaikwadabhishek1997@gmail.com>
1 parent 24029f9 commit ff78492

File tree

1 file changed

+261
-0
lines changed

1 file changed

+261
-0
lines changed
Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
#
2+
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
3+
#
4+
5+
import json
6+
import unittest
7+
import logging
8+
9+
from typing import Optional
10+
import pytest
11+
from requests.exceptions import ConnectionError as RequestsConnectionError, Timeout
12+
13+
from tests.integration import REMOTE_SET, REMOTE_BUCKET, CLUSTER_ENDPOINT
14+
from tests.integration.sdk import TEST_RETRY_CONFIG, DEFAULT_TEST_CLIENT
15+
from tests.utils import random_string
16+
from tests.const import MIB, GIB
17+
18+
from aistore.sdk import Bucket, Object, Client
19+
from aistore.sdk.const import HTTP_METHOD_PATCH
20+
21+
22+
# Verbose logging so failures in these integration tests show the full
# client/server exchange.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Feature-flag bit for 'Streaming-Cold-GET'; compared against (and written to)
# the bucket's "features" bitmask in the tests below.
STREAMING_COLD_GET = 1 << 12  # 4096
26+
27+
28+
# pylint: disable=too-few-public-methods
class MidStreamDropper:
    """
    Read-through wrapper that simulates a mid-stream disconnect.

    Raises `ConnectionResetError` once the cumulative number of bytes read
    from the wrapped stream reaches `fail_after_bytes`.
    """

    def __init__(self, stream, fail_after_bytes):
        self.stream = stream
        self.read_bytes = 0
        self.fail_after = fail_after_bytes

    def _fail_if_exhausted(self):
        # Raise as soon as the byte budget has been consumed.
        if self.read_bytes >= self.fail_after:
            raise ConnectionResetError("Simulated mid-read disconnect")

    def read(self, size=-1):
        """Read from the wrapped stream, failing once the byte limit is crossed."""
        # Fail-before-read if the limit was already crossed on a prior call.
        self._fail_if_exhausted()

        chunk = self.stream.read(size)
        self.read_bytes += len(chunk)

        # Fail-after-read if this chunk pushed us over the limit.
        self._fail_if_exhausted()

        return chunk
54+
55+
56+
class TestStreamingColdGet(unittest.TestCase):
    """
    Integration tests for the bucket-level 'Streaming-Cold-GET' feature.

    A 1 GiB object is uploaded to the remote bucket once per class; each test
    evicts it and re-enables the feature flag so every GET starts cold.
    """

    bucket: Optional[Bucket] = None
    object: Optional[Object] = None
    OBJECT_NAME = f"TestStreamingColdGet-{random_string(6)}"
    OBJECT_SIZE = GIB  # 1 GiB object for testing

    @classmethod
    def setUpClass(cls) -> None:
        if not REMOTE_SET:
            return

        cls.client = DEFAULT_TEST_CLIENT

        provider, bucket_name = REMOTE_BUCKET.split("://")
        cls.bucket = cls.client.bucket(bucket_name, provider=provider)
        cls.object = cls.bucket.object(cls.OBJECT_NAME)

        # Use a client with timeout=None for the initial upload to avoid
        # timeout issues on large objects.
        Client(CLUSTER_ENDPOINT, timeout=None).bucket(
            bucket_name, provider=provider
        ).object(cls.OBJECT_NAME).get_writer().put_content(
            content=b"0" * cls.OBJECT_SIZE
        )

    @classmethod
    def tearDownClass(cls) -> None:
        if not REMOTE_SET:
            return
        cls.object.delete()

    def setUp(self) -> None:
        # Evict the object before each test so every GET is a cold GET.
        eviction_job = self.bucket.objects(obj_names=[self.OBJECT_NAME]).evict()
        self.client.job(job_id=eviction_job).wait()

        self.bucket_uri = f"{self.bucket.provider.value}://{self.bucket.name}"

        # Enable Streaming-Cold-GET for the bucket before each test.
        self.toggle_streaming_cold_get(enable=True)
        features = self.get_bucket_features()
        self.assertTrue(
            features & STREAMING_COLD_GET,
            "Streaming-Cold-GET feature is not enabled on the bucket.",
        )

    def toggle_streaming_cold_get(self, enable: bool = True) -> None:
        """Enable or disable the Streaming-Cold-GET feature flag on the bucket."""
        feature_value = str(STREAMING_COLD_GET) if enable else "0"

        self.bucket.make_request(
            method=HTTP_METHOD_PATCH,
            action="set-bprops",
            value={"features": feature_value},
        )

    def get_bucket_features(self) -> int:
        """
        Get the features bitmask from the bucket info.

        Raises:
            ValueError: If the bucket info payload is not valid JSON.
        """
        bucket_info_str = self.bucket.info()[0]
        try:
            bucket_props = json.loads(bucket_info_str)
            return int(bucket_props.get("features", 0))
        except json.JSONDecodeError as exc:
            raise ValueError("Failed to parse bucket info JSON") from exc

    @unittest.skipUnless(REMOTE_SET, "Remote bucket is not set")
    @pytest.mark.extended
    def test_streaming_cold_get_enabled_read_all(self):
        """
        Test that the object content size matches the expected size
        when reading with Streaming-Cold-GET enabled.

        Even if we are trying to read the entire object, the streaming
        cold get feature sends a 200 response as soon as it starts reading.
        """
        content = self.object.get_reader().read_all()
        self.assertEqual(
            len(content),
            self.OBJECT_SIZE,
            "Object content size mismatch when reading with Streaming-Cold-GET enabled.",
        )

    @unittest.skipUnless(REMOTE_SET, "Remote bucket is not set")
    @pytest.mark.extended
    def test_streaming_cold_get_enabled_read_raw(self):
        """
        Test that the first chunk of the object can be read directly via the
        raw stream with Streaming-Cold-GET enabled.
        """
        chunk = (
            self.object.get_reader()
            .raw()
            .read(MIB)  # Read the first 1 MiB chunk directly using raw
        )
        self.assertEqual(len(chunk), MIB, "No initial chunk received.")

    @unittest.skipUnless(REMOTE_SET, "Remote bucket is not set")
    @pytest.mark.extended
    def test_streaming_cold_get_enabled_obj_file(self):
        """
        Test that we receive the first chunk via the file-like reader
        before the timeout.
        """
        reader = self.object.get_reader().as_file()
        first_chunk = reader.read(MIB)  # Explicitly read first chunk (1 MiB)

        self.assertEqual(len(first_chunk), MIB, "No initial chunk received.")

    @unittest.skipUnless(REMOTE_SET, "Remote bucket is not set")
    @pytest.mark.extended
    def test_streaming_cold_get_disabled(self):
        """
        Test that a `ConnectionError` (here `RequestsConnectionError`) or
        `Timeout` is raised when attempting to read a huge object with
        Streaming-Cold-GET disabled, and that the object stays uncached.
        """
        # Disable Streaming-Cold-GET on the bucket
        self.toggle_streaming_cold_get(enable=False)

        features = self.get_bucket_features()
        self.assertEqual(
            features, 0, "Streaming-Cold-GET was not disabled on the bucket."
        )

        # Read the object with a smaller timeout to trigger the connection error
        obj = (
            Client(
                CLUSTER_ENDPOINT,
                retry_config=TEST_RETRY_CONFIG,
                timeout=(
                    3,
                    5,
                ),  # Use a shorter timeout so that whole object cannot be read
            )
            .bucket(
                self.bucket.name,
                provider=self.bucket.provider.value,
            )
            .object(self.OBJECT_NAME)
        )

        with self.assertRaises((RequestsConnectionError, Timeout)):
            obj.get_reader().as_file().read()

        # Verify that the object is not cached
        self.assertFalse(
            obj.props.present,
            "The object should not be cached when the read fails with "
            "Streaming-Cold-GET disabled.",
        )

        with self.assertRaises((RequestsConnectionError, Timeout)):
            obj.get_reader().read_all()

        # Verify that the object is not cached
        self.assertFalse(
            obj.props.present,
            "The object should not be cached when the read fails with "
            "Streaming-Cold-GET disabled.",
        )

    @unittest.skipUnless(REMOTE_SET, "Remote bucket is not set")
    @pytest.mark.extended
    def test_streaming_cold_get_cached(self):
        """
        Test that the object is cached only when it is read in its entirety
        with Streaming-Cold-GET enabled.
        """
        # Read only a partial prefix of the object
        content = self.object.get_reader().as_file().read(MIB)
        self.assertEqual(
            len(content),
            MIB,
            "The content should only be 1 MiB when reading "
            "partially from the object with Streaming-Cold-GET enabled.",
        )
        # Verify that the object is not cached
        self.assertFalse(
            self.object.props.present,
            "The object should not be cached when reading partially with "
            "Streaming-Cold-GET enabled.",
        )

        # Read the entire object to cache it
        full_content = self.object.get_reader().as_file().read()
        self.assertEqual(
            len(full_content),
            self.OBJECT_SIZE,
            "The full content should be read when reading the entire object with Streaming-Cold-GET enabled.",
        )
        # Verify that the object is now cached
        self.assertTrue(
            self.object.props.present,
            "The object should be cached after reading the entire object with "
            "Streaming-Cold-GET enabled.",
        )

    @unittest.skipUnless(REMOTE_SET, "Remote bucket is not set")
    @pytest.mark.extended
    def test_conn_err_mid_stream(self):
        """
        Test that `ConnectionResetError` propagates when a simulated
        mid-stream disconnect occurs, and that the partially-read object
        is not cached.
        """
        stream = self.object.get_reader().raw()
        mid_stream_reader = MidStreamDropper(stream, fail_after_bytes=100 * MIB)

        with self.assertRaises(ConnectionResetError):
            while True:
                data = mid_stream_reader.read(128 * 1024)  # 128 KiB chunks
                if not data:
                    break
        # Verify that the object is not cached after a mid-stream disconnect
        self.assertFalse(
            self.object.props.present,
            "The object should not be cached when a mid-stream read error "
            "occurs with Streaming-Cold-GET enabled.",
        )

0 commit comments

Comments
 (0)
0