Pipe #1166
 * @return
 *   A new stream of transformed element type `A`
 */
def into[A, S2](pipe: Pipe[V, A, S2])(using Tag[V], Frame): Stream[A, S & S2] =
How about `map` instead of `into`? `Pipe` is similar to a `map` function.
It's similar but different in important ways. `map` makes it seem like each element will be transformed, which is not necessarily the case. The input and output streams can be entirely different lengths, for instance.
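To illustrate the point about lengths, here is a minimal sketch using a toy model rather than Kyo's actual `Pipe`. The names `ToyPipe`, `evensOnly`, and `duplicate` are hypothetical, and a "pipe" is assumed to be nothing more than a function between standard-library `LazyList`s:

```scala
object IntoSketch:
  // Toy model only, NOT Kyo's Pipe: a pipe is any function from one
  // lazy sequence to another.
  type ToyPipe[A, B] = LazyList[A] => LazyList[B]

  extension [A](xs: LazyList[A])
    // Hypothetical counterpart of `into`: hand the whole sequence to the pipe.
    def into[B](pipe: ToyPipe[A, B]): LazyList[B] = pipe(xs)

  // Drops odd numbers, so the output is shorter than the input.
  val evensOnly: ToyPipe[Int, Int] = _.filter(_ % 2 == 0)

  // Emits every element twice, so the output is longer than the input.
  val duplicate: ToyPipe[Int, Int] = _.flatMap(x => LazyList(x, x))

  val input = LazyList(1, 2, 3, 4)
  val shorter = input.into(evensOnly).toList // List(2, 4)
  val longer = input.into(duplicate).toList  // List(1, 1, 2, 2, 3, 3, 4, 4)
```

Neither result has the same length as the input, which is why a name like `map` could mislead readers who expect a one-to-one transformation.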
 * @return
 *   New pipe that performs both transformations in sequence
 */
def join[C, S1](pipe: Pipe[B, C, S1])(using Tag[B], Frame): Pipe[A, C, S & S1] =
Would this make sense as `andThen`? I think we have some other APIs with that naming.
I prefer not to re-use extension method names, as this can create annoying conflicts (`Pipe[A, B, S]` can be lifted to `Pipe[A, B, S] < Any`, so the `andThen` extension on `<` could be accessed).
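For intuition about what `join` composes, here is a small sketch under the same toy assumption: `ToyPipe` is a hypothetical wrapper around a plain function on `LazyList`, not Kyo's `Pipe`, and it carries no effect type.

```scala
object JoinSketch:
  // Toy stand-in, NOT Kyo's Pipe: wraps a function between lazy sequences.
  final case class ToyPipe[A, B](run: LazyList[A] => LazyList[B]):
    // Hypothetical analogue of `join`: feed this pipe's output into `next`.
    def join[C](next: ToyPipe[B, C]): ToyPipe[A, C] =
      ToyPipe[A, C](in => next.run(run(in)))

  // Keeps only the strings that parse as integers.
  val parse = ToyPipe[String, Int](_.flatMap(_.toIntOption))
  // Doubles every element.
  val double = ToyPipe[Int, Int](_.map(_ * 2))

  val parseThenDouble: ToyPipe[String, Int] = parse.join(double)
  val result = parseThenDouble.run(LazyList("1", "x", "3")).toList // List(2, 6)
```

In this toy version the name is a free choice; the conflict described above only arises in Kyo because of the lifting to `<`.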
Looks like the failed build is unrelated to this PR
@johnhungerford it's a draft, or ready to review + merge? (it looks good to me)
Ready for review as far as I'm concerned
def contramap[A1, S1](f: A1 => A < S1)(
    using
    t1: Tag[Poll[Chunk[A]]],
    t2: Tag[Poll[Chunk[A1]]],
    disc: Discriminator,
    fr: Frame
): Pipe[A1, B, S & S1] =
    Pipe:
        ArrowEffect.handleLoop(t1, pollEmit)(
            [C] =>
                (unit, cont) =>
                    Poll.andMap[Chunk[A1]]:
                        case Absent => Loop.continue(cont(Absent))
                        case Present(chunk) =>
                            Kyo.foreach(chunk)(f).map: chunk2 =>
                                Loop.continue(cont(Present(chunk2)))
        )
I guess it's going to be contramapKyo
sorry for the delay to review
Completes #1124
Problem
It would be good to have a Kyo version of ZIO's `ZPipeline`.
Solution
As discussed in #1124, added `Pipe[A, B, S]`, a wrapper around `Unit < (Poll[Chunk[A]] & Emit[Chunk[B]] & S)`.
Combinators:
Constructors:
You can now process a stream via a sequence of pipes terminating with a sink:
val processed = stream.into(pipe1).into(pipe2).into(pipe3).into(sink)
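As a rough picture of how such a chain reads end to end, the sketch below models it with toy types. `ToyPipe` and `ToySink` are hypothetical stand-ins built on `LazyList`, not Kyo's `Pipe` and `Sink`, and the concrete stages are invented for illustration:

```scala
object PipelineSketch:
  // Toy stand-ins, NOT Kyo's types: a pipe maps a sequence to a sequence,
  // a sink folds a sequence down to a single result.
  final case class ToyPipe[A, B](run: LazyList[A] => LazyList[B])
  final case class ToySink[A, R](run: LazyList[A] => R)

  extension [A](xs: LazyList[A])
    // Passing a pipe yields another sequence, so the chain can continue.
    def into[B](pipe: ToyPipe[A, B]): LazyList[B] = pipe.run(xs)
    // Passing a sink ends the chain with a final value.
    def into[R](sink: ToySink[A, R]): R = sink.run(xs)

  val pipe1 = ToyPipe[Int, Int](_.filter(_ % 2 == 0))  // keep even numbers
  val pipe2 = ToyPipe[Int, Int](_.map(_ * 10))         // scale each element
  val pipe3 = ToyPipe[Int, String](_.map(n => s"#$n")) // render as text
  val sink = ToySink[String, String](_.mkString(","))  // collect into one string

  val stream = LazyList(1, 2, 3, 4, 5, 6)
  val processed: String =
    stream.into(pipe1).into(pipe2).into(pipe3).into(sink) // "#20,#40,#60"
```

The overloaded `into` is what lets the same method name both extend the chain and terminate it in this sketch.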
Notes
`Pipe.transform` and `Sink.drain` no longer use `Poll.run`, since this method handled `Emit` prior to `Poll`.
`Pipe` and `Sink` delegate control to their own `Poll` effects, allowing early termination.
`Stream`, since current `Pipe`s are all based on `Stream` methods. Users should expect identical behavior.