feat: allow to access parameters of data request wo loading data by samsja · Pull Request #4991 · jina-ai/serve · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat: allow to access parameters of data request wo loading data #4991

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 49 additions & 30 deletions jina/types/request/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(
request: Optional[RequestSourceType] = None,
):
self.buffer = None
self._pb_body = None

try:
if isinstance(request, jina_pb2.DataRequestProto):
Expand All @@ -127,25 +128,34 @@ def __init__(
f'fail to construct a {self.__class__} object from {request}'
) from ex

self._pb_body_wo_data = None

@property
def is_decompressed(self) -> bool:
"""
Checks if the underlying proto object was already deserialized into a :class:`jina.proto.jina_pb2.DataRequestProto`
Checks if the underlying proto object was already deserialized into a :class:`jina.proto.jina_pb2.DataRequestProto` or
:class:`jina.proto.jina_pb2.DataRequestProtoWoData`. This does not necessarily mean that the data (docs) inside the request is also decompressed.
:return: True if the proto was deserialized before
"""
return self.buffer is None
return type(self._pb_body) in [
jina_pb2.DataRequestProto,
jina_pb2.DataRequestProtoWoData,
]

@property
def is_decompressed_with_data(self) -> bool:
    """
    Tell whether the request body has been fully deserialized, data (docs) included.

    The check is an exact type comparison on ``self._pb_body``: only a
    :class:`jina.proto.jina_pb2.DataRequestProto` instance counts.

    :return: True if ``self._pb_body`` is a ``DataRequestProto``
    """
    body_type = type(self._pb_body)
    return body_type is jina_pb2.DataRequestProto

@property
def is_decompressed_wo_data(self) -> bool:
"""
Checks if the underlying proto object was already deserialized into a :class:`jina.proto.jina_pb2.DataRequestProtoWoData` i,e
a DataRequest without docs
Checks if the underlying proto object was already deserialized into a :class:`jina.proto.jina_pb2.DataRequestProtoWoData`. It means that the proto is loaded without the data ( docs ).

:return: True if the proto was deserialized before into a DataRequest without docs
"""
return self._pb_body_wo_data is not None
return type(self._pb_body) is jina_pb2.DataRequestProtoWoData

