Improve async performance. · Issue #3215 · encode/httpx · GitHub

Open
MarkusSintonen opened this issue Jun 4, 2024 · 62 comments
Labels
perf Issues relating to performance

Comments

@MarkusSintonen commented Jun 4, 2024

There seem to be some performance issues in httpx (0.27.0), as it has much worse performance than aiohttp (3.9.4) with concurrently running requests (on Python 3.12). The following benchmark shows how running 20 requests concurrently is over 10x slower with httpx compared to aiohttp. The benchmark uses very basic httpx usage for doing multiple GET requests with limited concurrency. The script outputs a figure showing that the duration of each GET request has a huge variance with httpx.

(Figure: per-request durations, httpx vs aiohttp)

# requirements.txt:
# httpx == 0.27.0
# aiohttp == 3.9.4
# matplotlib == 3.9.0
# 
# 1. start server: python bench.py server
# 2. run client test: python bench.py client

import asyncio
import sys
from typing import Any, Coroutine, Iterator
import aiohttp
import time
import httpx
from aiohttp import web
import matplotlib.pyplot as plt


PORT = 1234
URL = f"http://localhost:{PORT}/req"
RESP = "a" * 2000
REQUESTS = 100
CONCURRENCY = 20


def run_web_server():
    async def handle(_request):
        return web.Response(text=RESP)

    app = web.Application()
    app.add_routes([web.get('/req', handle)])
    web.run_app(app, host="localhost", port=PORT)


def duration(start: float) -> int:
    return int((time.monotonic() - start) * 1000)


async def run_requests(axis: plt.Axes):
    async def gather_limited_concurrency(coros: Iterator[Coroutine[Any, Any, Any]]):
        sem = asyncio.Semaphore(CONCURRENCY)
        async def coro_with_sem(coro):
            async with sem:
                return await coro
        return await asyncio.gather(*(coro_with_sem(c) for c in coros))

    async def httpx_get(session: httpx.AsyncClient, timings: list[int]):
        start = time.monotonic()
        res = await session.request("GET", URL)
        assert len(await res.aread()) == len(RESP)
        assert res.status_code == 200, f"status_code={res.status_code}"
        timings.append(duration(start))

    async def aiohttp_get(session: aiohttp.ClientSession, timings: list[int]):
        start = time.monotonic()
        async with session.request("GET", URL) as res:
            assert len(await res.read()) == len(RESP)
            assert res.status == 200, f"status={res.status}"
        timings.append(duration(start))

    async with httpx.AsyncClient() as session:
        # warmup
        await asyncio.gather(*(httpx_get(session, []) for _ in range(REQUESTS)))

        timings = []
        start = time.monotonic()
        await gather_limited_concurrency((httpx_get(session, timings) for _ in range(REQUESTS)))
        axis.plot([*range(REQUESTS)], timings, label=f"httpx (tot={duration(start)}ms)")

    async with aiohttp.ClientSession() as session:
        # warmup
        await asyncio.gather(*(aiohttp_get(session, []) for _ in range(REQUESTS)))

        timings = []
        start = time.monotonic()
        await gather_limited_concurrency((aiohttp_get(session, timings) for _ in range(REQUESTS)))
        axis.plot([*range(REQUESTS)], timings, label=f"aiohttp (tot={duration(start)}ms)")


def main(mode: str):
    assert mode in {"server", "client"}, f"invalid mode: {mode}"

    if mode == "server":
        run_web_server()
    else:
        fig, ax = plt.subplots()
        asyncio.run(run_requests(ax))
        plt.legend(loc="upper left")
        ax.set_xlabel("# request")
        ax.set_ylabel("[ms]")
        plt.show()

    print("DONE", flush=True)


if __name__ == "__main__":
    assert len(sys.argv) == 2, f"Usage: {sys.argv[0]} server|client"
    main(sys.argv[1])

I found the following issue, but it seems it's not related, as the workaround doesn't make a difference here: #838 (comment)

@MarkusSintonen (Author)

Found some related discussions:

Opening a proper issue seemed warranted to get better visibility for this, so the issue is easier for others to find. In its current state httpx is not a good option for highly concurrent applications. Hopefully the issue gets fixed, as the library is otherwise great, so thanks for it!

@tomchristie (Member) commented Jun 6, 2024

Oh, interesting. There are some places I can think of that we might want to dig into here...

  • A comparison of threaded performance would also be worthwhile: requests compared against httpx, with multithreaded requests.
  • A comparison of performance against a remote server would be more representative than performance against localhost.

Possible points of interest here...

  • Do we have the same socket options as aiohttp? Are we sending simple GET requests across more than one TCP packet unnecessarily, either due to socket options, or due to our flow in writing the request to the stream, or both? E.g. see https://brooker.co.za/blog/2024/05/09/nagle.html
  • We're currently using h11 for our HTTP construction and parsing. This is the best Python option for careful spec correctness, though it has more CPU overhead than e.g. httptools.
  • We're currently using anyio for our async support. We did previously have a native asyncio backend; there might be some CPU overhead to be saved here, which in this localhost case might be outweighing network overheads.
  • Also worth noting that aiohttp currently supports DNS caching where httpx does not, although that's not relevant in this particular case.
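On the socket-options point: whether Nagle's algorithm is disabled can be checked directly. A minimal sketch (not httpcore's actual code) of opening a connection with TCP_NODELAY set:

```python
import socket


def open_nodelay_connection(host: str, port: int) -> socket.socket:
    # Connect, then disable Nagle's algorithm so that small writes
    # (e.g. a short GET request) are flushed to the wire immediately
    # rather than being coalesced while waiting for an ACK.
    sock = socket.create_connection((host, port))
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    return sock
```

Inspecting `getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)` on the stream's underlying socket is one way to confirm what httpx/aiohttp actually do here.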

Also, the tracing support in both aiohttp and httpx is likely to be extremely valuable to us here.

@MarkusSintonen (Author) commented Jun 6, 2024

Thank you for the good points!

A comparison of performance against a remote server would be more representative than performance against localhost.

My original benchmark hit AWS S3. There I got very similar results, where httpx had a huge variance in request timings with concurrent requests. This investigation was due to us observing some strange request durations when servers were under heavy load in production. For now we have switched to aiohttp, and it seems to have fixed the issue.

@tomchristie (Member)

My original benchmark hit AWS S3. There I got very similar results [...]

Okay, thanks. Was that also testing small GET requests / similar approach to above?

@MarkusSintonen (Author)

Okay, thanks. Was that also testing small GET requests / similar approach to above?

Yes, pretty much: a GET of a file a couple of KB in size. In the real system the sizes of course vary a lot.

@MarkusSintonen (Author) commented Jun 7, 2024

We're currently using anyio for our async support. We did previously have a native asyncio backend, there might be some CPU overhead to be saved here, which in this localhost case might be outweighing network overheads.

@tomchristie you were right, this is the issue ^!

When I just do a simple patch in httpcore to replace anyio.Lock with asyncio.Lock, the performance improves greatly. Why does httpcore use AnyIO there instead of asyncio? It seems AnyIO may have some issues.

With asyncio: (benchmark figure)

With anyio: (benchmark figure)
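For context, the patch amounts to swapping the lock class used in httpcore's synchronization module. A minimal sketch of an asyncio-based async lock (illustrative; the class name and lazy-init detail here are not httpcore's actual code):

```python
import asyncio
from typing import Optional


class AsyncioLock:
    """Async context-manager lock backed by asyncio.Lock (illustrative sketch).

    The inner lock is created lazily, so instances can be constructed
    outside a running event loop.
    """

    def __init__(self) -> None:
        self._lock: Optional[asyncio.Lock] = None

    async def __aenter__(self) -> "AsyncioLock":
        if self._lock is None:
            self._lock = asyncio.Lock()
        await self._lock.acquire()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        assert self._lock is not None
        self._lock.release()
```

The win comes from asyncio.Lock being a thin wrapper over the event loop, whereas anyio's lock goes through its cross-backend abstraction layer on every acquire/release.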

@MarkusSintonen (Author) commented Jun 7, 2024

There is another hot spot in AsyncHTTP11Connection.has_expired, which is called heavily, e.g. from AsyncConnectionPool. This checks the connection status via the is_readable logic, which seems to be a particularly heavy check.

The logic in the connection pool is quite heavy, as it rechecks all of the connections every time requests are assigned to connections. It might be possible to skip the is_readable checks on the pool side: just take a connection from the pool, and take another if the picked one turns out not to be healthy, instead of checking them all every time. What do you think?
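The proposed strategy can be sketched as: pop idle connections and health-check only the candidate actually being handed out, discarding dead ones along the way, rather than polling every pooled connection on each request (illustrative code, not httpcore's):

```python
from collections import deque
from typing import Callable, Deque, Optional, TypeVar

T = TypeVar("T")


def acquire_connection(idle: Deque[T], is_healthy: Callable[[T], bool]) -> Optional[T]:
    # Pop candidates until a healthy one is found; unhealthy
    # connections are simply dropped on the way out. Only the
    # connections actually inspected pay the cost of the check.
    while idle:
        conn = idle.popleft()
        if is_healthy(conn):
            return conn
    return None
```

This turns an O(pool size) socket poll per request into, on average, a single check per acquisition.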

It would probably be a good idea to add some performance tests to the httpx/httpcore CI.

@MarkusSintonen (Author) commented Jun 7, 2024

I can probably help with a PR if you give me pointers about how to proceed :)

I could eg replace the synchronization primitives to use the native asyncio.

@tomchristie (Member)

Why does httpcore use AnyIO there instead of asyncio?

See encode/httpcore#344, #1511, and encode/httpcore#345 for where/why we switched over to anyio.

I can probably help with a PR if you give me pointers about how to proceed

A good first pass at this would be to add an asyncio.py backend, without switching the default over.

You might want to work from the last version that had an asyncio native backend, although I think the backend API has probably changed slightly.

Docs... https://www.encode.io/httpcore/network-backends/


Other context...

@MarkusSintonen (Author) commented Jun 8, 2024

Thanks @tomchristie

What about this case I pointed:

When I just do a simple patch into httpcore to replace anyio.Lock with asyncio.Lock the performance improves greatly

Switching the network backend won't help there, as the lock is not defined by the network implementation; the lock implementation is a global one. Should we just change the synchronization to use asyncio?

@MarkusSintonen (Author) commented Jun 10, 2024

I'm able to push the performance of httpcore to be exactly on par with aiohttp:
(benchmark figure)

Previously (on httpcore master) the performance is not great and the latency behaves very randomly:
(benchmark figure)

You can see the benchmark here.

Here are the changes. There are 3 things required to improve the performance to get it as fast as aiohttp (in separate commits):

  1. Commit 1. Change synchronization primitives (in _synchronization.py) to use asyncio and not anyio
  2. Commit 2. Bringing back asyncio-based backend which was removed in the past (AsyncIOStream)
  3. Commit 3. Optimize the AsyncConnectionPool to avoid calling the socket poll every time the pool is used. This also fixes idle connection checking to have lower time complexity.

I'm happy to open a PR from these. What do you think @tomchristie?

@tomchristie (Member)

@MarkusSintonen - Nice one. Let's work through those as individual PRs.

Is it worth submitting a PR where we add a scripts/benchmark?

@MarkusSintonen (Author)

Is it worth submitting a PR where we add a scripts/benchmark?

I think it would be beneficial to have the benchmark run in CI so we would see the difference. Previously I have contributed to Pydantic, and they use codspeed, which outputs benchmark diffs to the PR when the benchmarked behaviour changes. It should be free for open-source projects.
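As a starting point, such a benchmark could be as simple as timing a batch of concurrent requests with a helper like the one below, and letting a tool such as codspeed or pytest-benchmark track regressions on top of it (a hedged sketch; the helper name is illustrative, not from either repo):

```python
import asyncio
import time
from typing import Awaitable, Callable


def time_concurrent(make_coro: Callable[[], Awaitable[None]], n: int) -> float:
    """Run n concurrent invocations of an async callable and return the
    elapsed wall-clock time in seconds. A CI benchmark runner would call
    this with e.g. a single-request coroutine factory against a local server."""

    async def _run() -> None:
        await asyncio.gather(*(make_coro() for _ in range(n)))

    start = time.perf_counter()
    asyncio.run(_run())
    return time.perf_counter() - start
```

The benchmark framework then only needs to assert on (or record) the returned duration per commit.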

@tomchristie (Member) commented Jun 10, 2024

That's an interesting idea. I'd clearly be in agreement with adding a scripts/benchmark. I'm uncertain whether we'd want the extra CI runs every time or not. I suggest proceeding with the uncontroversial progression to start with, and then afterwards figuring out if/how to tie it into CI. (Reasonable?)

@MarkusSintonen (Author) commented Jun 10, 2024

@tomchristie I have now opened the 2 fix PRs:

Maybe I'll open the network backend addition after these, as it's the most complex one.

@HuiDBK commented Jun 27, 2024

@rafalkrupinski

Isn't usage of http.CookieJar a part of the problem?

self.jar = CookieJar()

https://github.com/python/cpython/blob/68e279b37aae3019979a05ca55f462b11aac14be/Lib/http/cookiejar.py#L1266
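It's at least plausible as a contention point: the stdlib CookieJar guards every cookie operation with a `threading.RLock`, so each request pays lock-acquisition cost even when no cookies are involved. That can be observed directly (the snippet relies on a private CPython attribute, so treat it as illustrative):

```python
import http.cookiejar
import threading

jar = http.cookiejar.CookieJar()

# CPython's CookieJar.__init__ creates self._cookies_lock = threading.RLock(),
# and acquires it in add_cookie_header() / extract_cookies() on every request.
# threading.RLock is a factory, so compare against the type of an instance.
assert isinstance(jar._cookies_lock, type(threading.RLock()))
```

Whether that lock actually shows up in these benchmarks would need profiling, though.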

@MarkusSintonen (Author)

Isn't usage of http.CookieJar a part of the problem?

@rafalkrupinski I haven't run benchmarks where requests/responses use cookies, but at least it doesn't cause performance issues in general. I ran similar benchmarks from the httpcore side with httpx. Performance is at similar levels as with aiohttp and urllib3 when using the performance fixes from the PRs:

(Waiting for review from @tomchristie)

Async (httpx vs aiohttp): (benchmark figure)

Sync (httpx vs urllib3): (benchmark figure)

@rafalkrupinski

TBH I'm surprised by httpx ditching anyio. Sure anyio comes with performance overhead, but this is breaking compatibility with Trio.

@MarkusSintonen (Author) commented Jul 10, 2024

TBH I'm surprised by httpx ditching anyio. Sure anyio comes with performance overhead, but this is breaking compatibility with Trio.

I'm not aware of it ditching anyio completely. It will still be supported, just optionally. Trio will also still be supported by httpcore.

@rafalkrupinski

@rafalkrupinski I haven't run benchmarks where requests/responses use cookies, but at least it doesn't cause performance issues in general

These are really cool speed-ups. Can't wait for httpx to overtake aiohttp ;)

@tirkarthi

Since the benchmark seems to be using plain http, I think the below is also a related issue, where creation of the SSL context in httpx had some overhead compared to aiohttp.

Ref : #838

@lizeyan commented Jan 16, 2025

Hello guys, I think I've encountered the same issue. However, our production code heavily relies on httpx, and our tests depend on respx, making it difficult to migrate to aiohttp. If anyone has faced similar challenges, there's a workaround: take advantage of httpx's custom transport capability and use aiohttp for the actual requests:

import asyncio
import typing
import time
import aiohttp
from aiohttp import ClientSession
import httpx
from concurrent.futures import ProcessPoolExecutor
import statistics

ADDRESS = "https://www.baidu.com"

async def request_with_aiohttp(session):
    async with session.get(ADDRESS) as rsp:
        return await rsp.text()

async def request_with_httpx(client):
    rsp = await client.get(ADDRESS)
    return rsp.text

# Benchmark functions
async def benchmark_aiohttp(n):
    async with ClientSession() as session:
        # make sure code is right
        print(await request_with_aiohttp(session))
        start = time.time()
        tasks = []
        for i in range(n):
            tasks.append(request_with_aiohttp(session))
        await asyncio.gather(*tasks)
        return time.time() - start

async def benchmark_httpx(n):
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(
            timeout=10,
        ),
    ) as client:
        # make sure code is right
        print(await request_with_httpx(client))

        start = time.time()
        tasks = []
        for i in range(n):
            tasks.append(request_with_httpx(client))
        await asyncio.gather(*tasks)
        return time.time() - start
    
class AiohttpTransport(httpx.AsyncBaseTransport):
    def __init__(self, session: typing.Optional[aiohttp.ClientSession] = None):
        self._session = session or aiohttp.ClientSession()
        self._closed = False

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        if self._closed:
            raise RuntimeError("Transport is closed")

        # Convert headers
        headers = dict(request.headers)
        
        # Prepare request parameters
        method = request.method
        url = str(request.url)
        content = request.content
        
        async with self._session.request(
            method=method,
            url=url,
            headers=headers,
            data=content,
            allow_redirects=False,
        ) as aiohttp_response:
            # Read the response body
            content = await aiohttp_response.read()
            
            # Convert headers
            headers = [(k.lower(), v) for k, v in aiohttp_response.headers.items()]
            
            # Build the httpx.Response
            return httpx.Response(
                status_code=aiohttp_response.status,
                headers=headers,
                content=content,
                request=request
            )

    async def aclose(self):
        if not self._closed:
            self._closed = True
            await self._session.close()


async def benchmark_httpx_with_aiohttp_transport(n):
    async with httpx.AsyncClient(
        timeout=httpx.Timeout(
            timeout=10,
        ),
        transport=AiohttpTransport(),
    ) as client:
        start = time.time()
        tasks = []
        for i in range(n):
            tasks.append(request_with_httpx(client))
        await asyncio.gather(*tasks)
        return time.time() - start
    

async def run_benchmark(requests=1000, rounds=3):
    aiohttp_times = []
    httpx_times = []
    httpx_aio_times = []
    
    print(f"Starting benchmark with {requests} concurrent requests...")

    for i in range(rounds):
        print(f"\nRound {i+1}:")

        # aiohttp test
        aiohttp_time = await benchmark_aiohttp(requests)
        aiohttp_times.append(aiohttp_time)
        print(f"aiohttp took: {aiohttp_time:.2f} s")

        # brief pause to let the system cool down
        await asyncio.sleep(1)

        # httpx test
        httpx_time = await benchmark_httpx(requests)
        httpx_times.append(httpx_time)
        print(f"httpx took: {httpx_time:.2f} s")

        # brief pause to let the system cool down
        await asyncio.sleep(1)

        # httpx (aiohttp transport) test
        httpx_time = await benchmark_httpx_with_aiohttp_transport(requests)
        httpx_aio_times.append(httpx_time)
        print(f"httpx (aiohttp transport) took: {httpx_time:.2f} s")

    print("\nResults summary:")
    print(f"aiohttp average: {statistics.mean(aiohttp_times):.2f} s")
    print(f"httpx average: {statistics.mean(httpx_times):.2f} s")
    print(f"httpx (aiohttp transport) average: {statistics.mean(httpx_aio_times):.2f} s")

if __name__ == '__main__':
    # run the benchmark
    asyncio.run(run_benchmark(512))

Results summary:
aiohttp average: 0.49 s
httpx average: 1.55 s
httpx (aiohttp transport) average: 0.51 s

Thanks for this.

Had a couple of questions:

  1. Is there anything you have to do to get the respx mock to work? I've been banging my head against the wall trying to get it to work.
  2. How could we make this work with other httpx mock libraries (httpx-mock)?
  3. How does this behave with raise_for_status? (I believe the raise-for-status happens post-response, but I just wanted to verify.)
  4. Do we have to handle aiohttp errors that happen at the transport layer?

Again, thanks for the workaround!

Here is a more complete version: https://github.com/lizeyan/httpx-AIOHttpTransport/
It supports respx (see the readme) and handles aiohttp errors (mapping them to httpx errors).

@day-mon commented Feb 7, 2025

@tomchristie Can you provide an update on this or link me to something that I can track?

@4uku commented Feb 24, 2025

@MarkusSintonen Will you be running new tests after the last changes?

@MarkusSintonen (Author) commented Feb 24, 2025

@MarkusSintonen Will you be running new tests after the last changes?

What changes, @4uku? 🤔 I'm not seeing anything in httpcore/httpx related to the perf issues.

@johannesloibl

Hey, have the proposals from @MarkusSintonen been integrated yet? When can we expect them to be released? :)

@MarkusSintonen (Author)

Hey, have the proposals from @MarkusSintonen been integrated yet? When can we expect them to be released? :)

Nope, unfortunately this project seems fairly dead. We have already moved away from httpx to aiohttp because of performance issues.

@MarcBresson

Thank you @MarkusSintonen for your comment and update. My team and I just made the switch to aiohttp too.

@johannesloibl

Thank you @MarkusSintonen for your comment and update. My team and I just made the switch to aiohttp too.

Me too, sadly. But the performance is wonderful now ^^

@tuukkamustonen

Anyone (managed to) run aiohttp as the HTTPX transport, as shown by @lizeyan above? What are your experiences and thoughts?

@conradogarciaberrotaran

@tuukkamustonen I found some issues in that implementation and created my own. The problems are mostly in the exception mapping, as it doesn't consider the inheritance tree when using isinstance to do the mapping.

Haven't deployed it yet, but locally it's working well. I will update here once I test it.

@MtkN1 commented Mar 6, 2025

I have created a forked repository that merges the PR from @MarkusSintonen:
https://github.com/MtkN1/httpcore-speedups

I have also deployed a package index as a Simple repository API on GitHub Pages:
https://mtkn1.github.io/httpcore-speedups/

You can easily replace the httpcore dependency using a tool like uv:

$ uv add httpcore --index httpcore-speedups=https://mtkn1.github.io/httpcore-speedups/simple/

However, please note that this is a short-term solution and an independent effort.

@karpetrosyan (Member)

Note that httpx is a high-level HTTP client that does not interact with I/O directly; instead, it relies on another library for I/O operations. You can think of httpx as a sans-I/O library, though it comes with a default I/O backend that can be easily replaced. If the default I/O library (httpcore) does not suit your needs (perhaps you find it too slow), you can swap the underlying HTTP implementation for another one. There is already an example of httpx with urllib3 here, and below is an example of httpx with aiohttp:

from aiohttp import ClientTimeout
from aiohttp.client import ClientSession, ClientResponse
import httpx
import asyncio

from httpx import Request, Response, AsyncByteStream
import typing


class AiohttpResponseStream(AsyncByteStream):
    CHUNK_SIZE = 1024

    def __init__(self, aiohttp_response: ClientResponse) -> None:
        self._aiohttp_response = aiohttp_response

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        async for chunk in self._aiohttp_response.content.iter_chunked(self.CHUNK_SIZE):
            yield chunk

    async def aclose(self) -> None:
        await self._aiohttp_response.__aexit__(None, None, None)


class AiohttpTransport(httpx.AsyncBaseTransport):
    def __init__(self, client: ClientSession) -> None:
        self.client = client

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        timeout = request.extensions.get("timeout", {})

        response = await self.client.request(
            method=request.method,
            url=str(request.url),
            headers=request.headers,
            data=request.content,
            allow_redirects=False,
            auto_decompress=False,
            compress=False,
            timeout=ClientTimeout(
                sock_connect=timeout.get("connect"),
                sock_read=timeout.get("read"),
                connect=timeout.get("pool"),
            ),
        ).__aenter__()

        return httpx.Response(
            status_code=response.status,
            headers=response.headers,
            content=AiohttpResponseStream(response),
            request=request,
        )


async def main():
    async with ClientSession() as aiohttp_client:
        async with httpx.AsyncClient(
            transport=AiohttpTransport(aiohttp_client)
        ) as client:
            async with client.stream(
                "GET",
                "https://www.encode.io",
            ) as resp:
                async for chunk in resp.aiter_bytes():
                    print(chunk)


asyncio.run(main())

So, httpx itself isn't slow, but it can use a slow backend, which I hope will be improved soon. You can connect any library to httpx as its transport layer, but if you're using high-level libraries that handle more than just I/O (such as authentication, cookies, or retries), you should disable those features, as they are the responsibility of httpx.

I noticed that there is an aiohttp transport implementation by @lizeyan, but it doesn't seem to support streaming. Also, I found it a bit more complex than necessary. Feel free to use this example to improve it!

@rattrayalex

Thank you @karpetrosyan! Would you be willing to publish that as its own repo / PyPI package? I'm not sure I could reasonably recommend that many users of openai vendor something like this for production usage, for example…

@karpetrosyan (Member)

It has been isolated in a standalone repository. Feel free to raise an issue if you find one.

@encode encode deleted a comment from binbjz Mar 10, 2025
@tomchristie (Member)

Deleting some of the conversation here as spam.

@tomchristie (Member)

Not going to waste time justifying why I will not allow contributors who deliberately try to blur the line between genuine analysis and deliberate time-wasting. I'm going to have to assume you're in this category.

Any further time wasters will be blocked as required.

@binbjz commented Mar 11, 2025

@karpetrosyan As you mentioned, httpx itself isn’t slow, but it uses a slow backend. After I replaced that slow backend, the benchmark results completely changed.

fastapi_benchmark_api.py

import random
import time
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

items_db = {}


class Item(BaseModel):
    name: str
    description: str | None = None
    price: float
    tax: float | None = None


@app.get("/items")
def welcome_items():
    return {"message": "Welcome to the Items API"}


@app.get("/items/{item_id}")
def read_item(item_id: int):
    time.sleep(random.uniform(0, 0.05))
    item = items_db.get(item_id)
    if item is None:
        raise HTTPException(status_code=404, detail="Item not found")
    return item


@app.post("/items", status_code=201)
def create_item(item: Item):
    time.sleep(random.uniform(0, 0.05))
    item_id = len(items_db) + 1
    items_db[item_id] = item.model_dump()
    return {"item_id": item_id, **item.model_dump()}


@app.put("/items/{item_id}")
def update_item(item_id: int, item: Item):
    time.sleep(random.uniform(0, 0.05))
    if item_id not in items_db:
        raise HTTPException(status_code=404, detail="Item not found")
    items_db[item_id] = item.model_dump()
    return {"item_id": item_id, **item.model_dump()}


@app.delete("/items/{item_id}")
def delete_item(item_id: int):
    time.sleep(random.uniform(0, 0.05))
    if item_id not in items_db:
        raise HTTPException(status_code=404, detail="Item not found")
    del items_db[item_id]
    return {"detail": "Item deleted"}

httpx_aiohttp_opt.py

import asyncio
import time
import typing
import aiohttp
import httpx
from httpx import AsyncByteStream, AsyncBaseTransport, Request, Response

ASYNC_URL = "http://127.0.0.1:8000/items"
ASYNC_TOTAL_REQUESTS = 5000
ASYNC_CONCURRENCY = 200

SYNC_URL = "http://127.0.0.1:8000/items"
SYNC_TOTAL_REQUESTS = 200


class AiohttpResponseStream(AsyncByteStream):
    CHUNK_SIZE = 1024

    def __init__(self, aiohttp_response: aiohttp.ClientResponse) -> None:
        self._aiohttp_response = aiohttp_response

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        async for chunk in self._aiohttp_response.content.iter_chunked(self.CHUNK_SIZE):
            yield chunk

    async def aclose(self) -> None:
        await self._aiohttp_response.__aexit__(None, None, None)


class AiohttpTransport(AsyncBaseTransport):
    def __init__(self, session: aiohttp.ClientSession) -> None:
        self.session = session

    async def handle_async_request(self, request: Request) -> Response:
        timeout_config = request.extensions.get("timeout", {})
        # The original try/except here only re-raised the caught exception,
        # so it has been dropped; behavior is unchanged.
        aiohttp_response = await self.session.request(
            method=request.method,
            url=str(request.url),
            headers=request.headers,
            data=request.content,
            allow_redirects=False,
            timeout=timeout_config.get("connect", 10.0),
        ).__aenter__()

        return Response(
            status_code=aiohttp_response.status,
            headers=aiohttp_response.headers,
            content=AiohttpResponseStream(aiohttp_response),
            request=request,
        )


async def httpx_test(total_requests: int = ASYNC_TOTAL_REQUESTS,
                     concurrency: int = ASYNC_CONCURRENCY,
                     url: str = ASYNC_URL) -> None:
    limits = httpx.Limits(max_connections=200, max_keepalive_connections=100)

    connector = aiohttp.TCPConnector(limit=200, keepalive_timeout=30)
    aiohttp_session = aiohttp.ClientSession(connector=connector)

    transport = AiohttpTransport(aiohttp_session)

    async with httpx.AsyncClient(
            limits=limits,
            follow_redirects=False,
            transport=transport
    ) as client:
        sem = asyncio.Semaphore(concurrency)

        async def one_request() -> int | None:
            retries_left = 2
            while retries_left >= 0:
                async with sem:
                    try:
                        resp = await client.get(url, timeout=10.0)
                        return resp.status_code
                    except (httpx.ReadError, httpx.ConnectError, httpx.RemoteProtocolError,
                            aiohttp.ClientError):
                        retries_left -= 1
                        if retries_left < 0:
                            return None
                        await asyncio.sleep(0.5)
            return None

        start = time.perf_counter()
        tasks = [one_request() for _ in range(total_requests)]
        results = await asyncio.gather(*tasks)
        end = time.perf_counter()
        success = sum(r == 200 for r in results if r is not None)
        print(f"HTTPX (async): {end - start:.2f}s, success={success}/{total_requests}")

    await aiohttp_session.close()


async def aiohttp_test(total_requests: int = ASYNC_TOTAL_REQUESTS,
                       concurrency: int = ASYNC_CONCURRENCY,
                       url: str = ASYNC_URL) -> None:
    connector = aiohttp.TCPConnector(limit=200, keepalive_timeout=30)
    async with aiohttp.ClientSession(connector=connector) as session:
        sem = asyncio.Semaphore(concurrency)

        async def one_request() -> int | None:
            retries_left = 2
            while retries_left >= 0:
                async with sem:
                    try:
                        async with session.get(url, timeout=10.0) as resp:
                            return resp.status
                    except (aiohttp.ClientError, asyncio.TimeoutError):
                        retries_left -= 1
                        if retries_left < 0:
                            return None
                        await asyncio.sleep(0.5)
            return None

        start = time.perf_counter()
        tasks = [one_request() for _ in range(total_requests)]
        results = await asyncio.gather(*tasks)
        end = time.perf_counter()
        success = sum(r == 200 for r in results if r is not None)
        print(f"AIOHTTP (async): {end - start:.2f}s, success={success}/{total_requests}")


def sync_httpx_test(total_requests: int = SYNC_TOTAL_REQUESTS,
                    url: str = SYNC_URL) -> None:
    start = time.perf_counter()
    with httpx.Client() as client:
        success = 0
        for _ in range(total_requests):
            try:
                r = client.get(url, timeout=10.0)
                if r.status_code == 200:
                    success += 1
            except httpx.RequestError:
                pass
    end = time.perf_counter()
    print(f"HTTPX (sync): {end - start:.2f}s, success={success}/{total_requests}")


class MinimalAiohttpHttpClient:
    def __init__(self) -> None:
        self.loop: asyncio.AbstractEventLoop | None = None
        self.shared_loop = asyncio.new_event_loop()
        self.session: aiohttp.ClientSession | None = None

    def __enter__(self) -> "MinimalAiohttpHttpClient":
        if not self.loop:
            self.loop = self.shared_loop
            asyncio.set_event_loop(self.loop)
            self.loop.run_until_complete(self._init_session())
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()

    async def _init_session(self) -> None:
        self.session = aiohttp.ClientSession()

    def close(self) -> None:
        if self.session:
            assert self.loop is not None
            self.loop.run_until_complete(self.session.close())
            self.session = None
        if self.loop:
            self.loop.close()
            self.loop = None

    async def _get(self, full_url: str) -> tuple[int, str]:
        assert self.session is not None
        async with self.session.get(full_url, timeout=10.0) as resp:
            txt = await resp.text()
            return resp.status, txt

    def sync_get(self, full_url: str) -> tuple[int, str]:
        assert self.loop is not None
        return self.loop.run_until_complete(self._get(full_url))


def sync_aiohttp_test(total_requests: int = SYNC_TOTAL_REQUESTS,
                      url: str = SYNC_URL) -> None:
    start = time.perf_counter()
    success = 0
    with MinimalAiohttpHttpClient() as client:
        for _ in range(total_requests):
            try:
                status, _txt = client.sync_get(url)
                if status == 200:
                    success += 1
            except (aiohttp.ClientError, asyncio.TimeoutError):
                pass
    end = time.perf_counter()
    print(f"AIOHTTP (sync): {end - start:.2f}s, success={success}/{total_requests}")


async def async_main() -> None:
    print("=== ASYNC BENCHMARK ===")
    await httpx_test()
    await aiohttp_test()


def main() -> None:
    print("=== SYNC BENCHMARK ===")
    sync_httpx_test()
    sync_aiohttp_test()

    print()
    asyncio.run(async_main())


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        pass

Running the benchmark gives the following results:

uvicorn fastapi_benchmark_api:app --host 0.0.0.0 --port 8000

INFO: Started server process [9238]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)

python httpx_aiohttp_opt.py

=== SYNC BENCHMARK ===
HTTPX (sync): 0.75s, success=200/200
AIOHTTP (sync): 0.07s, success=200/200

=== ASYNC BENCHMARK ===
HTTPX (async): 0.75s, success=5000/5000
AIOHTTP (async): 0.54s, success=5000/5000

@karpetrosyan
Member

It's great to see; thanks for the benchmark! So yes, for users facing performance issues, this is a really easy way to resolve the problem: delegate connection pooling and socket-level messaging to aiohttp, and let httpx handle the rest with its excellent API.

@UnknownAPI

I know I might sound stupid for asking this, but does using HTTPX with aiohttp as the transport still allow us to make requests over HTTP/2? aiohttp has no support for it.

@UnknownAPI

I was indeed stupid for asking; HTTP/2 support is not preserved. Also, for some reason, this small snippet built on top of the benchmark's transport solution raises an exception.

import asyncio
import typing
import aiohttp
import httpx
from httpx import AsyncByteStream, AsyncBaseTransport, Request, Response


class AiohttpResponseStream(AsyncByteStream):
    CHUNK_SIZE = 1024

    def __init__(self, aiohttp_response: aiohttp.ClientResponse) -> None:
        self._aiohttp_response = aiohttp_response

    async def __aiter__(self) -> typing.AsyncIterator[bytes]:
        async for chunk in self._aiohttp_response.content.iter_chunked(self.CHUNK_SIZE):
            yield chunk

    async def aclose(self) -> None:
        await self._aiohttp_response.__aexit__(None, None, None)


class AiohttpTransport(AsyncBaseTransport):
    def __init__(self, session: aiohttp.ClientSession) -> None:
        self.session = session

    async def handle_async_request(self, request: Request) -> Response:
        timeout_config = request.extensions.get("timeout", {})
        aiohttp_response = await self.session.request(
            method=request.method,
            url=str(request.url),
            headers=request.headers,
            data=request.content,
            allow_redirects=False,
            timeout=timeout_config.get("connect", 10.0),
        ).__aenter__()

        return Response(
            status_code=aiohttp_response.status,
            headers=aiohttp_response.headers,
            content=AiohttpResponseStream(aiohttp_response),
            request=request)


async def main() -> None:
    client = httpx.AsyncClient(transport=AiohttpTransport(aiohttp.ClientSession()))
    response = await client.get("https://pokeapi.co/api/v2/pokemon/ditto")
    print(response.json())


if __name__ == "__main__":
    asyncio.run(main())

httpx.DecodingError: Error -3 while decompressing data: incorrect header check

I think this is worth investigating.
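One plausible cause (an assumption worth verifying, and consistent with the `auto_decompress=False` fix posted below): aiohttp decompresses the gzip body itself, but the original `Content-Encoding: gzip` header is passed through, so httpx tries to decompress the already-plain bytes a second time. A stdlib-only sketch of that failure mode (the JSON payload is made up):

```python
import gzip
import zlib

# A server response whose body is gzip-encoded.
body = gzip.compress(b'{"name": "ditto"}')

# aiohttp (auto_decompress=True, the default) hands over decoded bytes...
plain = gzip.decompress(body)
assert plain == b'{"name": "ditto"}'

# ...but the forwarded "Content-Encoding: gzip" header makes httpx decode
# again in gzip mode (wbits=MAX_WBITS | 16), which fails on plain bytes.
try:
    zlib.decompress(plain, wbits=zlib.MAX_WBITS | 16)
except zlib.error as exc:
    print(exc)  # Error -3 while decompressing data: incorrect header check
```

The zlib error matches the `httpx.DecodingError` message above, which is why either disabling aiohttp's decompression or stripping the `Content-Encoding` header resolves it.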

@binbjz
binbjz commented Mar 13, 2025

@UnknownAPI

import asyncio
import json
import httpx
from typing import AsyncIterator
from aiohttp import ClientSession, ClientTimeout, ClientResponse
from httpx import AsyncByteStream, AsyncBaseTransport, Request, Response


class AiohttpResponseStream(AsyncByteStream):
    CHUNK_SIZE = 1024

    def __init__(self, aiohttp_response: ClientResponse) -> None:
        self._aiohttp_response = aiohttp_response

    async def __aiter__(self) -> AsyncIterator[bytes]:
        async for chunk in self._aiohttp_response.content.iter_chunked(self.CHUNK_SIZE):
            yield chunk

    async def aclose(self) -> None:
        await self._aiohttp_response.__aexit__(None, None, None)


class AiohttpTransport(AsyncBaseTransport):
    def __init__(self, session: ClientSession) -> None:
        self.session = session

    async def handle_async_request(self, request: Request) -> Response:
        timeout_config = request.extensions.get("timeout", {})

        response = await self.session.request(
            method=request.method,
            url=str(request.url),
            headers=request.headers,
            data=request.content,
            allow_redirects=False,
            auto_decompress=False,
            compress=False,
            timeout=ClientTimeout(
                sock_connect=timeout_config.get("connect"),
                sock_read=timeout_config.get("read"),
                connect=timeout_config.get("pool"),
            ),
        ).__aenter__()

        return Response(
            status_code=response.status,
            headers=response.headers,
            content=AiohttpResponseStream(response),
            request=request,
        )


async def main() -> None:
    async with ClientSession() as session:
        transport = AiohttpTransport(session)
        async with httpx.AsyncClient(transport=transport) as client:
            response = await client.get("https://pokeapi.co/api/v2/pokemon/ditto")
            print(json.dumps(response.json(), indent=2))


if __name__ == "__main__":
    asyncio.run(main())

@jiwazz
jiwazz commented Mar 14, 2025

If anyone is interested, I found an alternative a while back for when you absolutely want HTTP/2 and onward. Out of curiosity, I took @MarkusSintonen's initial script and ran it with that alternative; aiohttp is quite good with HTTP/1.1.

(benchmark comparison figure)

I'm sure talented people can optimize it further in no time for HTTP/1. I did not run it with HTTP/2 or HTTP/3, though.

@AnghelRA

It looks like niquests uses urllib3-future as its backend; maybe someone more knowledgeable could write a new transport backend using that library and test its performance?
