Skip to content

Commit f31c3df

Browse files
cartantbenlesh
authored andcommitted
fix(race): ignore latter sources after first complete or error (#4809)
* test(race): add failing tests * fix(race): ignore sources after first complete/error Closes #4808 BREAKING CHANGE: `race()` will no longer subscribe to subsequent observables if a provided source synchronously errors or completes. This means side effects that might have occurred during subscription in those rare cases will no longer occur.
1 parent 362d1d4 commit f31c3df

2 files changed

Lines changed: 33 additions & 1 deletion

File tree

spec/operators/race-spec.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { expect } from 'chai';
22
import * as sinon from 'sinon';
33
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
4-
import { NEVER, of, race as staticRace, timer, defer, Observable } from 'rxjs';
4+
import { EMPTY, NEVER, of, race as staticRace, timer, defer, Observable, throwError } from 'rxjs';
55
import { race, mergeMap, map, finalize, startWith } from 'rxjs/operators';
66

77
/** @test {race} */
@@ -184,6 +184,28 @@ describe('race operator', () => {
184184
expect(onSubscribe.called).to.be.false;
185185
});
186186

187+
it('should ignore latter observables if a former one completes immediately', () => {
188+
const onComplete = sinon.spy();
189+
const onSubscribe = sinon.spy();
190+
const e1 = EMPTY; // Wins the race
191+
const e2 = defer(onSubscribe); // Should be ignored
192+
193+
e1.pipe(race(e2)).subscribe({ complete: onComplete });
194+
expect(onComplete.calledWithExactly()).to.be.true;
195+
expect(onSubscribe.called).to.be.false;
196+
});
197+
198+
it('should ignore latter observables if a former one errors immediately', () => {
199+
const onError = sinon.spy();
200+
const onSubscribe = sinon.spy();
201+
const e1 = throwError('kaboom'); // Wins the race
202+
const e2 = defer(onSubscribe); // Should be ignored
203+
204+
e1.pipe(race(e2)).subscribe({ error: onError });
205+
expect(onError.calledWithExactly('kaboom')).to.be.true;
206+
expect(onSubscribe.called).to.be.false;
207+
});
208+
187209
it('should unsubscribe former observables if a latter one emits immediately', () => {
188210
const onNext = sinon.spy();
189211
const onUnsubscribe = sinon.spy();

src/internal/observable/race.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,14 @@ export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
137137

138138
this.destination.next(innerValue);
139139
}
140+
141+
notifyComplete(innerSub: InnerSubscriber<T, T>): void {
142+
this.hasFirst = true;
143+
super.notifyComplete(innerSub);
144+
}
145+
146+
notifyError(error: any, innerSub: InnerSubscriber<T, T>): void {
147+
this.hasFirst = true;
148+
super.notifyError(error, innerSub);
149+
}
140150
}

0 commit comments

Comments
 (0)