@property
def proto_wo_data(
Expand All @@ -157,32 +167,34 @@ def proto_wo_data(
calling :meth:`SerializeToString`.
:return: protobuf instance containing parameters
"""
if self.is_decompressed:
return self._pb_body
elif self.is_decompressed_wo_data:
return self._pb_body_wo_data
else:
if self._pb_body is None:
self._decompress_wo_data()
return self._pb_body_wo_data
return self._pb_body

@property
def proto(self) -> 'jina_pb2.DataRequestProto':
def proto(
self,
) -> Union['jina_pb2.DataRequestProto', 'jina_pb2.DataRequestProtoWoData']:
"""
Cast ``self`` to a :class:`jina_pb2.DataRequestProto`. Laziness will be broken and serialization will be recomputed when calling.
Under the hood it is calling :meth:`jina.types.request.data.DataRequest.proto_wo_data`. This method is keep for legacy.
Cast ``self`` to a :class:`jina_pb2.DataRequestProto` or a :class:`jina_pb2.DataRequestProtoWoData`. Laziness will be broken and serialization will be recomputed when calling :meth:`SerializeToString`.
It returns the underlying proto if it already exists (even if it was loaded without data) or creates a new one.
:return: DataRequestProto protobuf instance
"""
return self.proto_data
if not self.is_decompressed:
self._decompress()
return self._pb_body

@property
def proto_data(self) -> 'jina_pb2.DataRequestProto':
def proto_with_data(
self,
) -> 'jina_pb2.DataRequestProto':
"""
Transform the current buffer to a :class:`jina_pb2.DataRequestProto`. Laziness will be broken and
serialization will be recomputed when calling :meth:`SerializeToString`. :return: protobuf instance
Cast ``self`` to a :class:`jina_pb2.DataRequestProto`. Laziness will be broken and serialization will be recomputed when calling
:meth:`SerializeToString`.
:return: DataRequestProto protobuf instance
"""
if not self.is_decompressed:
if not self.is_decompressed_with_data:
self._decompress()
return self._pb_body

Expand All @@ -192,15 +204,22 @@ def _decompress_wo_data(self):

# Under the hood it used a different DataRequestProto (the DataRequestProtoWoData) that will just ignore the
# bytes from the bytes related to the docs that are store at the end of the Proto buffer
self._pb_body_wo_data = jina_pb2.DataRequestProtoWoData()
self._pb_body_wo_data.ParseFromString(self.buffer)
self._pb_body = jina_pb2.DataRequestProtoWoData()
self._pb_body.ParseFromString(self.buffer)
self.buffer = None

def _decompress(self):
"""Decompress the buffer into a DataRequestProto"""
self._pb_body = jina_pb2.DataRequestProto()
self._pb_body.ParseFromString(self.buffer)
self.buffer = None
self._pb_body_wo_data = None
if self.buffer:
self._pb_body = jina_pb2.DataRequestProto()
self._pb_body.ParseFromString(self.buffer)
self.buffer = None
elif self.is_decompressed_wo_data:
self._pb_body_old = self._pb_body
self._pb_body = jina_pb2.DataRequestProto()
self._pb_body.ParseFromString(self._pb_body_old.SerializePartialToString())
else:
raise ValueError('the buffer is already decompressed')

def to_dict(self) -> Dict:
"""Return the object in Python dictionary.
Expand Down Expand Up @@ -234,7 +253,7 @@ def data(self) -> 'DataRequest._DataContent':

:return: the data content as an instance of _DataContent wrapping docs
"""
return DataRequest._DataContent(self.proto.data)
return DataRequest._DataContent(self.proto_with_data.data)

@property
def parameters(self) -> Dict:
Expand All @@ -249,8 +268,8 @@ def parameters(self, value: Dict):
"""Set the `parameters` field of this Request to a Python dict
:param value: a Python dict
"""
self.proto.parameters.Clear()
self.proto.parameters.update(value)
self.proto_wo_data.parameters.Clear()
self.proto_wo_data.parameters.update(value)

@property
def response(self):
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/gateway_clients/test_clients_gateways.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async def task_wrapper():
import random

await asyncio.sleep(1 / (random.randint(1, 3) * 10))
if requests[0].is_decompressed:
if requests[0].is_decompressed_with_data:
return (
DataRequest(request=requests[0].proto.SerializePartialToString()),
{},
Expand Down
131 changes: 118 additions & 13 deletions tests/unit/types/request/test_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ def test_lazy_serialization():
byte_array = DataRequestProto.SerializeToString(r)

deserialized_request = DataRequestProto.FromString(byte_array)
assert not deserialized_request.is_decompressed
assert not deserialized_request.is_decompressed_with_data
assert len(deserialized_request.docs) == doc_count
assert deserialized_request.docs == r.docs
assert deserialized_request.is_decompressed
assert deserialized_request.is_decompressed_with_data
assert not deserialized_request.is_decompressed_wo_data


def test_lazy_serialization_bytes():
def test_lazy_serialization_bytes(request_proto_bytes):
doc_count = 1000
r = DataRequest()
da = r.docs
Expand All @@ -102,10 +102,10 @@ def test_lazy_serialization_bytes():
byte_array = DataRequestProto.SerializeToString(r)

deserialized_request = DataRequestProto.FromString(byte_array)
assert not deserialized_request.is_decompressed
assert not deserialized_request.is_decompressed_with_data
assert len(deserialized_request.docs) == doc_count
assert deserialized_request.docs == r.docs
assert deserialized_request.is_decompressed
assert deserialized_request.is_decompressed_with_data
assert not deserialized_request.is_decompressed_wo_data


Expand All @@ -116,10 +116,10 @@ def test_status():
byte_array = DataRequestProto.SerializeToString(r)

deserialized_request = DataRequestProto.FromString(byte_array)
assert not deserialized_request.is_decompressed
assert not deserialized_request.is_decompressed_with_data
assert deserialized_request.status.code == jina_pb2.StatusProto.ERROR
assert deserialized_request.is_decompressed_wo_data
assert not deserialized_request.is_decompressed
assert not deserialized_request.is_decompressed_with_data


def test_load_parameters_wo_loading_data(): # test that accessing parameters does not load the data
Expand All @@ -134,13 +134,38 @@ def test_load_parameters_wo_loading_data(): # test that accessing parameters do
byte_array = DataRequestProto.SerializeToString(r)

deserialized_request = DataRequest(byte_array)
assert not deserialized_request.is_decompressed
assert not deserialized_request.is_decompressed_with_data
assert deserialized_request.parameters == parameters
assert deserialized_request.is_decompressed_wo_data
assert not deserialized_request.is_decompressed
assert not deserialized_request.is_decompressed_with_data

with pytest.raises(AttributeError):
deserialized_request._pb_body_wo_data.data

def test_change_parameters_wo_loading_data():
    """Check that overwriting ``parameters`` does not load the data (docs).

    A request carrying docs and parameters is serialized, rebuilt from bytes,
    and its parameters are read and then replaced. The request must stay in
    the "decompressed without data" state throughout, and a second
    serialization round trip must still carry both the new parameters and
    the original docs.
    """
    doc_count = 1000
    r = DataRequest()
    da = r.docs
    da.extend([Document(text='534534534er5yr5y645745675675675345')] * doc_count)
    r.data.docs = da

    parameters = {'a': 0}
    new_parameters = {'b': 1}

    r.parameters = parameters
    byte_array = DataRequestProto.SerializeToString(r)

    deserialized_request = DataRequest(byte_array)
    assert not deserialized_request.is_decompressed_with_data
    assert deserialized_request.parameters == parameters
    assert deserialized_request.is_decompressed_wo_data

    deserialized_request.parameters = new_parameters
    # The property under test: the setter itself must not pull in the docs.
    assert deserialized_request.is_decompressed_wo_data
    assert not deserialized_request.is_decompressed_with_data

    new_byte_array = DataRequestProto.SerializeToString(deserialized_request)
    new_deserialized_request = DataRequest(new_byte_array)

    assert new_deserialized_request.parameters == new_parameters
    # The docs must have survived the parameters-only edit.
    assert new_deserialized_request.docs == da


def test_send_data_request_wo_data(): # check that when sending a DataRequestWoData the docs are sent
Expand Down Expand Up @@ -180,7 +205,7 @@ def test_delete_of_pb2_wo_data(): # ensure that pb2_wo_data is destroyed when a
deserialized_request.parameters is not None
) # access the parameters and create the proto wo data
assert deserialized_request.is_decompressed_wo_data
assert not deserialized_request.is_decompressed
assert not deserialized_request.is_decompressed_with_data

assert (
deserialized_request.docs == r.docs
Expand All @@ -189,4 +214,84 @@ def test_delete_of_pb2_wo_data(): # ensure that pb2_wo_data is destroyed when a
assert (
not deserialized_request.is_decompressed_wo_data
) # check that it is destroyed
assert deserialized_request.is_decompressed
assert deserialized_request.is_decompressed_with_data


def test_change_only_params():
    """Check that docs survive re-serializing a request loaded without data.

    Only ``parameters`` is accessed on the rebuilt request, which leaves it
    decompressed without data; serializing it again and parsing the result
    must still yield every original doc.
    """
    n_docs = 1000
    original = DataRequest()
    docs = original.docs
    docs.extend([Document(text='534534534er5yr5y645745675675675345')] * n_docs)
    original.data.docs = docs

    rebuilt = DataRequest(DataRequestProto.SerializeToString(original))

    assert rebuilt.parameters is not None
    assert rebuilt.is_decompressed_wo_data

    reserialized = DataRequestProto.SerializeToString(rebuilt)
    final_request = DataRequestProto.FromString(reserialized)

    assert len(final_request.docs) == n_docs
    assert final_request.docs == original.docs


def test_proto_wo_data_to_data(request_proto_bytes):
    """Check that both proto flavours serialize the same bytes identically.

    The same buffer is parsed as a ``DataRequestProtoWoData`` and as a
    ``DataRequestProto``; their partial serializations must be equal.

    :param request_proto_bytes: serialized request provided by the fixture
    """
    light_proto = jina_pb2.DataRequestProtoWoData()
    light_proto.ParseFromString(request_proto_bytes)

    full_proto = jina_pb2.DataRequestProto()
    full_proto.ParseFromString(request_proto_bytes)

    light_bytes = light_proto.SerializePartialToString()
    full_bytes = full_proto.SerializePartialToString()
    assert light_bytes == full_bytes


@pytest.fixture()
def request_proto_bytes():
    """Provide the serialized bytes of a DataRequest filled with 1000 text docs.

    :return: output of ``DataRequestProto.SerializeToString`` for that request
    """
    n_docs = 1000
    request = DataRequest()
    docs = request.docs
    docs.extend([Document(text='534534534er5yr5y645745675675675345')] * n_docs)
    request.data.docs = docs
    serialized = DataRequestProto.SerializeToString(request)
    return serialized


def test_proto_wo_data_to_param_change_data(request_proto_bytes):
    """Check that both proto flavours stay byte-equal after a parameters edit.

    The same buffer is parsed as a ``DataRequestProtoWoData`` and as a
    ``DataRequestProto``; on each, ``parameters`` is cleared and replaced.
    Their partial serializations must still be equal afterwards.

    :param request_proto_bytes: serialized request provided by the fixture
    """

    def _parse_and_overwrite(proto_cls):
        # Parse the shared buffer, then replace the parameters wholesale.
        parsed = proto_cls()
        parsed.ParseFromString(request_proto_bytes)
        parsed.parameters.Clear()
        parsed.parameters.update({'b': 1})
        return parsed

    light_proto = _parse_and_overwrite(jina_pb2.DataRequestProtoWoData)
    full_proto = _parse_and_overwrite(jina_pb2.DataRequestProto)

    light_bytes = light_proto.SerializePartialToString()
    full_bytes = full_proto.SerializePartialToString()
    assert light_bytes == full_bytes


def test_proto_wo_data_docs():
    """Check that docs are reachable after a round trip through the wo-data proto.

    A request with docs is serialized, parsed as a ``DataRequestProtoWoData``,
    serialized again, and wrapped in a new ``DataRequest``; the docs of that
    new request must equal the original ones.
    """
    n_docs = 1000
    source_request = DataRequest()
    docs = source_request.docs
    docs.extend([Document(text='534534534er5yr5y645745675675675345')] * n_docs)
    source_request.data.docs = docs

    light_proto = jina_pb2.DataRequestProtoWoData()
    serialized = DataRequestProto.SerializeToString(source_request)
    light_proto.ParseFromString(serialized)

    rebuilt = DataRequest(light_proto.SerializePartialToString())

    assert rebuilt.docs == source_request.docs
0