8000 Refactor forwardStream: uses `Stream.multi` by hoc081098 · Pull Request #605 · ReactiveX/rxdart · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Refactor forwardStream: uses Stream.multi #605

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 2 additions & 95 deletions lib/src/subjects/behavior_subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,12 @@ import 'package:rxdart/src/utils/value_wrapper.dart';
/// subject.stream.listen(print); // prints 1
class BehaviorSubject<T> extends Subject<T> implements ValueStream<T> {
final _Wrapper<T> _wrapper;
final Stream<T> _stream;

BehaviorSubject._(
StreamController<T> controller,
this._stream,
Stream<T> stream,
this._wrapper,
) : super(controller, _stream);
) : super(controller, stream);

/// Constructs a [BehaviorSubject], optionally pass handlers for
/// [onListen], [onCancel] and a flag to handle events [sync].
Expand Down Expand Up @@ -170,98 +169,6 @@ class BehaviorSubject<T> extends Subject<T> implements ValueStream<T> {

@override
StackTrace? get stackTrace => _wrapper.errorAndStackTrace?.stackTrace;

@override
BehaviorSubject<R> createForwardingSubject<R>({
void Function()? onListen,
void Function()? onCancel,
bool sync = false,
}) =>
BehaviorSubject(
onListen: onListen,
onCancel: onCancel,
sync: sync,
);

// Override built-in operators.

@override
ValueStream<T> where(bool Function(T event) test) =>
_forwardBehaviorSubject<T>((s) => s.where(test));

@override
ValueStream<S> map<S>(S Function(T event) convert) =>
_forwardBehaviorSubject<S>((s) => s.map(convert));

@override
ValueStream<E> asyncMap<E>(FutureOr<E> Function(T event) convert) =>
_forwardBehaviorSubject<E>((s) => s.asyncMap(convert));

@override
ValueStream<E> asyncExpand<E>(Stream<E>? Function(T event) convert) =>
_forwardBehaviorSubject<E>((s) => s.asyncExpand(convert));

@override
ValueStream<T> handleError(Function onError,
{bool Function(dynamic error)? test}) =>
_forwardBehaviorSubject<T>((s) => s.handleError(onError, test: test));

@override
ValueStream<S> expand<S>(Iterable<S> Function(T element) convert) =>
_forwardBehaviorSubject<S>((s) => s.expand(convert));

@override
ValueStream<S> transform<S>(StreamTransformer<T, S> streamTransformer) =>
_forwardBehaviorSubject<S>((s) => s.transform(streamTransformer));

@override
ValueStream<R> cast<R>() => _forwardBehaviorSubject<R>((s) => s.cast<R>());

@override
ValueStream<T> take(int count) =>
_forwardBehaviorSubject<T>((s) => s.take(count));

@override
ValueStream<T> takeWhile(bool Function(T element) test) =>
_forwardBehaviorSubject<T>((s) => s.takeWhile(test));

@override
ValueStream<T> skip(int count) =>
_forwardBehaviorSubject<T>((s) => s.skip(count));

@override
ValueStream<T> skipWhile(bool Function(T element) test) =>
_forwardBehaviorSubject<T>((s) => s.skipWhile(test));

@override
ValueStream<T> distinct([bool Function(T previous, T next)? equals]) =>
_forwardBehaviorSubject<T>((s) => s.distinct(equals));

@override
ValueStream<T> timeout(Duration timeLimit,
{void Function(EventSink<T> sink)? onTimeout}) =>
_forwardBehaviorSubject<T>(
(s) => s.timeout(timeLimit, onTimeout: onTimeout));

ValueStream<R> _forwardBehaviorSubject<R>(
Stream<R> Function(Stream<T> s) transformerStream) {
late BehaviorSubject<R> subject;
late StreamSubscription<R> subscription;

final => subscription = transformerStream(_stream).listen(
subject.add,
onError: subject.addError,
onDone: subject.close,
);

final => subscription.cancel();

return subject = createForwardingSubject(
onListen: onListen,
onCancel: onCancel,
sync: true,
);
}
}

class _Wrapper<T> {
Expand Down
12 changes: 0 additions & 12 deletions lib/src/subjects/publish_subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,4 @@ class PublishSubject<T> extends Subject<T> {
controller.stream,
);
}

@override
PublishSubject<R> createForwardingSubject<R>({
void Function()? onListen,
void Function()? onCancel,
bool sync = false,
}) =>
PublishSubject(
onListen: onListen,
onCancel: onCancel,
sync: sync,
);
}
13 changes: 0 additions & 13 deletions lib/src/subjects/replay_subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,6 @@ class ReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
.where((event) => event.isError)
.map((event) => event.errorAndStackTrace!.stackTrace)
.toList(growable: false);

@override
ReplaySubject<R> createForwardingSubject<R>({
void Function()? onListen,
void Function()? onCancel,
bool sync = false,
}) =>
ReplaySubject(
maxSize: _maxSize,
onCancel: onCancel,
onListen: onListen,
sync: sync,
);
}

class _Event<T> {
Expand Down
9 changes: 0 additions & 9 deletions lib/src/subjects/subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,6 @@ abstract class Subject<T> extends StreamView<T> implements StreamController<T> {

return _controller.close();
}

/// Creates a trampoline StreamController, which can forward events
/// in the same manner as the original [Subject] does.
/// e.g. replay or behavior on subscribe.
Subject<R> createForwardingSubject<R>({
void Function()? onListen,
void Function()? onCancel,
bool sync = false,
});
}

