8000 GitHub - cqfn/rio at 0.1.5
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
This repository was archived by the owner on Mar 26, 2023. It is now read-only.
/ rio Public archive

Reactive Input Output objects for Java

License

Notifications You must be signed in to change notification settings

cqfn/rio

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

24 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Reactive Input Output objects for Java:

  • Fine tuned: fast or memory effecient
  • RS-TCK compatible (see tests for publishers and subscribers)
  • No dependencies: the only dependency is org.reactivestreams:reactive-streams interfaces

Maven Build codecov Maven Central

Install

Add Maven dependency to pom.xml:

<dependency>
  <groupId>wtf.g4s8</groupId>
  <artifactId>rio</artifactId>
  <version><!-- see latest release -->
</dependency>

Usage

The primary protocol for all objects is Publisher<ByteBuffer>, it's either accepted or provided by various methods in main entry points.

File API

To create new reactive file instance use constructor:

import java.nio.file.Paths;
import wtf.g4s8.rio.file.File;

var file = new File(Paths.get("/tmp/my/file.txt"));

Read file

To read the file use Publisher<ByteBuffer> content() method of File object:

import java.nio.file.Paths;
import org.reactivestreams.Publisher;
import wtf.g4s8.rio.file.File;

Publisher<ByteBuffer> content = new File(Paths.get("/tmp/my/file.txt")).content();

The file will be read on demand with all respect to backpressure. This default implementation uses 8 kilobytes buffer to read the file.

To tune the buffer allocation strategy use overloaded Publisher<ByteBuffer> read(Buffers buffers) method, Buffers interface is responsible to provide new buffers for reading. Some standard implementations are available at Buffers.Standard enum.

All read operations are performed in ThreadPoolExecutor by default, to specify executor service explicitely use content(ExecuorService) or content(Buffers, ExecutorService) overloaded methods.

Write file

To write reactive stream of ByteBuffers to file use CompletionStage<Void> write(Publisher<ByteBuffer> data) method:

import java.nio.file.Paths;
import org.reactivestreams.Publisher;
import wtf.g4s8.rio.file.File;

CompletionStage<Void> result = new File(Paths.get("/tmp/my/file.txt")).write(data);

It returns CompletionStage to signal errors or complete events. Also, it supports cancellation via cancel() method of CompletableFuture:

// will stop writing after subscribe
CompletionStage<Void> result = new File(Paths.get("/tmp/my/file.txt")).write(data);
Thread.sleep(1000);
result.toCompletableFuture().cancel(); // cancel writing after one second

Method write supports OpenOptions from java.nio.file as second varargs parameter: file.write(data, StandardOpenOptions.WRITE, StandardOpenOptions.CREATE_NEW), by default it uses WRITE and CREATE options if not provided any.

To fine tune the speed or memory usage of write, the client is able to configure the WriteGreed level. It configures the amount of buffers to requests and when. By default it requests 3 buffers at the beginning and when writing last buffer (one befire the end):

[subscribe] | [write] [write] [write] | [write] [write] [write] |
 request(3) |          request(3)     |          request(3)

to consume less memory WriteGreed.SINGLE can be used which requests one by one buffer at the beginning and after write operation. These values can be configured via write overloaded 2-nd parameter:

file.write(path, new WriteGreed.Constant(10, 2)) // request 10 buffers when read 8

or via system properties for default imeplementation:

# request 10 buffers when read 8 (only for default write method)
java -Drio.file.write.greed.amount=10 -Drio.file.write.greed.shift=2

All write operations are performed in ThreadPoolExecutor by default, to specify executor service explicitely use write(Publisher<ByteBuffer> data, ExecuorService exec) overloaded method.

0