Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 9 additions & 27 deletions src/main/java/reactor/core/processor/EmitterProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,7 @@ public void onNext(T t) {
}
}

if (RUNNING.getAndIncrement(this) != 0) {
return;
}

drainLoop();

drain();
}
else {
buffer(t);
Expand Down Expand Up @@ -282,7 +277,7 @@ public long expectedFromUpstream() {
return outstanding;
}

RingBuffer<RingBuffer.Slot<T>> getMainQueue() {
private RingBuffer<RingBuffer.Slot<T>> getMainQueue() {
RingBuffer<RingBuffer.Slot<T>> q = emitBuffer;
if (q == null) {
q = RingBuffer.createSingleProducer(bufferSize);
Expand All @@ -291,7 +286,7 @@ RingBuffer<RingBuffer.Slot<T>> getMainQueue() {
return q;
}

final long buffer(T value) {
private long buffer(T value) {
RingBuffer<RingBuffer.Slot<T>> q = getMainQueue();

long seq = q.next();
Expand All @@ -301,16 +296,15 @@ final long buffer(T value) {
return seq;
}

final void drain() {
private void drain() {
if (RUNNING.getAndIncrement(this) == 0) {
drainLoop();
}
}

final void drainLoop() {
int missed = 1;
private void drainLoop() {
RingBuffer<RingBuffer.Slot<T>> q = null;
for (; ; ) {
do {
EmitterSubscriber<?>[] inner = subscribers;
if (inner == CANCELLED) {
cancel();
Expand All @@ -325,12 +319,11 @@ final void drainLoop() {
int n = inner.length;

if (n != 0) {
int j = 0;
Sequence innerSequence;
long _r;

for (int i = 0; i < n; i++) {
@SuppressWarnings("unchecked") EmitterSubscriber<T> is = (EmitterSubscriber<T>) inner[j];
@SuppressWarnings("unchecked") EmitterSubscriber<T> is = (EmitterSubscriber<T>) inner[i];

long r = is.requested;

Expand Down Expand Up @@ -384,11 +377,6 @@ final void drainLoop() {
if (d) {
checkTerminal(is, innerSequence, _r);
}

j++;
if (j == n) {
j = 0;
}
}

if (!done && firstDrain) {
Expand All @@ -408,11 +396,7 @@ final void drainLoop() {
}
}

missed = RUNNING.addAndGet(this, -missed);
if (missed == 0) {
break;
}
}
} while (RUNNING.addAndGet(this, -1) > 0);
}

final void checkTerminal(EmitterSubscriber<T> is, Sequence innerSequence, long r) {
Expand Down Expand Up @@ -591,9 +575,7 @@ public EmitterSubscriber(EmitterProcessor<T> parent, final Subscriber<? super T>
public void request(long n) {
if (BackpressureUtils.checkRequest(n, actual)) {
BackpressureUtils.getAndAdd(REQUESTED, this, n);
if (EmitterProcessor.RUNNING.getAndIncrement(parent) == 0) {
parent.drainLoop();
}
parent.drain();
}
}

Expand Down