Pipe by johnhungerford · Pull Request #1166 · getkyo/kyo · GitHub

Pipe #1166

Merged
johnhungerford merged 15 commits into getkyo:main on Jun 19, 2025

Conversation

johnhungerford
Collaborator

Completes #1124

Problem

It would be good to have a Kyo version of ZIO's ZPipeline.

Solution

As discussed in #1124, added Pipe[A, B, S], a wrapper around Unit < (Poll[Chunk[A]] & Emit[Chunk[B]] & S).

Combinators:

  • contramap
  • contramapChunk
  • map
  • mapChunk
  • join (pipe1.join(pipe2) -> pipe, pipe.join(sink) -> sink)
  • transform

Constructors:

  • empty
  • identity
  • map
  • mapChunk
  • take
  • drop
  • takeWhile
  • dropWhile
  • filter
  • collect
  • collectWhile
  • changes
  • rechunk
  • tap
  • tapChunk

You can now process a stream via a sequence of pipes terminating with a sink:

val processed = stream.into(pipe1).into(pipe2).into(pipe3).into(sink)
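To make the shape of such a chain concrete, here is a minimal, effect-free sketch. MiniStream, MiniPipe and MiniSink are hypothetical stand-ins built on plain Scala iterators; they are not Kyo's Stream, Pipe or Sink, and they ignore chunking and the effect parameter S entirely. The sketch only mirrors how into and join compose.

```scala
// Hypothetical stand-ins for illustration only; NOT Kyo's types.
final case class MiniPipe[A, B](run: Iterator[A] => Iterator[B]) {
  // Analogue of pipe1.join(pipe2): feed this pipe's output into `next`.
  def join[C](next: MiniPipe[B, C]): MiniPipe[A, C] =
    MiniPipe[A, C](in => next.run(run(in)))
}

object MiniPipe {
  def map[A, B](f: A => B): MiniPipe[A, B] = MiniPipe[A, B](_.map(f))
  def filter[A](p: A => Boolean): MiniPipe[A, A] = MiniPipe[A, A](_.filter(p))
  def take[A](n: Int): MiniPipe[A, A] = MiniPipe[A, A](_.take(n))
}

// A sink consumes every element it asks for and produces a final result.
final case class MiniSink[A, R](consume: Iterator[A] => R)

final case class MiniStream[A](elements: () => Iterator[A]) {
  // Stream into a pipe: still a stream, possibly of another element type.
  def into[B](pipe: MiniPipe[A, B]): MiniStream[B] =
    MiniStream[B](() => pipe.run(elements()))

  // Stream into a sink: the pipeline terminates and yields the sink's result.
  def into[R](sink: MiniSink[A, R]): R =
    sink.consume(elements())
}

object PipelineDemo {
  val stream   = MiniStream[Int](() => Iterator(1, 2, 3, 4, 5, 6))
  val evens    = MiniPipe.filter[Int](_ % 2 == 0)
  val scaled   = MiniPipe.map[Int, Int](_ * 10)
  val firstTwo = MiniPipe.take[Int](2)
  val toList   = MiniSink[Int, List[Int]](_.toList)

  // Same shape as stream.into(pipe1).into(pipe2).into(pipe3).into(sink)
  val processed: List[Int] =
    stream.into(evens).into(scaled).into(firstTwo).into(toList)

  // Joining the pipes first gives the same result.
  val joined: List[Int] =
    stream.into(evens.join(scaled).join(firstTwo)).into(toList)
}
```

In this model both PipelineDemo.processed and PipelineDemo.joined evaluate to List(20, 40), which is the equivalence one would expect between chaining into calls and joining the pipes up front.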

Notes

  • Pipe.transform and Sink.drain no longer use Poll.run, since that method handled Emit prior to Poll. Pipe and Sink now delegate control to their own Poll effects, allowing early termination.
  • Tests have been copied from Stream, since current Pipes are all based on Stream methods. Users should expect identical behavior.
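The early-termination point in the first note can be pictured with a small pull-driven sketch. It does not use Kyo's Poll or Emit; CountingSource and take below are hypothetical names, and the only idea carried over is that the consumer decides when to poll, so an endless upstream is asked for exactly as many elements as are needed.

```scala
object EarlyTerminationDemo {
  // Hypothetical endless upstream that records how often it was polled.
  final class CountingSource {
    var pulled = 0
    def poll(): Option[Int] = {
      pulled += 1
      Some(pulled)
    }
  }

  // A consumer-driven `take`: it polls only while it still needs elements,
  // so control over termination sits with the consumer, not the producer.
  def take(n: Int, source: CountingSource): List[Int] = {
    val out = List.newBuilder[Int]
    var remaining = n
    while (remaining > 0) {
      source.poll() match {
        case Some(v) =>
          out += v
          remaining -= 1
        case None =>
          remaining = 0 // upstream finished early
      }
    }
    out.result()
  }

  val source     = new CountingSource
  val firstThree = take(3, source)
}
```

Here firstThree is List(1, 2, 3) and source.pulled is 3: the upstream is never asked for a fourth element.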

@johnhungerford johnhungerford marked this pull request as ready for review April 28, 2025 15:56
* @return
* A new stream of transformed element type `A`
*/
def into[A, S2](pipe: Pipe[V, A, S2])(using Tag[V], Frame): Stream[A, S & S2] =
Collaborator

how about map instead of into? Pipe is similar to a map function

Collaborator Author

It's similar but different in important ways. map makes it seem like each element will be transformed, which is not necessarily the case. The input and output streams can be entirely different lengths, for instance.
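A small plain-Scala illustration of the length point, using lists instead of Kyo streams. The changes helper is a hypothetical re-implementation that assumes changes drops consecutive duplicates; it is not the library's code.

```scala
object LengthDemo {
  // Assumed behaviour of `changes`: keep an element only if it differs
  // from the one immediately before it.
  def changes[A](in: List[A]): List[A] =
    in.foldLeft(List.empty[A]) { (acc, a) =>
      if (acc.headOption.contains(a)) acc else a :: acc
    }.reverse

  val input   = List(1, 1, 2, 2, 2, 3, 1)
  val mapped  = input.map(_ * 2) // one output per input
  val changed = changes(input)   // fewer outputs than inputs
}
```

mapped keeps all seven elements, while changed is List(1, 2, 3, 1). A pipe can behave like either, which is why a name suggesting a one-to-one transformation would be misleading.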

* @return
* New pipe that performs both transformations in sequence
*/
def join[C, S1](pipe: Pipe[B, C, S1])(using Tag[B], Frame): Pipe[A, C, S & S1] =
Collaborator

Would this make sense as andThen? I think we have some other APIs with that naming.

Collaborator Author

I prefer not to reuse extension method names, as this can create annoying conflicts: since Pipe[A, B, S] can be lifted to Pipe[A, B, S] < Any, the andThen extension on < could be picked up instead.

@johnhungerford
Collaborator Author

Looks like the failed build is unrelated to this PR

@ahoy-jon
Collaborator

@johnhungerford it's a draft, or ready to review + merge? (it looks good to me)

@johnhungerford
Collaborator Author

Ready for review as far as I'm concerned

Comment on lines +59 to +75
def contramap[A1, S1](f: A1 => A < S1)(
    using
    t1: Tag[Poll[Chunk[A]]],
    t2: Tag[Poll[Chunk[A1]]],
    disc: Discriminator,
    fr: Frame
): Pipe[A1, B, S & S1] =
    Pipe:
        ArrowEffect.handleLoop(t1, pollEmit)(
            [C] =>
                (unit, cont) =>
                    Poll.andMap[Chunk[A1]]:
                        case Absent => Loop.continue(cont(Absent))
                        case Present(chunk) =>
                            Kyo.foreach(chunk)(f).map: chunk2 =>
                                Loop.continue(cont(Present(chunk2)))
        )
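Stripped of effects, tags and chunking, the idea behind contramap is to adapt a pipe's input side: each incoming A1 is converted to an A before the original pipe sees it. The sketch below assumes a hypothetical AdaptablePipe over plain iterators and is not the implementation quoted above.

```scala
// Hypothetical, effect-free stand-in; NOT Kyo's Pipe.
final case class AdaptablePipe[A, B](run: Iterator[A] => Iterator[B]) {
  // Pre-process inputs with `f`, then run the original pipe unchanged.
  def contramap[A1](f: A1 => A): AdaptablePipe[A1, B] =
    AdaptablePipe[A1, B](in => run(in.map(f)))
}

object ContramapDemo {
  // A pipe over Ints that adds one to each element.
  val increment = AdaptablePipe[Int, Int](_.map(_ + 1))

  // The same pipe, now accepting Strings by first taking their length.
  val fromStrings = increment.contramap[String](_.length)

  val result = fromStrings.run(Iterator("a", "bb", "ccc")).toList
}
```

In this model ContramapDemo.result is List(2, 3, 4): the output type is untouched, only the accepted input type changes.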
Collaborator

I guess it's going to be contramapKyo

Collaborator
@fwbrasil fwbrasil left a comment

sorry for the delay to review

@johnhungerford johnhungerford merged commit 617018d into getkyo:main Jun 19, 2025
5 checks passed
4 participants