[Bug]: `message/stream` does not stream properly · Issue #111 · google-a2a/a2a-python

Open
1 task done
matino opened this issue May 26, 2025 · 0 comments

Contributor
matino commented May 26, 2025

What happened?

I was trying to get streaming over SSE working, but the responses were not streamed properly.

I wrote a simple ADK agent executor:

class ADKAgentExecutor(AgentExecutor):
    def __init__(self, runner: Runner) -> None:
        self.runner = runner
        self.run_config = RunConfig(streaming_mode=StreamingMode.SSE)
        self.user_id = "self"

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        task_updater.start_work()

        session = await self._get_session(context.context_id)
        message = types.UserContent(
            parts=a2a_parts_to_genai(context.message.parts),
        )

        async for event in self.runner.run_async(
            session_id=session.id,
            user_id=self.user_id,
            new_message=message,
            run_config=self.run_config,
        ):
            parts = genai_parts_to_a2a(event.content.parts)
            if event.is_final_response():
                task_updater.add_artifact(parts)
                task_updater.complete()
                break
            if not event.get_function_calls():
                task_updater.update_status(
                    TaskState.working,
                    message=task_updater.new_agent_message(parts),
                )

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        raise ServerError(error=UnsupportedOperationError())

    async def _get_session(self, session_id: str) -> Session:
        return await self.runner.session_service.get_session(
            app_name=self.runner.app_name, user_id=self.user_id, session_id=session_id
        ) or await self.runner.session_service.create_session(
            app_name=self.runner.app_name, user_id=self.user_id, session_id=session_id
        )

When testing the app with curl:

curl -N --http2 -H "Accept:text/event-stream" -X POST http://0.0.0.0:8000/ -d '{"method": "message/stream", "params": {"message": {"role": "user", "parts": [{"kind": "text", "text": "Some random question"}], "messageId": "1"}}}'

I can see the very first chunk of the response appear immediately:

data: {"jsonrpc":"2.0","result":{"contextId":"6661ac28-40fc-4102-ae12-fb09b0c2dc24","final":false,"kind":"status-update","status":{"state":"working"},"taskId":"88edd064-7d95-40c3-88f4-e41cb4009b19"}}

After that, the rest of the response is buffered, and all remaining chunks arrive at the same time after a few seconds.

A workaround is to call `await asyncio.sleep(0)` inside the `async for event in self.runner.run_async(...)` loop, which hands control back to the event loop after each status update:

        async for event in self.runner.run_async(
            session_id=session.id,
            user_id=self.user_id,
            new_message=message,
            run_config=self.run_config,
        ):
            parts = genai_parts_to_a2a(event.content.parts)
            if event.is_final_response():
                task_updater.add_artifact(parts)
                task_updater.complete()
                break
            if not event.get_function_calls():
                task_updater.update_status(
                    TaskState.working,
                    message=task_updater.new_agent_message(parts),
                )

                # Hack for SSE
                await asyncio.sleep(0)

Only then did the response chunks start to show up one after another.
I don't think that's the right solution, though :)
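
For illustration, here is a minimal, self-contained sketch of the general asyncio mechanism that may be behind this. It does not use the a2a or ADK APIs, and it is not a confirmed root cause; `produce`, `consume`, and `run_demo` are hypothetical names made up for the example. It assumes a producer that enqueues events synchronously and never suspends, so the consumer task only gets to run once the producer is done, unless the producer yields with `await asyncio.sleep(0)`:

```python
import asyncio


async def produce(queue: asyncio.Queue, log: list, n: int, yield_control: bool) -> None:
    """Hypothetical producer: enqueues n items without ever blocking."""
    for i in range(n):
        queue.put_nowait(i)  # synchronous enqueue, does not suspend this coroutine
        log.append(f"put {i}")
        if yield_control:
            # Give the event loop a chance to run other tasks (the consumer).
            await asyncio.sleep(0)
    queue.put_nowait(None)  # sentinel: no more items


async def consume(queue: asyncio.Queue, log: list) -> None:
    """Hypothetical consumer: stands in for the code that writes SSE chunks."""
    while (item := await queue.get()) is not None:
        log.append(f"got {item}")


async def run_demo(yield_control: bool) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    log: list = []
    consumer = asyncio.create_task(consume(queue, log))
    await produce(queue, log, 3, yield_control)
    await consumer
    return log


if __name__ == "__main__":
    print(asyncio.run(run_demo(yield_control=False)))
    print(asyncio.run(run_demo(yield_control=True)))
```

Without the yield, the log shows all `put` entries followed by all `got` entries, which resembles the buffered output above; with the yield, the entries interleave. Whether this is what actually happens inside the SDK's event queue is an assumption.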

Related discussion:
#79

Code of Conduct

  • I agree to follow this project's Code of Conduct