Improve async performance. #3215
Found some related discussions. Opening a proper issue is warranted to get better visibility, so the issue is easier for others to find.
Oh, interesting. There are some places I can think of that we might want to dig into here...
Possible points of interest here...
Also, the tracing support in both aiohttp and httpx is likely to be extremely valuable to us here.
Thank you for the good points!
My original benchmark hit AWS S3. There I got very similar results.
Okay, thanks. Was that also testing small files?
Yes, pretty much: GETs of files a couple of KB in size. In the real system the sizes of course vary a lot.
@tomchristie you were right, this is the issue ^! When I just apply a simple patch there...
There is another hot spot: the logic in the connection pool is quite heavy, as it rechecks all of the connections every time requests are assigned to them. It might be possible to skip that rechecking. It would probably also be a good idea to add some performance tests to the httpx/httpcore CI.
I can probably help with a PR if you give me pointers on how to proceed :) I could e.g. replace the synchronization primitives with native asyncio ones.
See encode/httpcore#344, #1511, and encode/httpcore#345 for where/why we switched over to anyio.
A good first pass at this would be to add an asyncio-based network backend. You might want to work from the last version that had one.

Docs... https://www.encode.io/httpcore/network-backends/

Other context...
Thanks @tomchristie. What about the case I pointed out above?
There, switching the network backend won't help, as the lock is not defined by the network implementation; the lock implementation is a global one. Should we just change the synchronization to use asyncio?
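To illustrate the kind of change being proposed, here is a minimal sketch (hypothetical, not httpcore's actual code) of a lock primitive built directly on `asyncio.Lock`, with a small check that concurrent holders don't lose updates:

```python
import asyncio


class AsyncLock:
    """Hypothetical asyncio-native lock, sketched as a drop-in
    replacement for an anyio-based synchronization primitive."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()

    async def __aenter__(self) -> "AsyncLock":
        await self._lock.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        self._lock.release()


async def main() -> int:
    lock = AsyncLock()
    counter = 0

    async def worker() -> None:
        nonlocal counter
        async with lock:
            current = counter
            await asyncio.sleep(0)  # yield to other tasks while holding the lock
            counter = current + 1

    await asyncio.gather(*(worker() for _ in range(100)))
    return counter


print(asyncio.run(main()))  # prints 100
```

Without the lock, the deliberate `await` between read and write would let increments race and the final count would fall short of 100.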
I'm able to push the performance of httpx to a similar level. You can see the benchmark here, and here are the changes. There are 3 things required to improve the performance:
I'm happy to open PRs for these. What do you think @tomchristie?
@MarkusSintonen - Nice one. Let's work through those as individual PRs. Is it worth submitting a PR where we add a benchmark first?
I think it would be beneficial to have the benchmark run in CI, so we would see the difference. I have previously contributed to Pydantic, and they use codspeed, which outputs benchmark diffs to the PR when the benchmarked behaviour changes. It should be free for open-source projects.
That's an interesting idea. I'd clearly be in agreement with adding a benchmark.
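codspeed (via its pytest plugin) or pytest-benchmark would do the measuring in practice; as a rough sketch of the kind of measurement a CI benchmark performs, here is a tiny hypothetical `bench` helper that reports a median timing for a workload:

```python
import statistics
import time


def bench(fn, *, rounds: int = 50) -> float:
    """Run `fn` repeatedly and return the median duration in seconds."""
    timings = []
    for _ in range(rounds):
        start = time.perf_counter()
        fn()
        timings.append(time.perf_counter() - start)
    return statistics.median(timings)


# Toy workload standing in for "issue N concurrent GET requests";
# a CI job would compare this number against the base branch and
# annotate the PR when it regresses.
median = bench(lambda: sum(range(10_000)))
print(median > 0)  # prints True
```

The value of running this in CI is the diff, not the absolute number: a PR that touches the connection pool would immediately show whether the median request time moved.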
@tomchristie I have now opened the 2 fix PRs:
Maybe I'll open the network backend addition after these, as it's the most complex one.
Maybe you can refer to the implementation of aiohttp.
Isn't usage of http.cookiejar.CookieJar part of the problem? Line 1020 in db9072f
@rafalkrupinski I haven't run benchmarks where requests/responses use cookies, but at least it doesn't cause performance issues in general. I ran similar benchmarks from...
(Waiting for review from @tomchristie)
TBH I'm surprised by httpx ditching anyio. Sure, anyio comes with a performance overhead, but this breaks compatibility with Trio.
I'm not aware of it ditching anyio completely. It will still be supported, it's just optional. Trio will also be supported by httpcore.
These are really cool speed-ups. Can't wait for httpx to overtake aiohttp ;)
Since the benchmark seems to be using plain HTTP, I think the following is also related: creation of the SSL context in httpx had some overhead compared to aiohttp. Ref: #838
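For context, the usual client-side mitigation for SSL-context overhead is to build the context once and share it, since `ssl.create_default_context()` loads the CA bundle on every call. httpx accepts a ready-made `ssl.SSLContext` via the `verify` argument. A minimal sketch (the `SSL_CONTEXT` name is just illustrative):

```python
import ssl

# Build the (comparatively expensive) context once, at import time.
SSL_CONTEXT = ssl.create_default_context()

# Then hand the shared context to every client instead of letting each
# client construct its own; httpx accepts an SSLContext here:
#
#   client = httpx.AsyncClient(verify=SSL_CONTEXT)

print(SSL_CONTEXT.verify_mode == ssl.CERT_REQUIRED)  # prints True
```

This doesn't change per-request cost, but it removes repeated context construction when many clients are created over a process's lifetime.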
Here is a more complete version: https://github.com/lizeyan/httpx-AIOHttpTransport/
@tomchristie Can you provide an update on this, or link me to something that I can track?
@MarkusSintonen Will you be running new tests after the last changes?
What changes @4uku? 🤔 I'm not seeing anything in...
Hey, have the proposals from @MarkusSintonen been integrated yet? When can we expect them to be released? :)
Nope; unfortunately this project seems fairly dead. We have already moved away from httpx to aiohttp because of the performance issues.
Thank you @MarkusSintonen for your comment and update. My team and I just made the switch as well.
Me too, sadly. But the performance is wonderful now ^^
Anyone (managed to) run that in production?
@tuukkamustonen I found some issues in that implementation and created my own. The problems are mostly in the exception mapping, as it doesn't consider the inheritance tree. I haven't deployed it yet, but locally it's working well. I will update here once I test it.
I have created a forked repository that merges the PRs from @MarkusSintonen. I have also deployed a package index as a Simple repository API on GitHub Pages. You can easily replace the httpcore dependency:

$ uv add httpcore --index httpcore-speedups=https://mtkn1.github.io/httpcore-speedups/simple/

However, please note that this is a short-term solution and an independent effort.
Note that you can connect another library to httpx as its transport layer. For example:

```python
from aiohttp import ClientTimeout
from aiohttp.client import ClientSession, ClientResponse
import httpx
import asyncio
from httpx import Request, Response, AsyncByteStream
import typing


class AiohttpResponseStream(AsyncByteStream):
    CHUNK_SIZE = 1024

    def __init__(self, aiohttp_response: ClientResponse) -> None:
        self._aiohttp_response = aiohttp_response

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        async for chunk in self._aiohttp_response.content.iter_chunked(self.CHUNK_SIZE):
            yield chunk

    async def aclose(self) -> None:
        await self._aiohttp_response.__aexit__(None, None, None)


class AiohttpTransport(httpx.AsyncBaseTransport):
    def __init__(self, client: ClientSession) -> None:
        self.client = client

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        timeout = request.extensions.get("timeout", {})
        response = await self.client.request(
            method=request.method,
            url=str(request.url),
            headers=request.headers,
            data=request.content,
            allow_redirects=False,
            auto_decompress=False,
            compress=False,
            timeout=ClientTimeout(
                sock_connect=timeout.get("connect"),
                sock_read=timeout.get("read"),
                connect=timeout.get("pool"),
            ),
        ).__aenter__()
        return httpx.Response(
            status_code=response.status,
            headers=response.headers,
            content=AiohttpResponseStream(response),
            request=request,
        )


async def main():
    async with ClientSession() as aiohttp_client:
        async with httpx.AsyncClient(
            transport=AiohttpTransport(aiohttp_client)
        ) as client:
            async with client.stream(
                "GET",
                "https://www.encode.io",
            ) as resp:
                async for chunk in resp.aiter_bytes():
                    print(chunk)


asyncio.run(main())
```

So, httpx itself isn't slow, but it can use a slow backend, which I hope will be improved soon. You can connect any library to httpx as its transport layer, but if you're using high-level libraries that handle more than just I/O (such as authentication, cookies, or retries), you should disable those features, as they are the responsibility of httpx.

I noticed that there is an implementation with an aiohttp transport by @lizeyan, but it doesn't seem to support streaming. Also, I found it a bit more complex than necessary. Feel free to use this example to improve it!
Thank you @karpetrosyan! Would you be willing to publish that as its own repo / PyPI package? I'm not sure I could reasonably recommend it to many users of...
It has been isolated in a standalone repository. Feel free to raise an issue if you find one.
Deleting some of the conversation here as spam.
I'm not going to waste time justifying why. I will not allow contributors who deliberately try to blur the line between genuine analysis and deliberate time-wasting, and I'm going to have to assume you're in this category. Any further time-wasters will be blocked as required.
@karpetrosyan As you mentioned, httpx itself isn't slow, but it uses a slow backend. After I replaced that slow backend, the benchmark results completely changed.

fastapi_benchmark_api.py:

```python
import random
import time

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()
items_db = {}


class Item(BaseModel):
    name: str
    description: str | None = None
    price: float
    tax: float | None = None


@app.get("/items")
def welcome_items():
    return {"message": "Welcome to the Items API"}


@app.get("/items/{item_id}")
def read_item(item_id: int):
    time.sleep(random.uniform(0, 0.05))
    item = items_db.get(item_id)
    if item is None:
        raise HTTPException(status_code=404, detail="Item not found")
    return item


@app.post("/items", status_code=201)
def create_item(item: Item):
    time.sleep(random.uniform(0, 0.05))
    item_id = len(items_db) + 1
    items_db[item_id] = item.model_dump()
    return {"item_id": item_id, **item.model_dump()}


@app.put("/items/{item_id}")
def update_item(item_id: int, item: Item):
    time.sleep(random.uniform(0, 0.05))
    if item_id not in items_db:
        raise HTTPException(status_code=404, detail="Item not found")
    items_db[item_id] = item.model_dump()
    return {"item_id": item_id, **item.model_dump()}


@app.delete("/items/{item_id}")
def delete_item(item_id: int):
    time.sleep(random.uniform(0, 0.05))
    if item_id not in items_db:
        raise HTTPException(status_code=404, detail="Item not found")
    del items_db[item_id]
    return {"detail": "Item deleted"}
```

httpx_aiohttp_opt.py:

```python
import asyncio
import time
import typing

import aiohttp
import httpx
from httpx import AsyncByteStream, AsyncBaseTransport, Request, Response

ASYNC_URL = "http://127.0.0.1:8000/items"
ASYNC_TOTAL_REQUESTS = 5000
ASYNC_CONCURRENCY = 200
SYNC_URL = "http://127.0.0.1:8000/items"
SYNC_TOTAL_REQUESTS = 200


class AiohttpResponseStream(AsyncByteStream):
    CHUNK_SIZE = 1024

    def __init__(self, aiohttp_response: aiohttp.ClientResponse) -> None:
        self._aiohttp_response = aiohttp_response

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        async for chunk in self._aiohttp_response.content.iter_chunked(self.CHUNK_SIZE):
            yield chunk

    async def aclose(self) -> None:
        await self._aiohttp_response.__aexit__(None, None, None)


class AiohttpTransport(AsyncBaseTransport):
    def __init__(self, session: aiohttp.ClientSession) -> None:
        self.session = session

    async def handle_async_request(self, request: Request) -> Response:
        timeout_config = request.extensions.get("timeout", {})
        aiohttp_response = await self.session.request(
            method=request.method,
            url=str(request.url),
            headers=request.headers,
            data=request.content,
            allow_redirects=False,
            timeout=timeout_config.get("connect", 10.0),
        ).__aenter__()
        return Response(
            status_code=aiohttp_response.status,
            headers=aiohttp_response.headers,
            content=AiohttpResponseStream(aiohttp_response),
            request=request,
        )


async def httpx_test(total_requests: int = ASYNC_TOTAL_REQUESTS,
                     concurrency: int = ASYNC_CONCURRENCY,
                     url: str = ASYNC_URL) -> None:
    limits = httpx.Limits(max_connections=200, max_keepalive_connections=100)
    connector = aiohttp.TCPConnector(limit=200, keepalive_timeout=30)
    aiohttp_session = aiohttp.ClientSession(connector=connector)
    transport = AiohttpTransport(aiohttp_session)
    async with httpx.AsyncClient(
        limits=limits,
        follow_redirects=False,
        transport=transport
    ) as client:
        sem = asyncio.Semaphore(concurrency)

        async def one_request() -> int | None:
            retries_left = 2
            while retries_left >= 0:
                async with sem:
                    try:
                        resp = await client.get(url, timeout=10.0)
                        return resp.status_code
                    except (httpx.ReadError, httpx.ConnectError, httpx.RemoteProtocolError,
                            aiohttp.ClientError):
                        retries_left -= 1
                        if retries_left < 0:
                            return None
                await asyncio.sleep(0.5)
            return None

        start = time.perf_counter()
        tasks = [one_request() for _ in range(total_requests)]
        results = await asyncio.gather(*tasks)
        end = time.perf_counter()
        success = sum(r == 200 for r in results if r is not None)
        print(f"HTTPX (async): {end - start:.2f}s, success={success}/{total_requests}")
    await aiohttp_session.close()


async def aiohttp_test(total_requests: int = ASYNC_TOTAL_REQUESTS,
                       concurrency: int = ASYNC_CONCURRENCY,
                       url: str = ASYNC_URL) -> None:
    connector = aiohttp.TCPConnector(limit=200, keepalive_timeout=30)
    async with aiohttp.ClientSession(connector=connector) as session:
        sem = asyncio.Semaphore(concurrency)

        async def one_request() -> int | None:
            retries_left = 2
            while retries_left >= 0:
                async with sem:
                    try:
                        async with session.get(url, timeout=10.0) as resp:
                            return resp.status
                    except (aiohttp.ClientError, asyncio.TimeoutError):
                        retries_left -= 1
                        if retries_left < 0:
                            return None
                await asyncio.sleep(0.5)
            return None

        start = time.perf_counter()
        tasks = [one_request() for _ in range(total_requests)]
        results = await asyncio.gather(*tasks)
        end = time.perf_counter()
        success = sum(r == 200 for r in results if r is not None)
        print(f"AIOHTTP (async): {end - start:.2f}s, success={success}/{total_requests}")


def sync_httpx_test(total_requests: int = SYNC_TOTAL_REQUESTS,
                    url: str = SYNC_URL) -> None:
    start = time.perf_counter()
    with httpx.Client() as client:
        success = 0
        for _ in range(total_requests):
            try:
                r = client.get(url, timeout=10.0)
                if r.status_code == 200:
                    success += 1
            except httpx.RequestError:
                pass
    end = time.perf_counter()
    print(f"HTTPX (sync): {end - start:.2f}s, success={success}/{total_requests}")


class MinimalAiohttpHttpClient:
    def __init__(self) -> None:
        self.loop: asyncio.AbstractEventLoop | None = None
        self.shared_loop = asyncio.new_event_loop()
        self.session: aiohttp.ClientSession | None = None

    def __enter__(self) -> "MinimalAiohttpHttpClient":
        if not self.loop:
            self.loop = self.shared_loop
            asyncio.set_event_loop(self.loop)
            self.loop.run_until_complete(self._init_session())
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()

    async def _init_session(self) -> None:
        self.session = aiohttp.ClientSession()

    def close(self) -> None:
        if self.session:
            assert self.loop is not None
            self.loop.run_until_complete(self.session.close())
            self.session = None
        if self.loop:
            self.loop.close()
            self.loop = None

    async def _get(self, full_url: str) -> tuple[int, str]:
        assert self.session is not None
        async with self.session.get(full_url, timeout=10.0) as resp:
            txt = await resp.text()
            return resp.status, txt

    def sync_get(self, full_url: str) -> tuple[int, str]:
        assert self.loop is not None
        return self.loop.run_until_complete(self._get(full_url))


def sync_aiohttp_test(total_requests: int = SYNC_TOTAL_REQUESTS,
                      url: str = SYNC_URL) -> None:
    start = time.perf_counter()
    success = 0
    with MinimalAiohttpHttpClient() as client:
        for _ in range(total_requests):
            try:
                status, _txt = client.sync_get(url)
                if status == 200:
                    success += 1
            except (aiohttp.ClientError, asyncio.TimeoutError):
                pass
    end = time.perf_counter()
    print(f"AIOHTTP (sync): {end - start:.2f}s, success={success}/{total_requests}")


async def async_main() -> None:
    print("=== ASYNC BENCHMARK ===")
    await httpx_test()
    await aiohttp_test()


def main() -> None:
    print("=== SYNC BENCHMARK ===")
    sync_httpx_test()
    sync_aiohttp_test()
    print()
    asyncio.run(async_main())


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        pass
```

Run the benchmark; the test results are as follows:

$ uvicorn fastapi_benchmark_api:app --host 0.0.0.0 --port 8000
INFO: Started server process [9238]

$ python httpx_aiohttp_opt.py
=== SYNC BENCHMARK ===
=== ASYNC BENCHMARK ===
It's great to see; thanks for the benchmark! So yes, for users facing performance issues, this is a really easy way to resolve the problem: delegate connection pooling and socket-level messaging to aiohttp while handling the rest with httpx, using its amazing API.
I know I might sound stupid for asking this, but does using httpx with aiohttp as the transport still allow us to make requests over HTTP/2? Since aiohttp has no support for that.
I was indeed stupid for asking: HTTP/2 support is not preserved. Also, for some reason this small snippet, made on top of the benchmark transport solution, raises an exception:

```python
import asyncio
import typing

import aiohttp
import httpx
from httpx import AsyncByteStream, AsyncBaseTransport, Request, Response


class AiohttpResponseStream(AsyncByteStream):
    CHUNK_SIZE = 1024

    def __init__(self, aiohttp_response: aiohttp.ClientResponse) -> None:
        self._aiohttp_response = aiohttp_response

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        async for chunk in self._aiohttp_response.content.iter_chunked(self.CHUNK_SIZE):
            yield chunk

    async def aclose(self) -> None:
        await self._aiohttp_response.__aexit__(None, None, None)


class AiohttpTransport(AsyncBaseTransport):
    def __init__(self, session: aiohttp.ClientSession) -> None:
        self.session = session

    async def handle_async_request(self, request: Request) -> Response:
        timeout_config = request.extensions.get("timeout", {})
        try:
            aiohttp_response = await self.session.request(
                method=request.method,
                url=str(request.url),
                headers=request.headers,
                data=request.content,
                allow_redirects=False,
                timeout=timeout_config.get("connect", 10.0),
            ).__aenter__()
        except Exception as e:
            raise e
        return Response(
            status_code=aiohttp_response.status,
            headers=aiohttp_response.headers,
            content=AiohttpResponseStream(aiohttp_response),
            request=request)


async def main() -> None:
    client = httpx.AsyncClient(transport=AiohttpTransport(aiohttp.ClientSession()))
    response = await client.get("https://pokeapi.co/api/v2/pokemon/ditto")
    print(response.json())


if __name__ == "__main__":
    asyncio.run(main())
```

I think this is worth investigating.
```python
import asyncio
import json
import httpx
from typing import AsyncIterator
from aiohttp import ClientSession, ClientTimeout, ClientResponse
from httpx import AsyncByteStream, AsyncBaseTransport, Request, Response


class AiohttpResponseStream(AsyncByteStream):
    CHUNK_SIZE = 1024

    def __init__(self, aiohttp_response: ClientResponse) -> None:
        self._aiohttp_response = aiohttp_response

    async def __aiter__(self) -> AsyncIterator[bytes]:
        async for chunk in self._aiohttp_response.content.iter_chunked(self.CHUNK_SIZE):
            yield chunk

    async def aclose(self) -> None:
        await self._aiohttp_response.__aexit__(None, None, None)


class AiohttpTransport(AsyncBaseTransport):
    def __init__(self, session: ClientSession) -> None:
        self.session = session

    async def handle_async_request(self, request: Request) -> Response:
        timeout_config = request.extensions.get("timeout", {})
        response = await self.session.request(
            method=request.method,
            url=str(request.url),
            headers=request.headers,
            data=request.content,
            allow_redirects=False,
            auto_decompress=False,
            compress=False,
            timeout=ClientTimeout(
                sock_connect=timeout_config.get("connect"),
                sock_read=timeout_config.get("read"),
                connect=timeout_config.get("pool"),
            ),
        ).__aenter__()
        return Response(
            status_code=response.status,
            headers=response.headers,
            content=AiohttpResponseStream(response),
            request=request,
        )


async def main() -> None:
    async with ClientSession() as session:
        transport = AiohttpTransport(session)
        async with httpx.AsyncClient(transport=transport) as client:
            response = await client.get("https://pokeapi.co/api/v2/pokemon/ditto")
            print(json.dumps(response.json(), indent=2))


if __name__ == "__main__":
    asyncio.run(main())
```
If anyone is interested, I found a while back an alternative for when you absolutely want HTTP/2 onward. I am sure talented people can optimize it further in no time for HTTP/1. I did not run it with HTTP/2 or HTTP/3, though.
It looks like niquests uses urllib3-future as its backend; maybe someone more knowledgeable can write a new transport backend using this library and test the performance?
There seem to be some performance issues in httpx (0.27.0), as it has much worse performance than aiohttp (3.9.4) with concurrently running requests (in Python 3.12). The following benchmark shows how running 20 requests concurrently is over 10x slower with httpx compared to aiohttp. The benchmark has very basic httpx usage for doing multiple GET requests with limited concurrency. The script outputs a figure showing how the duration of each GET request has a huge variance with httpx.

I found the following issue, but it seems it's not related, as the workaround doesn't make a difference here: #838 (comment)