-
Notifications
You must be signed in to change notification settings - Fork 3
Publish and subscribe acks and nacks #36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This pull request has been linked to Shortcut Story #17421: Subscribe callbacks. |
@@ -15,7 +15,7 @@ def wrap(event, topic_id): | |||
raise TypeError("topic_id must be a ULID") | |||
|
|||
return event_pb2.EventWrapper( | |||
event=event.SerializeToString(), topic_id=topic_id.bytes | |||
local_id=ULID().bytes, event=event.SerializeToString(), topic_id=topic_id.bytes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Ensign server uses the local_id
to communicate which event was acked/nacked.
@@ -268,48 +243,3 @@ async def _close_streams(self): | |||
await subscriber.close() | |||
|
|||
await self.pool.release() | |||
|
|||
|
|||
class BidiQueue: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to a separate file to avoid the circular import.
@@ -199,12 +209,13 @@ async def subscribe(self, *topics, query="", consumer_group=None): | |||
# Get the ID of the topic | |||
topic_ids.append(str(self._resolve_topic(topic))) | |||
|
|||
async for event in self.client.subscribe( | |||
# Run the subscriber | |||
await self.client.subscribe( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: This will break the examples
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Below is what our "minimal" Python example looks like now. Can you show how it will be different when we do the next merge to main?
import json
import asyncio
from datetime import datetime
from pyensign.ensign import Ensign
from pyensign.events import Event
TOPIC = "chocolate-covered-espresso-beans"
async def publish(client, topic, events):
await asyncio.sleep(1)
errors = await client.publish(topic, events)
if errors:
print("Failed to publish events: {}".format(errors))
return errors
async def subscribe(client, topic):
id = await(client.topic_id(topic))
async for event in client.subscribe(id):
msg = json.loads(event.data)
print("At {}, {} sent you the following message: {}".format(msg["timestamp"], msg["sender"], msg["message"]))
return event
async def main():
# Create an Ensign client
client = Ensign(
# endpoint="staging.ensign.world:443", # uncomment if in staging
# auth_url="https://auth.ensign.world" # uncomment if in staging
)
# Create topic if it doesn't exist
if not await client.topic_exists(TOPIC):
await client.create_topic(TOPIC)
# Create an event with some data
msg = {
"sender": "Enson the Sea Otter",
"timestamp": datetime.now().isoformat(),
"message": "You're looking smart today!",
}
data = json.dumps(msg).encode("utf-8")
event = Event(data, mimetype="application/json")
# Create the publisher and subscriber tasks
pub = publish(client, TOPIC, event)
sub = subscribe(client, TOPIC)
# Wait for the tasks to complete
await asyncio.gather(pub, sub)
if __name__ == "__main__":
asyncio.run(main())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like this (untested).
received = asyncio.Event()
async def print_event(event):
msg = json.loads(event.data)
print("At {}, {} sent you the following message: {}".format(msg["timestamp"], msg["sender"], msg["message"]))
received.set()
async def main():
# Create an Ensign client
client = Ensign(
# endpoint="staging.ensign.world:443", # uncomment if in staging
# auth_url="https://auth.ensign.world" # uncomment if in staging
)
# Create topic if it doesn't exist
client.ensure_topic_exists(TOPIC)
# Create an event with some data
msg = {
"sender": "Enson the Sea Otter",
"timestamp": datetime.now().isoformat(),
"message": "You're looking smart today!",
}
data = json.dumps(msg).encode("utf-8")
event = Event(data, mimetype="application/json")
# Publish and subscribe
await client.subscribe(TOPIC, on_event=print_event)
await asyncio.sleep(1)
await client.publish(client, TOPIC, event)
await received.wait()
if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(main())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice @pdeziel ! Just had a few questions for you, otherwise should be good to go
@@ -59,8 +59,9 @@ class PublishResponseIterator(ResponseIterator): | |||
gRPC publish stream and executes user-defined callbacks for acks and nacks. | |||
""" | |||
|
|||
def __init__(self, stream, on_ack=None, on_nack=None): | |||
def __init__(self, stream, pending, on_ack=None, on_nack=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could update the docstring to explain what type pending
is and what it is expected to contain (and how that differs from stream
)?
@@ -199,12 +209,13 @@ async def subscribe(self, *topics, query="", consumer_group=None): | |||
# Get the ID of the topic | |||
topic_ids.append(str(self._resolve_topic(topic))) | |||
|
|||
async for event in self.client.subscribe( | |||
# Run the subscriber | |||
await self.client.subscribe( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Below is what our "minimal" Python example looks like now. Can you show how it will be different when we do the next merge to main?
import json
import asyncio
from datetime import datetime
from pyensign.ensign import Ensign
from pyensign.events import Event
TOPIC = "chocolate-covered-espresso-beans"
async def publish(client, topic, events):
await asyncio.sleep(1)
errors = await client.publish(topic, events)
if errors:
print("Failed to publish events: {}".format(errors))
return errors
async def subscribe(client, topic):
id = await(client.topic_id(topic))
async for event in client.subscribe(id):
msg = json.loads(event.data)
print("At {}, {} sent you the following message: {}".format(msg["timestamp"], msg["sender"], msg["message"]))
return event
async def main():
# Create an Ensign client
client = Ensign(
# endpoint="staging.ensign.world:443", # uncomment if in staging
# auth_url="https://auth.ensign.world" # uncomment if in staging
)
# Create topic if it doesn't exist
if not await client.topic_exists(TOPIC):
await client.create_topic(TOPIC)
# Create an event with some data
msg = {
"sender": "Enson the Sea Otter",
"timestamp": datetime.now().isoformat(),
"message": "You're looking smart today!",
}
data = json.dumps(msg).encode("utf-8")
event = Event(data, mimetype="application/json")
# Create the publisher and subscriber tasks
pub = publish(client, TOPIC, event)
sub = subscribe(client, TOPIC)
# Wait for the tasks to complete
await asyncio.gather(pub, sub)
if __name__ == "__main__":
asyncio.run(main())
tests/pyensign/test_connection.py
Outdated
await client.close() | ||
assert events == 3 | ||
assert len(event_ids) == 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What made this drop from three to two events?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A typo, nice catch.
This updates the ack/nack handling to give more control and a better event interface to the user.
I have made the following changes:
on_event
callbacks rather than returning an iteratorTODOs and questions
CHECKLIST
pytest
?