Description
Code:
import scala.concurrent.duration._
import cats.effect.{IO, Resource}
import fs2.Stream
import cats.effect.unsafe.implicits.global
Stream
  .resource(
    Resource.make(IO.println("start"))(_ => IO.println("stop"))
  )
  .flatMap(_ => Stream(1, 2, 3, 4).covary[IO].metered(100.millis))
  .take(2)
  .evalTap(IO.println)
  .groupWithin(2, 1.second)
  .evalTap(IO.println)
  .compile
  .drain
  .unsafeRunSync()
Result:
start
1
2
stop
Chunk(1, 2)
The resource is released ("stop" is printed) before the stage after
groupWithin gets to process the chunk. So anything downstream of
groupWithin can end up working with a resource that is already closed.
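For comparison, here is a minimal sketch of a possible workaround, not a fix for the behaviour reported above. It assumes the pipeline can be restructured so that groupWithin and the stage after it live inside the flatMap, where the resource is still in scope. The names `resource` and `chunks` are only illustrative, and the result is collected with `compile.toList` instead of `drain` so it can be inspected.

```scala
import scala.concurrent.duration._
import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import fs2.{Chunk, Stream}

// Illustrative stand-in for a real resource, same as in the report.
val resource: Resource[IO, Unit] =
  Resource.make(IO.println("start"))(_ => IO.println("stop"))

val chunks: List[Chunk[Int]] =
  Stream
    .resource(resource)
    .flatMap { _ =>
      Stream(1, 2, 3, 4)
        .covary[IO]
        .metered(100.millis)
        .take(2)
        .evalTap(IO.println)
        .groupWithin(2, 1.second)
        // This stage now runs inside the flatMap, so the resource is
        // only released after the inner stream has completed.
        .evalTap(IO.println)
    }
    .compile
    .toList
    .unsafeRunSync()
```

With this shape the chunk should be processed before "stop" is printed, since the finalizer only runs once the inner stream is done. It does not help when the grouping has to happen outside the resource's scope, which is the case the report is about.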