Async Result builder by dbrattli · Pull Request #247 · dbrattli/Expression · GitHub

Async Result builder #247


Merged
merged 6 commits into from
Mar 11, 2025
144 changes: 144 additions & 0 deletions README.py
@@ -142,6 +142,11 @@
is amazing stuff.
- **option** - an optional world for working with optional values.
- **result** - an error handling world for working with result values.
- **seq** - a world for working with sequences.
- **async_result** - an asynchronous error handling world for working
with asynchronous result values.
- **async_option** - an asynchronous optional world for working with
asynchronous optional values.
- **Mailbox Processor**: for lock free programming using the [Actor
model](https://en.wikipedia.org/wiki/Actor_model).
- **Cancellation Token**: for cancellation of asynchronous (and
@@ -461,6 +466,145 @@ def fn5() -> Generator[int, int, int]:
pinned to `Exception` i.e., `Result[TSource, Exception]`.
"""

# %% [markdown]
"""
### AsyncResult

The `AsyncResult[T, TError]` type is the asynchronous version of `Result`. It allows you
to compose asynchronous operations that may fail, using the Result type. This is
particularly useful for handling errors in asynchronous code, such as API calls,
database operations, or any other I/O-bound tasks.

Similar to the `Result` effect, AsyncResult enables "railway oriented programming" but
for asynchronous operations. If any part of the function yields an `Error`, the function
is short-circuited and the following statements will never be executed.
"""

# %%
from collections.abc import AsyncGenerator

from expression import Error, Ok, effect


@effect.async_result[int, str]()
async def fn() -> AsyncGenerator[int, int]:
    x: int = yield 42  # Regular value
    y: int = yield await Ok(43)  # Awaitable Ok value

    # Short-circuit if condition is met
    if x + y > 80:
        z: int = yield await Error("Value too large")  # This will short-circuit
    else:
        z: int = yield 44

    yield x + y + z  # Final value


# This would be run in an async context
# result = await fn()
# assert result == Error("Value too large")

# %% [markdown]
"""
AsyncResult works well with other async functions and can be nested:
"""


# %%
@effect.async_result[int, str]()
async def inner(x: int) -> AsyncGenerator[int, int]:
    y: int = yield x + 1
    yield y + 1  # Final value is y + 1


@effect.async_result[int, str]()
async def outer() -> AsyncGenerator[int, int]:
    x: int = yield 40

    # Call inner and await its result
    inner_result = await inner(x)
    y: int = yield await inner_result

    yield y  # Final value is y


# This would be run in an async context
# result = await outer()
# assert result == Ok(42) # 40 -> 41 -> 42

# %% [markdown]
"""
A simplified type called `AsyncTry` is also available. It's an async result type that is
pinned to `Exception` i.e., `AsyncResult[TSource, Exception]`.
"""

# %% [markdown]
"""
### AsyncOption

The `AsyncOption[T]` type is the asynchronous version of `Option`. It allows you to
compose asynchronous operations that may return an optional value, using the Option type.
This is particularly useful for handling optional values in asynchronous code, such as
API calls that might not return a value, database queries that might not find a record,
or any other I/O-bound tasks that might not produce a meaningful result.

Similar to the `Option` effect, AsyncOption enables short-circuiting but for asynchronous
operations. If any part of the function yields `Nothing`, the function is short-circuited
and the following statements will never be executed.
"""

# %%
from collections.abc import AsyncGenerator

from expression import Nothing, Some, effect


@effect.async_option[int]()
async def fn_option() -> AsyncGenerator[int, int]:
    x: int = yield 42  # Regular value
    y: int = yield await Some(43)  # Awaitable Some value

    # Short-circuit if condition is met
    if x + y > 80:
        z: int = yield await Nothing  # This will short-circuit
    else:
        z: int = yield 44

    yield x + y + z  # Final value


# This would be run in an async context
# result = await fn_option()
# assert result is Nothing

# %% [markdown]
"""
AsyncOption works well with other async functions and can be nested:
"""


# %%
@effect.async_option[int]()
async def inner_option(x: int) -> AsyncGenerator[int, int]:
    y: int = yield x + 1
    yield y + 1  # Final value is y + 1


@effect.async_option[int]()
async def outer_option() -> AsyncGenerator[int, int]:
    x: int = yield 40

    # Call inner and await its result
    inner_result = await inner_option(x)
    y: int = yield await inner_result

    yield y  # Final value is y


# This would be run in an async context
# result = await outer_option()
# assert result == Some(42) # 40 -> 41 -> 42

# %% [markdown]
"""
### Sequence
11 changes: 9 additions & 2 deletions expression/collections/seq.py
@@ -109,6 +109,14 @@ def choose(self, chooser: Callable[[_TSource], Option[_TResult]]) -> Seq[_TResul
         xs = pipe(self, choose(chooser))
         return Seq(xs)

+    def concat(self: Seq[Seq[_TResult]]) -> Seq[_TResult]:
+        """Concatenate sequences.
+
+        Combines the given variable number of enumerations and/or
+        enumeration-of-enumerations as a single concatenated enumeration.
+        """
+        return Seq(concat(*self))
+
     def collect(self, mapping: Callable[[_TSource], Seq[_TResult]]) -> Seq[_TResult]:
         """Collect items from the sequence.

@@ -123,8 +131,7 @@ def collect(self, mapping: Callable[[_TSource], Seq[_TResult]]) -> Seq[_TResult]
             A sequence comprising the concatenated values from the mapping
             function.
         """
-        xs = pipe(self, collect(mapping))
-        return Seq(xs)
+        return self.map(mapping).concat()

     @staticmethod
     def delay(generator: Callable[[], Iterable[_TSource]]) -> Iterable[_TSource]:
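
The `seq.py` change expresses `collect` as `map` followed by `concat`. A minimal stdlib-only sketch of that equivalence is below. It does not use the library's `Seq` type; the free functions `concat` and `collect` and the type variables `A` and `B` are hypothetical stand-ins that work on plain iterables.

```python
from collections.abc import Callable, Iterable
from itertools import chain
from typing import TypeVar

A = TypeVar("A")
B = TypeVar("B")


def concat(sources: Iterable[Iterable[B]]) -> list[B]:
    """Flatten one level of nesting (hypothetical stand-in for Seq.concat)."""
    return list(chain.from_iterable(sources))


def collect(mapping: Callable[[A], Iterable[B]], source: Iterable[A]) -> list[B]:
    """Map every item to an iterable, then concatenate the results."""
    return concat(map(mapping, source))


pairs = collect(lambda x: [x, x * 10], [1, 2, 3])
```

Writing `collect` this way keeps a single flattening routine, so any change to how sequences are concatenated applies to both operations.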
130 changes: 130 additions & 0 deletions expression/core/async_builder.py
@@ -0,0 +1,130 @@
"""Async builder module.

This module provides the base class for async builders, which are used to
create computational expressions for async operations.
"""

from abc import ABC
from collections.abc import AsyncGenerator, Awaitable, Callable
from functools import wraps
from typing import Any, Generic, TypeVar, cast

from typing_extensions import ParamSpec

from .error import EffectError


_T = TypeVar("_T") # The container item type
_M = TypeVar("_M") # for container type
_P = ParamSpec("_P")


class AsyncBuilderState(Generic[_T]):
    """Encapsulates the state of an async builder computation."""

    def __init__(self):
        self.is_done = False


class AsyncBuilder(Generic[_T, _M], ABC):  # Corrected Generic definition
    """Async effect builder."""

    # Required methods
    async def bind(
        self, xs: _M, fn: Callable[[_T], Awaitable[_M]]
    ) -> _M:  # Use concrete types for Callable input and output
        raise NotImplementedError("AsyncBuilder does not implement a `bind` method")

    async def return_(self, x: _T) -> _M:
        raise NotImplementedError("AsyncBuilder does not implement a `return` method")

    async def return_from(self, xs: _M) -> _M:
        raise NotImplementedError("AsyncBuilder does not implement a `return` from method")

    async def combine(self, xs: _M, ys: _M) -> _M:
        """Used for combining multiple statements in the effect."""
        raise NotImplementedError("AsyncBuilder does not implement a `combine` method")

    async def zero(self) -> _M:
        """Zero effect.

        Called if the effect raises StopAsyncIteration without a value, i.e
        returns None.
        """
        raise NotImplementedError("AsyncBuilder does not implement a `zero` method")

    # Optional methods for control flow
    async def delay(self, fn: Callable[[], _M]) -> _M:
        """Delay the computation.

        Default implementation is to return the result of the function.
        """
        return fn()

    async def run(self, computation: _M) -> _M:
        """Run a computation.

        Default implementation is to return the computation as is.
        """
        return computation

    # Internal implementation
    async def _send(
        self,
        gen: AsyncGenerator[_T, Any],
        state: AsyncBuilderState[_T],  # Use AsyncBuilderState
        value: _T,
    ) -> _M:
        try:
            yielded = await gen.asend(value)
            return await self.return_(yielded)
        except EffectError as error:
            # Effect errors (Nothing, Error, etc) short circuits
            state.is_done = True
            return await self.return_from(cast("_M", error.args[0]))
        except StopAsyncIteration:
            state.is_done = True
            raise
        except Exception:
            state.is_done = True
            raise

    def __call__(
        self,
        fn: Callable[
            _P,
            AsyncGenerator[_T, Any],
        ],
    ) -> Callable[_P, Awaitable[_M]]:
        """The builder decorator."""

        @wraps(fn)
        async def wrapper(*args: _P.args, **kw: _P.kwargs) -> _M:
            gen = fn(*args, **kw)
            state = AsyncBuilderState[_T]()  # Initialize AsyncBuilderState
            result: _M = await self.zero()  # Initialize result
            value: _M

            async def binder(value: Any) -> _M:
                ret = await self._send(gen, state, value)  # Pass state to _send
                return await self.delay(lambda: ret)  # Delay every bind call

            try:
                # Initialize co-routine with None to start the generator and get the
                # first value
                result = value = await binder(None)

                while not state.is_done:  # Loop until coroutine is exhausted
                    value = await self.bind(value, binder)  # Send value to coroutine
                    result = await self.combine(result, value)  # Combine previous result with new value

            except StopAsyncIteration:
                # This will happen if the generator exits by returning None
                pass

            return await self.run(result)  # Run the result

        return wrapper


__all__ = ["AsyncBuilder", "AsyncBuilderState"]
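
The `wrapper` loop in `AsyncBuilder.__call__` starts the async generator with `None`, feeds each yielded value back in with `asend`, and stops early when an `EffectError` escapes. A rough stdlib-only sketch of that control flow is below. It is not the library's implementation: `ShortCircuit` is a hypothetical stand-in for `EffectError`, `drive` is a hypothetical helper, and the `bind`/`combine`/`return_` hooks are collapsed into "keep the last yielded value".

```python
import asyncio
from collections.abc import AsyncGenerator, Callable
from typing import Any


class ShortCircuit(Exception):
    """Hypothetical stand-in for EffectError: carries the early-exit value."""


async def drive(fn: Callable[[], AsyncGenerator[Any, Any]]) -> Any:
    """Feed each yielded value back into the generator and keep the last one."""
    gen = fn()
    result: Any = None
    try:
        # A freshly created async generator must be started by sending None.
        value = await gen.asend(None)
        while True:
            result = value
            value = await gen.asend(value)
    except StopAsyncIteration:
        pass  # Generator is exhausted; result holds the last yielded value.
    except ShortCircuit as stop:
        result = stop.args[0]  # Early exit replaces the result.
    return result


async def add_all() -> AsyncGenerator[int, int]:
    x = yield 40
    y = yield x + 1
    yield y + 1


async def bail_out() -> AsyncGenerator[int, int]:
    x = yield 40
    if x > 10:
        raise ShortCircuit("too large")
    yield x


ok_value = asyncio.run(drive(add_all))
error_value = asyncio.run(drive(bail_out))
```

Here `ok_value` is the last value yielded by `add_all`, and `error_value` is the payload of the exception raised in `bail_out`, which mirrors how the builder turns a short-circuit into the final result instead of letting the exception propagate.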
13 changes: 12 additions & 1 deletion expression/core/option.py
@@ -9,7 +9,7 @@
 from __future__ import annotations

 import builtins
-from collections.abc import Callable, Generator, Iterable
+from collections.abc import Awaitable, Callable, Generator, Iterable
 from typing import TYPE_CHECKING, Any, Literal, TypeGuard, TypeVar, cast, get_args, get_origin

 from typing_extensions import TypeVarTuple, Unpack
@@ -42,6 +42,7 @@
 @tagged_union(frozen=True, order=True)
 class Option(
     Iterable[_TSourceOut],
+    Awaitable[_TSourceOut],
     PipeMixin,
 ):
     """Option class."""
@@ -296,6 +297,16 @@ def __str__(self) -> str:
     def __repr__(self) -> str:
         return self.__str__()

+    def __await__(self) -> Generator[_TSourceOut, _TSourceOut, _TSourceOut]:
+        """Make Option awaitable by delegating to __iter__."""
+        match self:
+            case Option(tag="some", some=value):
+                return value
+            case _:
+                raise EffectError(self)
+
+        yield None
+
     @classmethod
     def __get_pydantic_core_schema__(cls, source_type: Any, handler: GetCoreSchemaHandler) -> CoreSchema:
         from pydantic import ValidatorFunctionWrapHandler
13 changes: 12 additions & 1 deletion expression/core/result.py
@@ -12,7 +12,7 @@
 from __future__ import annotations

 import builtins
-from collections.abc import Callable, Generator, Iterable
+from collections.abc import Awaitable, Callable, Generator, Iterable
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -49,6 +49,7 @@
 @tagged_union(frozen=True, order=True)
 class Result(
     Iterable[_TSourceOut],
+    Awaitable[_TSourceOut],
     PipeMixin,
     Generic[_TSourceOut, _TErrorOut],
 ):
@@ -255,6 +256,16 @@ def __iter__(self) -> Generator[_TSourceOut, _TSourceOut, _TSourceOut]:
             case _:
                 raise EffectError(self)

+    def __await__(self) -> Generator[_TSourceOut, _TSourceOut, _TSourceOut]:
+        """Make Result awaitable by delegating to __iter__."""
+        match self:
+            case Result(tag="ok", ok=value):
+                return value
+            case _:
+                raise EffectError(self)
+
+        yield None
+
     def __str__(self) -> str:
         match self:
             case Result(tag="ok", ok=value):
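
The `__await__` methods added to `Option` and `Result` end with a `yield None` that is never reached. Its only apparent purpose is to turn `__await__` into a generator function, so that `return value` becomes the result of the `await` expression and a raised error propagates to the awaiting coroutine. A stdlib-only sketch of that pattern is below, assuming a hypothetical `Box` container and a hypothetical `ShortCircuit` exception in place of the library's tagged unions and `EffectError`.

```python
import asyncio
from collections.abc import Generator
from typing import Any


class ShortCircuit(Exception):
    """Hypothetical stand-in for EffectError."""


class Box:
    """Hypothetical container: a value that is either usable or not."""

    def __init__(self, value: Any, ok: bool = True) -> None:
        self.value = value
        self.ok = ok

    def __await__(self) -> Generator[Any, Any, Any]:
        if not self.ok:
            raise ShortCircuit(self)  # Propagates out of the `await` expression.
        return self.value  # Becomes the value of the `await` expression.
        yield  # Unreachable; its presence makes __await__ a generator function.


async def unwrap(box: Box) -> Any:
    return await box


unwrapped = asyncio.run(unwrap(Box(43)))

try:
    asyncio.run(unwrap(Box("boom", ok=False)))
    short_circuited = False
except ShortCircuit:
    short_circuited = True
```

Because the generator returns before ever yielding, awaiting such an object never suspends the coroutine; it only unwraps the value or raises.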
5 changes: 4 additions & 1 deletion expression/effect/__init__.py
@@ -1,5 +1,8 @@
 """A collection of computational expression effects."""

+from .async_option import AsyncOptionBuilder as async_option
+from .async_result import AsyncResultBuilder as async_result
+from .async_result import AsyncTryBuilder as async_try
 from .option import OptionBuilder as option
 from .result import ResultBuilder as result
 from .result import TryBuilder as try_
@@ -9,4 +12,4 @@
 seq = seq_builder


-__all__ = ["option", "result", "seq", "try_"]
+__all__ = ["async_option", "async_result", "async_try", "option", "result", "seq", "try_"]