From 3b7442e309c001ec68d9d96a2084a4c40e273896 Mon Sep 17 00:00:00 2001 From: Sami Jaghouar Date: Mon, 4 Jul 2022 10:56:58 +0200 Subject: [PATCH 1/3] feat: allow to access parameters of data request wo loading data --- jina/types/request/data.py | 71 ++++++---- .../gateway_clients/test_clients_gateways.py | 2 +- tests/unit/types/request/test_request.py | 131 ++++++++++++++++-- 3 files changed, 164 insertions(+), 40 deletions(-) diff --git a/jina/types/request/data.py b/jina/types/request/data.py index a118c145546a6..dec584b4c2d8d 100644 --- a/jina/types/request/data.py +++ b/jina/types/request/data.py @@ -104,6 +104,7 @@ def __init__( request: Optional[RequestSourceType] = None, ): self.buffer = None + self._pb_body = None try: if isinstance(request, jina_pb2.DataRequestProto): @@ -127,15 +128,25 @@ def __init__( f'fail to construct a {self.__class__} object from {request}' ) from ex - self._pb_body_wo_data = None - @property def is_decompressed(self) -> bool: + """ + Checks if the underlying proto object was already deserialized into a :class:`jina.proto.jina_pb2.DataRequestProto` or + :class:`jina.proto.jina_pb2.DataRequestProtoWoData` + :return: True if the proto was deserialized before + """ + return type(self._pb_body) in [ + jina_pb2.DataRequestProto, + jina_pb2.DataRequestProtoWoData, + ] + + @property + def is_decompressed_with_data(self) -> bool: """ Checks if the underlying proto object was already deserialized into a :class:`jina.proto.jina_pb2.DataRequestProto` :return: True if the proto was deserialized before """ - return self.buffer is None + return type(self._pb_body) is jina_pb2.DataRequestProto @property def is_decompressed_wo_data(self) -> bool: @@ -145,7 +156,7 @@ def is_decompressed_wo_data(self) -> bool: :return: True if the proto was deserialized before into a DataRequest without docs """ - return self._pb_body_wo_data is not None + return type(self._pb_body) is jina_pb2.DataRequestProtoWoData @property def proto_wo_data( @@ -157,32 +168,33 @@ 
def proto_wo_data( calling :meth:`SerializeToString`. :return: protobuf instance containing parameters """ - if self.is_decompressed: - return self._pb_body - elif self.is_decompressed_wo_data: - return self._pb_body_wo_data - else: + if self._pb_body is None: self._decompress_wo_data() - return self._pb_body_wo_data + return self._pb_body @property - def proto(self) -> 'jina_pb2.DataRequestProto': + def proto( + self, + ) -> Union['jina_pb2.DataRequestProto', 'jina_pb2.DataRequestProtoWoData']: """ Cast ``self`` to a :class:`jina_pb2.DataRequestProto`. Laziness will be broken and serialization will be recomputed when calling. - Under the hood it is calling :meth:`jina.types.request.data.DataRequest.proto_wo_data`. This method is keep for legacy. :meth:`SerializeToString`. :return: DataRequestProto protobuf instance """ - return self.proto_data + if not self.is_decompressed: + self._decompress() + return self._pb_body @property - def proto_data(self) -> 'jina_pb2.DataRequestProto': + def proto_with_data( + self, + ) -> 'jina_pb2.DataRequestProto': """ - Transform the current buffer to a :class:`jina_pb2.DataRequestProto`. Laziness will be broken and - serialization will be recomputed when calling :meth:`SerializeToString`. :return: protobuf instance + Cast ``self`` to a :class:`jina_pb2.DataRequestProto`. Laziness will be broken and serialization will be recomputed when calling. + :meth:`SerializeToString`. 
:return: DataRequestProto protobuf instance """ - if not self.is_decompressed: + if not self.is_decompressed_with_data: self._decompress() return self._pb_body @@ -192,15 +204,22 @@ def _decompress_wo_data(self): # Under the hood it used a different DataRequestProto (the DataRequestProtoWoData) that will just ignore the # bytes from the bytes related to the docs that are store at the end of the Proto buffer - self._pb_body_wo_data = jina_pb2.DataRequestProtoWoData() - self._pb_body_wo_data.ParseFromString(self.buffer) + self._pb_body = jina_pb2.DataRequestProtoWoData() + self._pb_body.ParseFromString(self.buffer) + self.buffer = None def _decompress(self): """Decompress the buffer into a DataRequestProto""" - self._pb_body = jina_pb2.DataRequestProto() - self._pb_body.ParseFromString(self.buffer) - self.buffer = None - self._pb_body_wo_data = None + if self.buffer: + self._pb_body = jina_pb2.DataRequestProto() + self._pb_body.ParseFromString(self.buffer) + self.buffer = None + elif self.is_decompressed_wo_data: + self._pb_body_old = self._pb_body + self._pb_body = jina_pb2.DataRequestProto() + self._pb_body.ParseFromString(self._pb_body_old.SerializePartialToString()) + else: + raise ValueError('the buffer is already decompressed') def to_dict(self) -> Dict: """Return the object in Python dictionary. 
@@ -234,7 +253,7 @@ def data(self) -> 'DataRequest._DataContent': :return: the data content as an instance of _DataContent wrapping docs """ - return DataRequest._DataContent(self.proto.data) + return DataRequest._DataContent(self.proto_with_data.data) @property def parameters(self) -> Dict: @@ -249,8 +268,8 @@ def parameters(self, value: Dict): """Set the `parameters` field of this Request to a Python dict :param value: a Python dict """ - self.proto.parameters.Clear() - self.proto.parameters.update(value) + self.proto_wo_data.parameters.Clear() + self.proto_wo_data.parameters.update(value) @property def response(self): diff --git a/tests/integration/gateway_clients/test_clients_gateways.py b/tests/integration/gateway_clients/test_clients_gateways.py index 8e30e2c15264d..a020ab9c4f827 100644 --- a/tests/integration/gateway_clients/test_clients_gateways.py +++ b/tests/integration/gateway_clients/test_clients_gateways.py @@ -94,7 +94,7 @@ async def task_wrapper(): import random await asyncio.sleep(1 / (random.randint(1, 3) * 10)) - if requests[0].is_decompressed: + if requests[0].is_decompressed_with_data: return ( DataRequest(request=requests[0].proto.SerializePartialToString()), {}, diff --git a/tests/unit/types/request/test_request.py b/tests/unit/types/request/test_request.py index 395e606edd3b0..cf1eaa4c59707 100644 --- a/tests/unit/types/request/test_request.py +++ b/tests/unit/types/request/test_request.py @@ -86,14 +86,14 @@ def test_lazy_serialization(): byte_array = DataRequestProto.SerializeToString(r) deserialized_request = DataRequestProto.FromString(byte_array) - assert not deserialized_request.is_decompressed + assert not deserialized_request.is_decompressed_with_data assert len(deserialized_request.docs) == doc_count assert deserialized_request.docs == r.docs - assert deserialized_request.is_decompressed + assert deserialized_request.is_decompressed_with_data assert not deserialized_request.is_decompressed_wo_data -def 
test_lazy_serialization_bytes(): +def test_lazy_serialization_bytes(request_proto_bytes): doc_count = 1000 r = DataRequest() da = r.docs @@ -102,10 +102,10 @@ def test_lazy_serialization_bytes(): byte_array = DataRequestProto.SerializeToString(r) deserialized_request = DataRequestProto.FromString(byte_array) - assert not deserialized_request.is_decompressed + assert not deserialized_request.is_decompressed_with_data assert len(deserialized_request.docs) == doc_count assert deserialized_request.docs == r.docs - assert deserialized_request.is_decompressed + assert deserialized_request.is_decompressed_with_data assert not deserialized_request.is_decompressed_wo_data @@ -116,10 +116,10 @@ def test_status(): byte_array = DataRequestProto.SerializeToString(r) deserialized_request = DataRequestProto.FromString(byte_array) - assert not deserialized_request.is_decompressed + assert not deserialized_request.is_decompressed_with_data assert deserialized_request.status.code == jina_pb2.StatusProto.ERROR assert deserialized_request.is_decompressed_wo_data - assert not deserialized_request.is_decompressed + assert not deserialized_request.is_decompressed_with_data def test_load_parameters_wo_loading_data(): # test that accessing parameters does not load the data @@ -134,13 +134,38 @@ def test_load_parameters_wo_loading_data(): # test that accessing parameters do byte_array = DataRequestProto.SerializeToString(r) deserialized_request = DataRequest(byte_array) - assert not deserialized_request.is_decompressed + assert not deserialized_request.is_decompressed_with_data assert deserialized_request.parameters == parameters assert deserialized_request.is_decompressed_wo_data - assert not deserialized_request.is_decompressed + assert not deserialized_request.is_decompressed_with_data - with pytest.raises(AttributeError): - deserialized_request._pb_body_wo_data.data + +def test_change_parameters_wo_loading_data(): # test that changing parameters does not load the data + doc_count = 1000 
+ r = DataRequest() + da = r.docs + da.extend([Document(text='534534534er5yr5y645745675675675345')] * doc_count) + r.data.docs = da + + parameters = {'a': 0} + new_parameters = {'b': 1} + + r.parameters = parameters + byte_array = DataRequestProto.SerializeToString(r) + + deserialized_request = DataRequest(byte_array) + assert not deserialized_request.is_decompressed_with_data + assert deserialized_request.parameters == parameters + assert deserialized_request.is_decompressed_wo_data + + deserialized_request.parameters = new_parameters + + new_byte_array = DataRequestProto.SerializeToString(deserialized_request) + new_deserialized_request = DataRequest(new_byte_array) + + assert new_deserialized_request.parameters == new_parameters + new_deserialized_request.docs + assert new_deserialized_request.docs == da def test_send_data_request_wo_data(): # check that when sending a DataRequestWoData the docs are sent @@ -180,7 +205,7 @@ def test_delete_of_pb2_wo_data(): # ensure that pb2_wo_data is destroyed when a deserialized_request.parameters is not None ) # access the parameters and create the proto wo data assert deserialized_request.is_decompressed_wo_data - assert not deserialized_request.is_decompressed + assert not deserialized_request.is_decompressed_with_data assert ( deserialized_request.docs == r.docs @@ -189,4 +214,84 @@ def test_delete_of_pb2_wo_data(): # ensure that pb2_wo_data is destroyed when a assert ( not deserialized_request.is_decompressed_wo_data ) # check that it is destroyed - assert deserialized_request.is_decompressed + assert deserialized_request.is_decompressed_with_data + + +def test_change_only_params(): # check that when sending a DataRequestWoData the docs are sent + doc_count = 1000 + r = DataRequest() + da = r.docs + da.extend([Document(text='534534534er5yr5y645745675675675345')] * doc_count) + r.data.docs = da + + byte_array = DataRequestProto.SerializeToString(r) + + deserialized_request = DataRequest(byte_array) + + assert 
deserialized_request.parameters is not None + assert deserialized_request.is_decompressed_wo_data + + final_request = DataRequestProto.FromString( + DataRequestProto.SerializeToString(deserialized_request) + ) + + assert len(final_request.docs) == doc_count + assert final_request.docs == r.docs + + +def test_proto_wo_data_to_data(request_proto_bytes): + proto_wo_data = jina_pb2.DataRequestProtoWoData() + proto_wo_data.ParseFromString(request_proto_bytes) + + proto_data = jina_pb2.DataRequestProto() + proto_data.ParseFromString(request_proto_bytes) + + assert ( # check that once we serialize both proto have the same content + proto_wo_data.SerializePartialToString() + == proto_data.SerializePartialToString() + ) + + +@pytest.fixture() +def request_proto_bytes(): + doc_count = 1000 + r = DataRequest() + da = r.docs + da.extend([Document(text='534534534er5yr5y645745675675675345')] * doc_count) + r.data.docs = da + return DataRequestProto.SerializeToString(r) + + +def test_proto_wo_data_to_param_change_data(request_proto_bytes): + + proto_wo_data = jina_pb2.DataRequestProtoWoData() + proto_wo_data.ParseFromString(request_proto_bytes) + + proto_data = jina_pb2.DataRequestProto() + proto_data.ParseFromString(request_proto_bytes) + + for proto in [proto_data, proto_wo_data]: + proto.parameters.Clear() + proto.parameters.update({'b': 1}) + + assert ( # check that once we serialize both proto have the same content + proto_wo_data.SerializePartialToString() + == proto_data.SerializePartialToString() + ) + + +def test_proto_wo_data_docs(): # check if we can access the docs after deserializing from a proto_wo_data + doc_count = 1000 + r = DataRequest() + da = r.docs + da.extend([Document(text='534534534er5yr5y645745675675675345')] * doc_count) + r.data.docs = da + + proto_wo_data = jina_pb2.DataRequestProtoWoData() + proto_wo_data.ParseFromString(DataRequestProto.SerializeToString(r)) + + bytes_ = proto_wo_data.SerializePartialToString() + + new_data_request = 
DataRequest(bytes_) + + assert new_data_request.docs == r.docs From 9811c65477ba48c68d4eab6aead92ceb94bc4081 Mon Sep 17 00:00:00 2001 From: Sami Jaghouar Date: Tue, 5 Jul 2022 09:31:45 +0200 Subject: [PATCH 2/3] feat: update docstring --- jina/types/request/data.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/jina/types/request/data.py b/jina/types/request/data.py index dec584b4c2d8d..6ecd2724d8122 100644 --- a/jina/types/request/data.py +++ b/jina/types/request/data.py @@ -132,7 +132,7 @@ def __init__( def is_decompressed(self) -> bool: """ Checks if the underlying proto object was already deserialized into a :class:`jina.proto.jina_pb2.DataRequestProto` or - :class:`jina.proto.jina_pb2.DataRequestProtoWoData` + :class:`jina.proto.jina_pb2.DataRequestProtoWoData` This does not necessarily mean that the docs are loaded. :return: True if the proto was deserialized before """ return type(self._pb_body) in [ @@ -143,7 +143,7 @@ def is_decompressed(self) -> bool: @property def is_decompressed_with_data(self) -> bool: """ - Checks if the underlying proto object was already deserialized into a :class:`jina.proto.jina_pb2.DataRequestProto` + Checks if the underlying proto object was already deserialized into a :class:`jina.proto.jina_pb2.DataRequestProto`. In this case the full proto is loaded including tha data ( docs ) :return: True if the proto was deserialized before """ return type(self._pb_body) is jina_pb2.DataRequestProto @@ -151,8 +151,7 @@ def is_decompressed_with_data(self) -> bool: @property def is_decompressed_wo_data(self) -> bool: """ - Checks if the underlying proto object was already deserialized into a :class:`jina.proto.jina_pb2.DataRequestProtoWoData` i,e - a DataRequest without docs + Checks if the underlying proto object was already deserialized into a :class:`jina.proto.jina_pb2.DataRequestProtoWoData`. It means that the proto is loaded without the data ( docs ). 
:return: True if the proto was deserialized before into a DataRequest without docs """ @@ -177,7 +176,8 @@ def proto( self, ) -> Union['jina_pb2.DataRequestProto', 'jina_pb2.DataRequestProtoWoData']: """ - Cast ``self`` to a :class:`jina_pb2.DataRequestProto`. Laziness will be broken and serialization will be recomputed when calling. + Cast ``self`` to a :class:`jina_pb2.DataRequestProto` or a :class:`jina_pb2.DataRequestProtoWoData`. Laziness will be broken and serialization will be recomputed when calling. + It returns the underlying proto if it already exists (even if it was loaded without data) or creates a new one. :meth:`SerializeToString`. :return: DataRequestProto protobuf instance """ From 51667406e3a9385719391e80060cfbeb50bee96b Mon Sep 17 00:00:00 2001 From: samsja <55492238+samsja@users.noreply.github.com> Date: Tue, 5 Jul 2022 09:45:04 +0200 Subject: [PATCH 3/3] feat: apply johannes suggestion Co-authored-by: Johannes Messner <44071807+JohannesMessner@users.noreply.github.com> --- jina/types/request/data.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/jina/types/request/data.py b/jina/types/request/data.py index 6ecd2724d8122..9a989aef1c80d 100644 --- a/jina/types/request/data.py +++ b/jina/types/request/data.py @@ -132,7 +132,7 @@ def is_decompressed(self) -> bool: """ Checks if the underlying proto object was already deserialized into a :class:`jina.proto.jina_pb2.DataRequestProto` or - :class:`jina.proto.jina_pb2.DataRequestProtoWoData` This does not necessarily mean that the docs are loaded. + :class:`jina.proto.jina_pb2.DataRequestProtoWoData`. This does not necessarily mean that the data (docs) inside the request is also decompressed. 
:return: True if the proto was deserialized before """ return type(self._pb_body) in [ @@ -143,8 +143,8 @@ def is_decompressed(self) -> bool: @property def is_decompressed_with_data(self) -> bool: """ - Checks if the underlying proto object was already deserialized into a :class:`jina.proto.jina_pb2.DataRequestProto`. In this case the full proto is loaded including tha data ( docs ) - :return: True if the proto was deserialized before + Checks if the underlying proto object was already deserialized into a :class:`jina.proto.jina_pb2.DataRequestProto`. In this case the full proto is decompressed, including the data (docs). + :return: True if the proto was deserialized before, including the data (docs) """ return type(self._pb_body) is jina_pb2.DataRequestProto