From e8beca72a0378b15503e7ee3a0c1be9427e2eb83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Tue, 10 Nov 2015 22:30:08 +0100 Subject: [PATCH] 1.x: merge can now run in horizontally unbounded mode. --- .../rx/internal/operators/OperatorMerge.java | 48 +-- .../atomic/AtomicReferenceArrayQueue.java | 75 ++++ .../util/atomic/SpscAtomicArrayQueue.java | 124 +++++++ .../atomic/SpscExactAtomicArrayQueue.java | 169 ++++++++++ .../atomic/SpscUnboundedAtomicArrayQueue.java | 319 ++++++++++++++++++ src/perf/java/rx/operators/FlatMapPerf.java | 71 ++++ .../internal/operators/OperatorMergeTest.java | 33 +- .../rx/internal/util/JCToolsQueueTests.java | 108 ++++++ 8 files changed, 923 insertions(+), 24 deletions(-) create mode 100644 src/main/java/rx/internal/util/atomic/AtomicReferenceArrayQueue.java create mode 100644 src/main/java/rx/internal/util/atomic/SpscAtomicArrayQueue.java create mode 100644 src/main/java/rx/internal/util/atomic/SpscExactAtomicArrayQueue.java create mode 100644 src/main/java/rx/internal/util/atomic/SpscUnboundedAtomicArrayQueue.java create mode 100644 src/perf/java/rx/operators/FlatMapPerf.java diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index d2f52cb204..3fd96791a0 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -17,13 +17,15 @@ import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicLong; import rx.*; -import rx.Observable.Operator; import rx.Observable; +import rx.Observable.Operator; import rx.exceptions.*; import rx.internal.util.*; +import rx.internal.util.atomic.*; +import rx.internal.util.unsafe.*; import rx.subscriptions.CompositeSubscription; /** @@ -144,7 +146,7 @@ static final class MergeSubscriber extends Subscriber MergeProducer producer; - volatile RxRingBuffer 
queue; + volatile Queue queue; /** Tracks the active subscriptions to sources. */ volatile CompositeSubscription subscriptions; @@ -182,8 +184,7 @@ public MergeSubscriber(Subscriber child, boolean delayErrors, int max this.nl = NotificationLite.instance(); this.innerGuard = new Object(); this.innerSubscribers = EMPTY; - long r = Math.min(maxConcurrent, RxRingBuffer.SIZE); - request(r); + request(maxConcurrent == Integer.MAX_VALUE ? Long.MAX_VALUE : maxConcurrent); } Queue getOrCreateErrorQueue() { @@ -443,23 +444,27 @@ protected void queueScalar(T value) { * due to lack of requests or an ongoing emission, * enqueue the value and try the slow emission path. */ - RxRingBuffer q = this.queue; + Queue q = this.queue; if (q == null) { - q = RxRingBuffer.getSpscInstance(); - this.add(q); + int mc = maxConcurrent; + if (mc == Integer.MAX_VALUE) { + q = new SpscUnboundedAtomicArrayQueue(RxRingBuffer.SIZE); + } else { + if (Pow2.isPowerOfTwo(mc)) { + if (UnsafeAccess.isUnsafeAvailable()) { + q = new SpscArrayQueue(mc); + } else { + q = new SpscAtomicArrayQueue(mc); + } + } else { + q = new SpscExactAtomicArrayQueue(mc); + } + } this.queue = q; } - try { - q.onNext(nl.next(value)); - } catch (MissingBackpressureException ex) { - this.unsubscribe(); - this.onError(ex); - return; - } catch (IllegalStateException ex) { - if (!this.isUnsubscribed()) { - this.unsubscribe(); - this.onError(ex); - } + if (!q.offer(value)) { + unsubscribe(); + onError(OnErrorThrowable.addValueAsLastCause(new MissingBackpressureException(), value)); return; } emit(); @@ -533,7 +538,7 @@ void emitLoop() { skipFinal = true; return; } - RxRingBuffer svq = queue; + Queue svq = queue; long r = producer.get(); boolean unbounded = r == Long.MAX_VALUE; @@ -610,9 +615,6 @@ void emitLoop() { } else { reportError(); } - if (svq != null) { - svq.release(); - } skipFinal = true; return; } diff --git a/src/main/java/rx/internal/util/atomic/AtomicReferenceArrayQueue.java 
b/src/main/java/rx/internal/util/atomic/AtomicReferenceArrayQueue.java new file mode 100644 index 0000000000..f7594ba20a --- /dev/null +++ b/src/main/java/rx/internal/util/atomic/AtomicReferenceArrayQueue.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/AtomicReferenceArrayQueue.java + */ +package rx.internal.util.atomic; + +import java.util.*; +import java.util.concurrent.atomic.AtomicReferenceArray; + +import rx.internal.util.unsafe.Pow2; + +abstract class AtomicReferenceArrayQueue extends AbstractQueue { + protected final AtomicReferenceArray buffer; + protected final int mask; + public AtomicReferenceArrayQueue(int capacity) { + int actualCapacity = Pow2.roundToPowerOfTwo(capacity); + this.mask = actualCapacity - 1; + this.buffer = new AtomicReferenceArray(actualCapacity); + } + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + @Override + public void clear() { + // we have to test isEmpty because of the weaker poll() guarantee + while (poll() != null || !isEmpty()) + ; + } + protected final int calcElementOffset(long index, int mask) { + return (int)index & mask; + } + protected final int calcElementOffset(long index) { + return (int)index & mask; + } + protected final E 
lvElement(AtomicReferenceArray buffer, int offset) { + return buffer.get(offset); + } + protected final E lpElement(AtomicReferenceArray buffer, int offset) { + return buffer.get(offset); // no weaker form available + } + protected final E lpElement(int offset) { + return buffer.get(offset); // no weaker form available + } + protected final void spElement(AtomicReferenceArray buffer, int offset, E value) { + buffer.lazySet(offset, value); // no weaker form available + } + protected final void spElement(int offset, E value) { + buffer.lazySet(offset, value); // no weaker form available + } + protected final void soElement(AtomicReferenceArray buffer, int offset, E value) { + buffer.lazySet(offset, value); + } + protected final void soElement(int offset, E value) { + buffer.lazySet(offset, value); + } + protected final void svElement(AtomicReferenceArray buffer, int offset, E value) { + buffer.set(offset, value); + } + protected final E lvElement(int offset) { + return lvElement(buffer, offset); + } +} diff --git a/src/main/java/rx/internal/util/atomic/SpscAtomicArrayQueue.java b/src/main/java/rx/internal/util/atomic/SpscAtomicArrayQueue.java new file mode 100644 index 0000000000..65c29e3ce8 --- /dev/null +++ b/src/main/java/rx/internal/util/atomic/SpscAtomicArrayQueue.java @@ -0,0 +1,124 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/SpscAtomicArrayQueue.java + */ +package rx.internal.util.atomic; + +import java.util.concurrent.atomic.*; + +/** + * A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer. + *

+ * This implementation is a mashup of the Fast Flow + * algorithm with an optimization of the offer method taken from the BQueue algorithm (a variation on Fast + * Flow), and adjusted to comply with Queue.offer semantics with regard to capacity.
+ * For convenience the relevant papers are available in the resources folder:
+ * 2010 - Pisa - SPSC Queues on Shared Cache Multi-Core Systems.pdf
+ * 2012 - Junchang- BQueue- Efficient and Practical Queuing.pdf
+ *
This implementation is wait free. + * + * @param + */ +public final class SpscAtomicArrayQueue extends AtomicReferenceArrayQueue { + private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096); + final AtomicLong producerIndex; + protected long producerLookAhead; + final AtomicLong consumerIndex; + final int lookAheadStep; + public SpscAtomicArrayQueue(int capacity) { + super(capacity); + this.producerIndex = new AtomicLong(); + this.consumerIndex = new AtomicLong(); + lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP); + } + + @Override + public boolean offer(E e) { + if (null == e) { + throw new NullPointerException("Null is not a valid element"); + } + // local load of field to avoid repeated loads after volatile reads + final AtomicReferenceArray buffer = this.buffer; + final int mask = this.mask; + final long index = producerIndex.get(); + final int offset = calcElementOffset(index, mask); + if (index >= producerLookAhead) { + int step = lookAheadStep; + if (null == lvElement(buffer, calcElementOffset(index + step, mask))) {// LoadLoad + producerLookAhead = index + step; + } + else if (null != lvElement(buffer, offset)){ + return false; + } + } + soProducerIndex(index + 1); // ordered store -> atomic and ordered for size() + soElement(buffer, offset, e); // StoreStore + return true; + } + + @Override + public E poll() { + final long index = consumerIndex.get(); + final int offset = calcElementOffset(index); + // local load of field to avoid repeated loads after volatile reads + final AtomicReferenceArray lElementBuffer = buffer; + final E e = lvElement(lElementBuffer, offset);// LoadLoad + if (null == e) { + return null; + } + soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size() + soElement(lElementBuffer, offset, null);// StoreStore + return e; + } + + @Override + public E peek() { + return lvElement(calcElementOffset(consumerIndex.get())); + } + + @Override + public int 
size() { + /* + * It is possible for a thread to be interrupted or reschedule between the read of the producer and consumer + * indices, therefore protection is required to ensure size is within valid range. In the event of concurrent + * polls/offers to this method the size is OVER estimated as we read consumer index BEFORE the producer index. + */ + long after = lvConsumerIndex(); + while (true) { + final long before = after; + final long currentProducerIndex = lvProducerIndex(); + after = lvConsumerIndex(); + if (before == after) { + return (int) (currentProducerIndex - after); + } + } + } + + private void soProducerIndex(long newIndex) { + producerIndex.lazySet(newIndex); + } + + private void soConsumerIndex(long newIndex) { + consumerIndex.lazySet(newIndex); + } + + private long lvConsumerIndex() { + return consumerIndex.get(); + } + private long lvProducerIndex() { + return producerIndex.get(); + } +} diff --git a/src/main/java/rx/internal/util/atomic/SpscExactAtomicArrayQueue.java b/src/main/java/rx/internal/util/atomic/SpscExactAtomicArrayQueue.java new file mode 100644 index 0000000000..00fc1f96f0 --- /dev/null +++ b/src/main/java/rx/internal/util/atomic/SpscExactAtomicArrayQueue.java @@ -0,0 +1,169 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/SpscAtomicArrayQueue.java + */ + +package rx.internal.util.atomic; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import rx.internal.util.unsafe.Pow2; + +/** + * A single-producer single-consumer bounded queue with exact capacity tracking. + *

This means that a queue of 10 will allow exactly 10 offers; however, the underlying storage is still power-of-2. + *

The implementation uses field updaters and thus should be platform-safe. + */ +public final class SpscExactAtomicArrayQueue extends AtomicReferenceArray implements Queue { + /** */ + private static final long serialVersionUID = 6210984603741293445L; + final int mask; + final int capacitySkip; + volatile long producerIndex; + volatile long consumerIndex; + + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater PRODUCER_INDEX = + AtomicLongFieldUpdater.newUpdater(SpscExactAtomicArrayQueue.class, "producerIndex"); + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater CONSUMER_INDEX = + AtomicLongFieldUpdater.newUpdater(SpscExactAtomicArrayQueue.class, "consumerIndex"); + + public SpscExactAtomicArrayQueue(int capacity) { + super(Pow2.roundToPowerOfTwo(capacity)); + int len = length(); + this.mask = len - 1; + this.capacitySkip = len - capacity; + } + + + @Override + public boolean offer(T value) { + if (value == null) { + throw new NullPointerException(); + } + + long pi = producerIndex; + int m = mask; + + int fullCheck = (int)(pi + capacitySkip) & m; + if (get(fullCheck) != null) { + return false; + } + int offset = (int)pi & m; + PRODUCER_INDEX.lazySet(this, pi + 1); + lazySet(offset, value); + return true; + } + @Override + public T poll() { + long ci = consumerIndex; + int offset = (int)ci & mask; + T value = get(offset); + if (value == null) { + return null; + } + CONSUMER_INDEX.lazySet(this, ci + 1); + lazySet(offset, null); + return value; + } + @Override + public T peek() { + return get((int)consumerIndex & mask); + } + @Override + public void clear() { + while (poll() != null || !isEmpty()); + } + @Override + public boolean isEmpty() { + return producerIndex == consumerIndex; + } + + @Override + public int size() { + long ci = consumerIndex; + for (;;) { + long pi = producerIndex; + long ci2 = consumerIndex; + if (ci == ci2) { + return (int)(pi - ci2); + } + ci = ci2; + } + } + + @Override + public boolean contains(Object o) { 
+ throw new UnsupportedOperationException(); + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public E[] toArray(E[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean add(T e) { + throw new UnsupportedOperationException(); + } + + @Override + public T remove() { + throw new UnsupportedOperationException(); + } + + @Override + public T element() { + throw new UnsupportedOperationException(); + } + +} diff --git a/src/main/java/rx/internal/util/atomic/SpscUnboundedAtomicArrayQueue.java b/src/main/java/rx/internal/util/atomic/SpscUnboundedAtomicArrayQueue.java new file mode 100644 index 0000000000..af62a9ce60 --- /dev/null +++ b/src/main/java/rx/internal/util/atomic/SpscUnboundedAtomicArrayQueue.java @@ -0,0 +1,319 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + * Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE + * Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/SpscUnboundedAtomicArrayQueue.java + */ + +package rx.internal.util.atomic; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import rx.internal.util.unsafe.Pow2; + +/** + * A single-producer single-consumer queue with unbounded capacity. + *

The implementation uses fixed, power-of-2 arrays to store elements and turns into a linked-list like + * structure if the production overshoots the consumption. + *

Note that the minimum capacity of the 'islands' is 8 due to how the look-ahead optimization works. + *

The implementation uses field updaters and thus should be platform-safe. + */ +public final class SpscUnboundedAtomicArrayQueue implements Queue { + static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096); + protected volatile long producerIndex; + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater PRODUCER_INDEX = + AtomicLongFieldUpdater.newUpdater(SpscUnboundedAtomicArrayQueue.class, "producerIndex"); + protected int producerLookAheadStep; + protected long producerLookAhead; + protected int producerMask; + protected AtomicReferenceArray producerBuffer; + protected int consumerMask; + protected AtomicReferenceArray consumerBuffer; + protected volatile long consumerIndex; + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater CONSUMER_INDEX = + AtomicLongFieldUpdater.newUpdater(SpscUnboundedAtomicArrayQueue.class, "consumerIndex"); + private static final Object HAS_NEXT = new Object(); + + public SpscUnboundedAtomicArrayQueue(final int bufferSize) { + int p2capacity = Pow2.roundToPowerOfTwo(Math.max(8, bufferSize)); // lookahead doesn't work with capacity < 8 + int mask = p2capacity - 1; + AtomicReferenceArray buffer = new AtomicReferenceArray(p2capacity + 1); + producerBuffer = buffer; + producerMask = mask; + adjustLookAheadStep(p2capacity); + consumerBuffer = buffer; + consumerMask = mask; + producerLookAhead = mask - 1; // we know it's all empty to start with + soProducerIndex(0L); + } + + /** + * {@inheritDoc} + *

+ * This implementation is correct for single producer thread use only. + */ + @Override + public final boolean offer(final T e) { + if (e == null) { + throw new NullPointerException(); + } + // local load of field to avoid repeated loads after volatile reads + final AtomicReferenceArray buffer = producerBuffer; + final long index = lpProducerIndex(); + final int mask = producerMask; + final int offset = calcWrappedOffset(index, mask); + if (index < producerLookAhead) { + return writeToQueue(buffer, e, index, offset); + } else { + final int lookAheadStep = producerLookAheadStep; + // go around the buffer or resize if full (unless we hit max capacity) + int lookAheadElementOffset = calcWrappedOffset(index + lookAheadStep, mask); + if (null == lvElement(buffer, lookAheadElementOffset)) {// LoadLoad + producerLookAhead = index + lookAheadStep - 1; // joy, there's plenty of room + return writeToQueue(buffer, e, index, offset); + } else if (null != lvElement(buffer, calcWrappedOffset(index + 1, mask))) { // buffer is not full + return writeToQueue(buffer, e, index, offset); + } else { + resize(buffer, index, offset, e, mask); // add a buffer and link old to new + return true; + } + } + } + + private boolean writeToQueue(final AtomicReferenceArray buffer, final T e, final long index, final int offset) { + soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms + soElement(buffer, offset, e);// StoreStore + return true; + } + + private void resize(final AtomicReferenceArray oldBuffer, final long currIndex, final int offset, final T e, + final long mask) { + final int capacity = oldBuffer.length(); + final AtomicReferenceArray newBuffer = new AtomicReferenceArray(capacity); + producerBuffer = newBuffer; + producerLookAhead = currIndex + mask - 1; + soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms + soElement(newBuffer, offset, e);// StoreStore + soNext(oldBuffer, newBuffer); + soElement(oldBuffer, offset, HAS_NEXT); 
// new buffer is visible after element is + // inserted + } + + private void soNext(AtomicReferenceArray curr, AtomicReferenceArray next) { + soElement(curr, calcDirectOffset(curr.length() - 1), next); + } + @SuppressWarnings("unchecked") + private AtomicReferenceArray lvNext(AtomicReferenceArray curr) { + return (AtomicReferenceArray)lvElement(curr, calcDirectOffset(curr.length() - 1)); + } + /** + * {@inheritDoc} + *

+ * This implementation is correct for single consumer thread use only. + */ + @SuppressWarnings("unchecked") + @Override + public final T poll() { + // local load of field to avoid repeated loads after volatile reads + final AtomicReferenceArray buffer = consumerBuffer; + final long index = lpConsumerIndex(); + final int mask = consumerMask; + final int offset = calcWrappedOffset(index, mask); + final Object e = lvElement(buffer, offset);// LoadLoad + boolean isNextBuffer = e == HAS_NEXT; + if (null != e && !isNextBuffer) { + soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms + soElement(buffer, offset, null);// StoreStore + return (T) e; + } else if (isNextBuffer) { + return newBufferPoll(lvNext(buffer), index, mask); + } + + return null; + } + + @SuppressWarnings("unchecked") + private T newBufferPoll(AtomicReferenceArray nextBuffer, final long index, final int mask) { + consumerBuffer = nextBuffer; + final int offsetInNew = calcWrappedOffset(index, mask); + final T n = (T) lvElement(nextBuffer, offsetInNew);// LoadLoad + if (null == n) { + return null; + } else { + soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms + soElement(nextBuffer, offsetInNew, null);// StoreStore + return n; + } + } + + /** + * {@inheritDoc} + *

+ * This implementation is correct for single consumer thread use only. + */ + @SuppressWarnings("unchecked") + @Override + public final T peek() { + final AtomicReferenceArray buffer = consumerBuffer; + final long index = lpConsumerIndex(); + final int mask = consumerMask; + final int offset = calcWrappedOffset(index, mask); + final Object e = lvElement(buffer, offset);// LoadLoad + if (e == HAS_NEXT) { + return newBufferPeek(lvNext(buffer), index, mask); + } + + return (T) e; + } + + @Override + public void clear() { + while (poll() != null || !isEmpty()); + } + + @SuppressWarnings("unchecked") + private T newBufferPeek(AtomicReferenceArray nextBuffer, final long index, final int mask) { + consumerBuffer = nextBuffer; + final int offsetInNew = calcWrappedOffset(index, mask); + return (T) lvElement(nextBuffer, offsetInNew);// LoadLoad + } + + @Override + public final int size() { + /* + * It is possible for a thread to be interrupted or reschedule between the read of the producer and + * consumer indices, therefore protection is required to ensure size is within valid range. In the + * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer + * index BEFORE the producer index. 
+ */ + long after = lvConsumerIndex(); + while (true) { + final long before = after; + final long currentProducerIndex = lvProducerIndex(); + after = lvConsumerIndex(); + if (before == after) { + return (int) (currentProducerIndex - after); + } + } + } + + @Override + public boolean isEmpty() { + return lvProducerIndex() == lvConsumerIndex(); + } + + private void adjustLookAheadStep(int capacity) { + producerLookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP); + } + + private long lvProducerIndex() { + return producerIndex; + } + + private long lvConsumerIndex() { + return consumerIndex; + } + + private long lpProducerIndex() { + return producerIndex; + } + + private long lpConsumerIndex() { + return consumerIndex; + } + + private void soProducerIndex(long v) { + PRODUCER_INDEX.lazySet(this, v); + } + + private void soConsumerIndex(long v) { + CONSUMER_INDEX.lazySet(this, v); + } + + private static final int calcWrappedOffset(long index, int mask) { + return calcDirectOffset((int)index & mask); + } + private static final int calcDirectOffset(int index) { + return index; + } + private static final void soElement(AtomicReferenceArray buffer, int offset, Object e) { + buffer.lazySet(offset, e); + } + + private static final Object lvElement(AtomicReferenceArray buffer, int offset) { + return buffer.get(offset); + } + + @Override + public final Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean contains(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public E[] toArray(E[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection c) { + throw 
new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean add(T e) { + throw new UnsupportedOperationException(); + } + + @Override + public T remove() { + throw new UnsupportedOperationException(); + } + + @Override + public T element() { + throw new UnsupportedOperationException(); + } +} diff --git a/src/perf/java/rx/operators/FlatMapPerf.java b/src/perf/java/rx/operators/FlatMapPerf.java new file mode 100644 index 0000000000..f8dafd467d --- /dev/null +++ b/src/perf/java/rx/operators/FlatMapPerf.java @@ -0,0 +1,71 @@ +/* + * Copyright 2011-2015 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.operators; + +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; + +import rx.Observable; +import rx.functions.Func1; + +/** + * Benchmark flatMap's optimizations. + *

+ * gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*FlatMapPerf.*" + *

+ * gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*FlatMapPerf.*" + */ +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlatMapPerf { + @Param({ "1", "1000", "1000000" }) + public int times; + + Observable rxSource; + Observable rxSource2; + + @Setup + public void setup() { + Observable rxRange = Observable.range(0, times); + rxSource = rxRange.flatMap(new Func1>() { + @Override + public Observable call(Integer t) { + return Observable.just(t); + } + }); + rxSource2 = rxRange.flatMap(new Func1>() { + @Override + public Observable call(Integer v) { + return Observable.range(v, 2); + } + }); + } + + @Benchmark + public Object rxFlatMap() { + return rxSource.subscribe(); + } + @Benchmark + public Object rxFlatMap2() { + return rxSource2.subscribe(); + } +} diff --git a/src/test/java/rx/internal/operators/OperatorMergeTest.java b/src/test/java/rx/internal/operators/OperatorMergeTest.java index 9732611e44..c3ef0a83ee 100644 --- a/src/test/java/rx/internal/operators/OperatorMergeTest.java +++ b/src/test/java/rx/internal/operators/OperatorMergeTest.java @@ -716,7 +716,8 @@ public void onNext(Integer t) { } }; - Observable.merge(o1).observeOn(Schedulers.computation()).take(RxRingBuffer.SIZE * 2).subscribe(testSubscriber); + int limit = RxRingBuffer.SIZE; // the default unbounded behavior makes this test fail 100% of the time: source is too fast + Observable.merge(o1, limit).observeOn(Schedulers.computation()).take(RxRingBuffer.SIZE * 2).subscribe(testSubscriber); testSubscriber.awaitTerminalEvent(); if (testSubscriber.getOnErrorEvents().size() > 0) { testSubscriber.getOnErrorEvents().get(0).printStackTrace(); @@ -1303,4 +1304,34 @@ public void onNext(Integer t) { runMerge(toHiddenScalar, ts); } } + + @Test + public void testUnboundedDefaultConcurrency() { + List> os = new ArrayList>(); + 
for(int i=0; i < 2000; i++) { + os.add(Observable.never()); + } + os.add(Observable.range(0, 100)); + + TestSubscriber ts = TestSubscriber.create(); + Observable.merge(os).take(1).subscribe(ts); + ts.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS); + ts.assertValue(0); + ts.assertCompleted(); + } + + @Test + public void testConcurrencyLimit() { + List> os = new ArrayList>(); + for(int i=0; i < 2000; i++) { + os.add(Observable.never()); + } + os.add(Observable.range(0, 100)); + + TestSubscriber ts = TestSubscriber.create(); + Observable.merge(os, Integer.MAX_VALUE).take(1).subscribe(ts); + ts.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS); + ts.assertValue(0); + ts.assertCompleted(); + } } diff --git a/src/test/java/rx/internal/util/JCToolsQueueTests.java b/src/test/java/rx/internal/util/JCToolsQueueTests.java index fea60217eb..fdf844bf81 100644 --- a/src/test/java/rx/internal/util/JCToolsQueueTests.java +++ b/src/test/java/rx/internal/util/JCToolsQueueTests.java @@ -460,4 +460,112 @@ public void testUnsafeAccessAddressOf() { } UnsafeAccess.addressOf(Object.class, "field"); } + + @Test + public void testSpscExactAtomicArrayQueue() { + for (int i = 1; i <= RxRingBuffer.SIZE * 2; i++) { + SpscExactAtomicArrayQueue q = new SpscExactAtomicArrayQueue(i); + + for (int j = 0; j < i; j++) { + assertTrue(q.offer(j)); + } + + assertFalse(q.offer(i)); + + for (int j = 0; j < i; j++) { + assertEquals((Integer)j, q.peek()); + assertEquals((Integer)j, q.poll()); + } + + for (int j = 0; j < RxRingBuffer.SIZE * 4; j++) { + assertTrue(q.offer(j)); + assertEquals((Integer)j, q.peek()); + assertEquals((Integer)j, q.poll()); + } + } + } + + @Test + public void testUnboundedAtomicArrayQueue() { + for (int i = 1; i <= RxRingBuffer.SIZE * 2; i *= 2) { + SpscUnboundedAtomicArrayQueue q = new SpscUnboundedAtomicArrayQueue(i); + + for (int j = 0; j < i; j++) { + assertTrue(q.offer(j)); + } + + assertTrue(q.offer(i)); + + for (int j = 0; j < i; j++) { + assertEquals((Integer)j, q.peek()); + 
assertEquals((Integer)j, q.poll()); + } + + assertEquals((Integer)i, q.peek()); + assertEquals((Integer)i, q.poll()); + + for (int j = 0; j < RxRingBuffer.SIZE * 4; j++) { + assertTrue(q.offer(j)); + assertEquals((Integer)j, q.peek()); + assertEquals((Integer)j, q.poll()); + } + } + + } + + + @Test(expected = NullPointerException.class) + public void testSpscAtomicArrayQueueNull() { + SpscAtomicArrayQueue q = new SpscAtomicArrayQueue(16); + q.offer(null); + } + + @Test + public void testSpscAtomicArrayQueueOfferPoll() { + Queue q = new SpscAtomicArrayQueue(128); + + testOfferPoll(q); + } + @Test(expected = UnsupportedOperationException.class) + public void testSpscAtomicArrayQueueIterator() { + SpscAtomicArrayQueue q = new SpscAtomicArrayQueue(16); + q.iterator(); + } + + @Test(expected = NullPointerException.class) + public void testSpscExactAtomicArrayQueueNull() { + SpscExactAtomicArrayQueue q = new SpscExactAtomicArrayQueue(10); + q.offer(null); + } + + @Test + public void testSpscExactAtomicArrayQueueOfferPoll() { + Queue q = new SpscAtomicArrayQueue(120); + + testOfferPoll(q); + } + @Test(expected = UnsupportedOperationException.class) + public void testSpscExactAtomicArrayQueueIterator() { + SpscAtomicArrayQueue q = new SpscAtomicArrayQueue(10); + q.iterator(); + } + + @Test(expected = NullPointerException.class) + public void testSpscUnboundedAtomicArrayQueueNull() { + SpscUnboundedAtomicArrayQueue q = new SpscUnboundedAtomicArrayQueue(16); + q.offer(null); + } + + @Test + public void testSpscUnboundedAtomicArrayQueueOfferPoll() { + Queue q = new SpscUnboundedAtomicArrayQueue(128); + + testOfferPoll(q); + } + @Test(expected = UnsupportedOperationException.class) + public void testSpscUnboundedAtomicArrayQueueIterator() { + SpscUnboundedAtomicArrayQueue q = new SpscUnboundedAtomicArrayQueue(16); + q.iterator(); + } + }