class _StreamSinkWrapper<T> implements StreamSink<T> {
Expand Down
39 changes: 19 additions & 20 deletions lib/src/transformers/backpressure/backpressure.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ enum WindowStrategy {
onHandler
}

class _BackpressureStreamSink<S, T> implements ForwardingSink<S, T> {
class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
final WindowStrategy _strategy;
final Stream<dynamic> Function(S event)? _windowStreamFactory;
final T Function(S event)? _onWindowStart;
Expand Down Expand Up @@ -51,7 +51,7 @@ class _BackpressureStreamSink<S, T> implements ForwardingSink<S, T> {
);

@override
void add(EventSink<T> sink, S data) {
void onData(S data) {
_hasData = true;
maybeCreateWindow(data, sink);

Expand All @@ -71,11 +71,10 @@ class _BackpressureStreamSink<S, T> implements ForwardingSink<S, T> {
}

@override
void addError(EventSink<T> sink, Object e, StackTrace st) =>
sink.addError(e, st);
void onError(Object e, StackTrace st) => sink.addError(e, st);

@override
void close(EventSink<T> sink) {
void onDone() {
_mainClosed = true;

if (_strategy == WindowStrategy.eventAfterLastWindow) {
Expand All @@ -97,16 +96,16 @@ class _BackpressureStreamSink<S, T> implements ForwardingSink<S, T> {
}

@override
FutureOr onCancel(EventSink<T> sink) => _windowSubscription?.cancel();
FutureOr<void> onCancel() => _windowSubscription?.cancel();

@override
void onListen(EventSink<T> sink) {}
void onListen() {}

@override
void onPause(EventSink<T> sink) => _windowSubscription?.pause();
void onPause() => _windowSubscription?.pause();

@override
void onResume(EventSink<T> sink) => _windowSubscription?.resume();
void onResume() => _windowSubscription?.resume();

void maybeCreateWindow(S event, EventSink<T> sink) {
switch (_strategy) {
Expand Down Expand Up @@ -341,17 +340,17 @@ class BackpressureStreamTransformer<S, T> extends StreamTransformerBase<S, T> {

@override
Stream<T> bind(Stream<S> stream) {
final sink = _BackpressureStreamSink(
strategy,
windowStreamFactory,
onWindowStart,
onWindowEnd,
startBufferEvery,
closeWindowWhen,
ignoreEmptyWindows,
dispatchOnClose,
maxLengthQueue,
);
final sink = () => _BackpressureStreamSink(
strategy,
windowStreamFactory,
onWindowStart,
onWindowEnd,
startBufferEvery,
closeWindowWhen,
ignoreEmptyWindows,
dispatchOnClose,
maxLengthQueue,
);
return forwardStream(stream, sink);
}
}
Expand Down
19 changes: 9 additions & 10 deletions lib/src/transformers/delay.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import 'package:rxdart/src/rx.dart';
import 'package:rxdart/src/utils/forwarding_sink.dart';
import 'package:rxdart/src/utils/forwarding_stream.dart';

class _DelayStreamSink<S> implements ForwardingSink<S, S> {
class _DelayStreamSink<S> extends ForwardingSink<S, S> {
final Duration _duration;
var _inputClosed = false;
final _subscriptions = Queue<StreamSubscription<void>>();

_DelayStreamSink(this._duration);

@override
void add(EventSink<S> sink, S data) {
void onData(S data) {
final subscription = Rx.timer<void>(null, _duration).listen((_) {
_subscriptions.removeFirst();

Expand All @@ -28,11 +28,10 @@ class _DelayStreamSink<S> implements ForwardingSink<S, S> {
}

@override
void addError(EventSink<S> sink, Object error, StackTrace st) =>
sink.addError(error, st);
void onError(Object error, StackTrace st) => sink.addError(error, st);

@override
void close(EventSink<S> sink) {
void onDone() {
_inputClosed = true;

if (_subscriptions.isEmpty) {
Expand All @@ -41,21 +40,21 @@ class _DelayStreamSink<S> implements ForwardingSink<S, S> {
}

@override
FutureOr<void> onCancel(EventSink<S> sink) {
FutureOr<void> onCancel() {
if (_subscriptions.isNotEmpty) {
return Future.wait(_subscriptions.map((t) => t.cancel()))
.whenComplete(() => _subscriptions.clear());
}
}

@override
void onListen(EventSink<S> sink) {}
void onListen() {}

@override
void onPause(EventSink<S> sink) => _subscriptions.forEach((s) => s.pause());
void onPause() => _subscriptions.forEach((s) => s.pause());

@override
void onResume(EventSink<S> sink) => _subscriptions.forEach((s) => s.resume());
void onResume() => _subscriptions.forEach((s) => s.resume());
}

/// The Delay operator modifies its source Stream by pausing for
Expand All @@ -81,7 +80,7 @@ class DelayStreamTransformer<S> extends StreamTransformerBase<S, S> {

@override
Stream<S> bind(Stream<S> stream) =>
forwardStream(stream, _DelayStreamSink<S>(duration));
forwardStream(stream, () => _DelayStreamSink<S>(duration));
}

/// Extends the Stream class with the ability to delay events being emitted
Expand Down
Loading
0