8000 Publish and subscribe acks and nacks by pdeziel · Pull Request #36 · rotationalio/pyensign · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Publish and subscribe acks and nacks #36

New issue 8000

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 19, 2023
Merged

Publish and subscribe acks and nacks #36

merged 3 commits into from
Jun 19, 2023

Conversation

pdeziel
Copy link
Collaborator
@pdeziel pdeziel commented Jun 15, 2023

This updates the ack/nack handling to give more control and a better event interface to the user.

I have made the following changes:

  1. Updated the Event helper class to have state
  2. Updated publish stream handler set ack/nack on the Events
  3. Changed subscribe to have on_event callbacks rather than returning an iterator

TODOs and questions

CHECKLIST

  • Is the commit message formatted correctly?
  • Do all of your functions and methods have docstrings?
  • Have you added/updated unit tests where appropriate?
  • Have you run the unit tests using pytest?
  • Is your code style correct (are you using PEP8, pyflakes)?

@shortcut-integration
Copy link

This pull request has been linked to Shortcut Story #17421: Subscribe callbacks.

@@ -15,7 +15,7 @@ def wrap(event, topic_id):
raise TypeError("topic_id must be a ULID")

return event_pb2.EventWrapper(
event=event.SerializeToString(), topic_id=topic_id.bytes
local_id=ULID().bytes, event=event.SerializeToString(), topic_id=topic_id.bytes
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Ensign server uses the local_id to communicate which event was acked/nacked.

@@ -268,48 +243,3 @@ async def _close_streams(self):
await subscriber.close()

await self.pool.release()


class BidiQueue:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to a separate file to avoid the circular import.

@@ -199,12 +209,13 @@ async def subscribe(self, *topics, query="", consumer_group=None):
# Get the ID of the topic
topic_ids.append(str(self._resolve_topic(topic)))

async for event in self.client.subscribe(
# Run the subscriber
await self.client.subscribe(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This will break the examples

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Below is what our "minimal" Python example looks like now. Can you show how it will be different when we do the next merge to main?

import json
import asyncio
from datetime import datetime

from pyensign.ensign import Ensign
from pyensign.events import Event

TOPIC = "chocolate-covered-espresso-beans"

async def publish(client, topic, events):
    await asyncio.sleep(1)
    errors = await client.publish(topic, events)
    if errors:
        print("Failed to publish events: {}".format(errors))
    return errors

async def subscribe(client, topic):
    id = await(client.topic_id(topic))
    async for event in client.subscribe(id):
        msg = json.loads(event.data)
        print("At {}, {} sent you the following message: {}".format(msg["timestamp"], msg["sender"], msg["message"]))
        return event

async def main():
    # Create an Ensign client
    client = Ensign(
        # endpoint="staging.ensign.world:443", # uncomment if in staging
        # auth_url="https://auth.ensign.world" # uncomment if in staging
    )

    # Create topic if it doesn't exist
    if not await client.topic_exists(TOPIC):
        await client.create_topic(TOPIC)

    # Create an event with some data
    msg = {
        "sender": "Enson the Sea Otter",
        "timestamp": datetime.now().isoformat(),
        "message": "You're looking smart today!",
    }
    data = json.dumps(msg).encode("utf-8")
    event = Event(data, mimetype="application/json")

    # Create the publisher and subscriber tasks
    pub = publish(client, TOPIC, event)
    sub = subscribe(client, TOPIC)

    # Wait for the tasks to complete
    await asyncio.gather(pub, sub)

if __name__ == "__main__":
    asyncio.run(main())

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like this (untested).

received = asyncio.Event()

async def print_event(event):
    msg = json.loads(event.data)
    print("At {}, {} sent you the following message: {}".format(msg["timestamp"], msg["sender"], msg["message"]))
    received.set()

async def main():
    # Create an Ensign client
    client = Ensign(
        # endpoint="staging.ensign.world:443", # uncomment if in staging
        # auth_url="https://auth.ensign.world" # uncomment if in staging
    )

    # Create topic if it doesn't exist
    client.ensure_topic_exists(TOPIC)

    # Create an event with some data
    msg = {
        "sender": "Enson the Sea Otter",
        "timestamp": datetime.now().isoformat(),
        "message": "You're looking smart today!",
    }
    data = json.dumps(msg).encode("utf-8")
    event = Event(data, mimetype="application/json")

    # Publish and subscribe
    await client.subscribe(TOPIC, on_event=print_event)
    await asyncio.sleep(1)
    await client.publish(client, TOPIC, event)
    await received.wait()

if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())

@pdeziel pdeziel requested a review from rebeccabilbro June 15, 2023 13:38
Copy link
Member
@rebeccabilbro rebeccabilbro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice @pdeziel ! Just had a few questions for you, otherwise should be good to go

@@ -59,8 +59,9 @@ class PublishResponseIterator(ResponseIterator):
gRPC publish stream and executes user-defined callbacks for acks and nacks.
"""

def __init__(self, stream, on_ack=None, on_nack=None):
def __init__(self, stream, pending, on_ack=None, on_nack=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could update the docstring to explain what type pending is and what it is expected to contain (and how that differs from stream)?

@@ -199,12 +209,13 @@ async def subscribe(self, *topics, query="", consumer_group=None):
# Get the ID of the topic
topic_ids.append(str(self._resolve_topic(topic)))

async for event in self.client.subscribe(
# Run the subscriber
await self.client.subscribe(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Below is what our "minimal" Python example looks like now. Can you show how it will be different when we do the next merge to main?

import json
import asyncio
from datetime import datetime

from pyensign.ensign import Ensign
from pyensign.events import Event

TOPIC = "chocolate-covered-espresso-beans"

async def publish(client, topic, events):
    await asyncio.sleep(1)
    errors = await client.publish(topic, events)
    if errors:
        print("Failed to publish events: {}".format(errors))
    return errors

async def subscribe(client, topic):
    id = await(client.topic_id(topic))
    async for event in client.subscribe(id):
        msg = json.loads(event.data)
        print("At {}, {} sent you the following message: {}".format(msg["timestamp"], msg["sender"], msg["message"]))
        return event

async def main():
    # Create an Ensign client
    client = Ensign(
        # endpoint="staging.ensign.world:443", # uncomment if in staging
        # auth_url="https://auth.ensign.world" # uncomment if in staging
    )

    # Create topic if it doesn't exist
    if not await client.topic_exists(TOPIC):
        await client.create_topic(TOPIC)

    # Create an event with some data
    msg = {
        "sender": "Enson the Sea Otter",
        "timestamp": datetime.now().isoformat(),
        "message": "You're looking smart today!",
    }
    data = json.dumps(msg).encode("utf-8")
    event = Event(data, mimetype="application/json")

    # Create the publisher and subscriber tasks
    pub = publish(client, TOPIC, event)
    sub = subscribe(client, TOPIC)

    # Wait for the tasks to complete
    await asyncio.gather(pub, sub)

if __name__ == "__main__":
    asyncio.run(main())

await client.close()
assert events == 3
assert len(event_ids) == 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What made this drop from three to two events?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A typo, nice catch.

@pdeziel pdeziel merged commit bc49b9f into develop Jun 19, 2023
@pdeziel pdeziel deleted the sc-17421 branch June 19, 2023 21:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants
0