Skip to content

Commit b89ebe5

Browse files
authored
fix(scan/reduce): Typings correct for mixed seed/value types (#4858)
- Adds dtslint tests to cover various mixtures of seeds, accumulator results, and value types - Refactors scan a little bit, as types needed to be updated in the implementation - Resolves a performance issue where scan was calling next on the destination Subscriber when it had already errored
1 parent 259853e commit b89ebe5

7 files changed

Lines changed: 102 additions & 180 deletions

File tree

spec-dtslint/operators/reduce-spec.ts

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { of, Observable } from 'rxjs';
1+
import { of, OperatorFunction } from 'rxjs';
22
import { reduce } from 'rxjs/operators';
33

44
it('should enforce parameter', () => {
@@ -29,3 +29,29 @@ it('should accept seed parameter of a different type', () => {
2929
const bv: { [key: string]: string } = {};
3030
const b = of(1, 2, 3).pipe(reduce((x, y, z) => ({ ...x, [y]: y.toString() }), bv)); // $ExpectType Observable<{ [key: string]: string; }>
3131
});
32+
33+
it('should act appropriately with no seed', () => {
34+
// Because an observable of one value will just pass that value directly through the reducer,
35+
// the below could be a number or a string.
36+
const a = of(1, 2, 3).pipe(reduce((a: any, v) => '' + v)); // $ExpectType Observable<string | number>
37+
const b = of(1, 2, 3).pipe(reduce((a, v) => v)); // $ExpectType Observable<number>
38+
const c = of(1, 2, 3).pipe(reduce(() => {})); // $ExpectType Observable<number | void>
39+
});
40+
41+
it('should act appropriately with a seed', () => {
42+
const a = of(1, 2, 3).pipe(reduce((a, v) => a + v, '')); // $ExpectType Observable<string>
43+
const b = of(1, 2, 3).pipe(reduce((a, v) => a + v, 0)); // $ExpectType Observable<number>
44+
const c = of(1, 2, 3).pipe(reduce((a, v) => a + 1, [])); // $ExpectError
45+
});
46+
47+
it('should infer types properly from arguments', () => {
48+
function toArrayReducer(arr: number[], item: number, index: number): number[] {
49+
if (index === 0) {
50+
return [item];
51+
}
52+
arr.push(item);
53+
return arr;
54+
}
55+
56+
const a = reduce(toArrayReducer, [] as number[]); // $ExpectType OperatorFunction<number, number[]>
57+
});

spec-dtslint/operators/scan-spec.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,29 @@ it('should accept seed parameter of a different type', () => {
2929
const bv: { [key: string]: string } = {};
3030
const b = of(1, 2, 3).pipe(scan((x, y, z) => ({ ...x, [y]: y.toString() }), bv)); // $ExpectType Observable<{ [key: string]: string; }>
3131
});
32+
33+
it('should act appropriately with no seed', () => {
34+
// Because an observable of one value will just pass that value directly through the reducer,
35+
// the below could be a number or a string.
36+
const a = of(1, 2, 3).pipe(scan((a: any, v) => '' + v)); // $ExpectType Observable<string | number>
37+
const b = of(1, 2, 3).pipe(scan((a, v) => v)); // $ExpectType Observable<number>
38+
const c = of(1, 2, 3).pipe(scan(() => {})); // $ExpectType Observable<number | void>
39+
});
40+
41+
it('should act appropriately with a seed', () => {
42+
const a = of(1, 2, 3).pipe(scan((a, v) => a + v, '')); // $ExpectType Observable<string>
43+
const b = of(1, 2, 3).pipe(scan((a, v) => a + v, 0)); // $ExpectType Observable<number>
44+
const c = of(1, 2, 3).pipe(scan((a, v) => a + 1, [])); // $ExpectError
45+
});
46+
47+
it('should infer types properly from arguments', () => {
48+
function toArrayReducer(arr: number[], item: number, index: number): number[] {
49+
if (index === 0) {
50+
return [item];
51+
}
52+
arr.push(item);
53+
return arr;
54+
}
55+
56+
const a = scan(toArrayReducer, [] as number[]); // $ExpectType OperatorFunction<number, number[]>
57+
});

spec/operators/reduce-spec.ts

Lines changed: 1 addition & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ describe('reduce operator', () => {
193193
throw 'error';
194194
};
195195

196-
expectObservable(e1.pipe(reduce<string>(reduceFunction, seed))).toBe(expected);
196+
expectObservable(e1.pipe(reduce(reduceFunction, seed))).toBe(expected);
197197
expectSubscriptions(e1.subscriptions).toBe(e1subs);
198198
});
199199

@@ -289,105 +289,4 @@ describe('reduce operator', () => {
289289
expectObservable(e1.pipe(reduce(reduceFunction))).toBe(expected);
290290
expectSubscriptions(e1.subscriptions).toBe(e1subs);
291291
});
292-
293-
type('should accept array typed reducers', () => {
294-
let a: Observable<{ a: number; b: string }>;
295-
a.pipe(reduce((acc, value) => acc.concat(value), []));
296-
});
297-
298-
type('should accept T typed reducers', () => {
299-
let a: Observable<{ a: number; b: string }>;
300-
const reduced = a.pipe(reduce((acc, value) => {
301-
value.a = acc.a;
302-
value.b = acc.b;
303-
return acc;
304-
}));
305-
306-
reduced.subscribe(r => {
307-
r.a.toExponential();
308-
r.b.toLowerCase();
309-
});
310-
});
311-
312-
type('should accept T typed reducers when T is an array', () => {
313-
let a: Observable<number[]>;
314-
const reduced = a.pipe(reduce((acc, value) => {
315-
return acc.concat(value);
316-
}, []));
317-
318-
reduced.subscribe(rs => {
319-
rs[0].toExponential();
320-
});
321-
});
322-
323-
type('should accept R typed reduces when R is an array of T', () => {
324-
let a: Observable<number>;
325-
const reduced = a.pipe(reduce((acc, value) => {
326-
acc.push(value);
327-
return acc;
328-
}, []));
329-
330-
reduced.subscribe(rs => {
331-
rs[0].toExponential();
332-
});
333-
});
334-
335-
type('should accept R typed reducers when R is assignable to T', () => {
336-
let a: Observable<{ a?: number; b?: string }>;
337-
const reduced = a.pipe(reduce((acc, value) => {
338-
value.a = acc.a;
339-
value.b = acc.b;
340-
return acc;
341-
}, {} as { a?: number; b?: string }));
342-
343-
reduced.subscribe(r => {
344-
r.a.toExponential();
345-
r.b.toLowerCase();
346-
});
347-
});
348-
349-
type('should accept R typed reducers when R is not assignable to T', () => {
350-
let a: Observable<{ a: number; b: string }>;
351-
const seed = {
352-
as: [1],
353-
bs: ['a']
354-
};
355-
const reduced = a.pipe(reduce((acc, value: {a: number, b: string}) => {
356-
acc.as.push(value.a);
357-
acc.bs.push(value.b);
358-
return acc;
359-
}, seed));
360-
361-
reduced.subscribe(r => {
362-
r.as[0].toExponential();
363-
r.bs[0].toLowerCase();
364-
});
365-
});
366-
367-
type('should accept R typed reducers and reduce to type R', () => {
368-
let a: Observable<{ a: number; b: string }>;
369-
const reduced = a.pipe(reduce<{ a?: number; b?: string }>((acc, value) => {
370-
value.a = acc.a;
371-
value.b = acc.b;
372-
return acc;
373-
}, {}));
374-
375-
reduced.subscribe(r => {
376-
r.a.toExponential();
377-
r.b.toLowerCase();
378-
});
379-
});
380-
381-
type('should accept array of R typed reducers and reduce to array of R', () => {
382-
let a: Observable<number>;
383-
const reduced = a.pipe(reduce<number, string[]>((acc, cur) => {
384-
console.log(acc);
385-
acc.push(cur.toString());
386-
return acc;
387-
}, [] as string[]));
388-
389-
reduced.subscribe(rs => {
390-
rs[0].toLowerCase();
391-
});
392-
});
393292
});

spec/operators/scan-spec.ts

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -227,27 +227,4 @@ describe('scan operator', () => {
227227
expectObservable(scanObs).toBe(expected, values);
228228
expectSubscriptions(e1.subscriptions).toBe(e1subs);
229229
});
230-
231-
type('should accept array typed reducers', () => {
232-
let a: Observable<{ a: number; b: string }>;
233-
a.pipe(scan((acc, value) => acc.concat(value), []));
234-
});
235-
236-
type('should accept T typed reducers', () => {
237-
let a: Observable<{ a?: number; b?: string }>;
238-
a.pipe(scan((acc, value) => {
239-
value.a = acc.a;
240-
value.b = acc.b;
241-
return acc;
242-
}, {} as { a?: number; b?: string }));
243-
});
244-
245-
type('should accept R typed reducers', () => {
246-
let a: Observable<{ a: number; b: string }>;
247-
a.pipe(scan<{ a?: number; b?: string }>((acc, value) => {
248-
value.a = acc.a;
249-
value.b = acc.b;
250-
return acc;
251-
}, {}));
252-
});
253230
});

src/internal/operators/reduce.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ import { Observable } from '../Observable';
22
import { scan } from './scan';
33
import { takeLast } from './takeLast';
44
import { defaultIfEmpty } from './defaultIfEmpty';
5-
import { OperatorFunction, MonoTypeOperatorFunction } from '../types';
5+
import { OperatorFunction } from '../types';
66
import { pipe } from '../util/pipe';
77

88
/* tslint:disable:max-line-length */
9-
export function reduce<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed: R): OperatorFunction<T, R>;
10-
export function reduce<T>(accumulator: (acc: T, value: T, index: number) => T, seed?: T): MonoTypeOperatorFunction<T>;
11-
export function reduce<T, R>(accumulator: (acc: R, value: T, index: number) => R): OperatorFunction<T, R>;
9+
export function reduce<V, A = V>(accumulator: (acc: A|V, value: V, index: number) => A): OperatorFunction<V, V|A>;
10+
export function reduce<V, A>(accumulator: (acc: A, value: V, index: number) => A, seed: A): OperatorFunction<V, A>;
11+
export function reduce<V, A, S = A>(accumulator: (acc: A|S, value: V, index: number) => A, seed: S): OperatorFunction<V, A>;
1212
/* tslint:enable:max-line-length */
1313

1414
/**
@@ -54,28 +54,32 @@ export function reduce<T, R>(accumulator: (acc: R, value: T, index: number) => R
5454
* @see {@link mergeScan}
5555
* @see {@link scan}
5656
*
57-
* @param {function(acc: R, value: T, index: number): R} accumulator The accumulator function
57+
* @param {function(acc: A, value: V, index: number): A} accumulator The accumulator function
5858
* called on each source value.
59-
* @param {R} [seed] The initial accumulation value.
60-
* @return {Observable<R>} An Observable that emits a single value that is the
59+
* @param {A} [seed] The initial accumulation value.
60+
* @return {Observable<A>} An Observable that emits a single value that is the
6161
* result of accumulating the values emitted by the source Observable.
6262
* @method reduce
6363
* @owner Observable
6464
*/
65-
export function reduce<T, R>(accumulator: (acc: T | R, value: T, index?: number) => T | R, seed?: T | R): OperatorFunction<T, T | R> {
65+
export function reduce<V, A>(accumulator: (acc: V | A, value: V, index?: number) => A, seed?: any): OperatorFunction<V, V | A> {
6666
// providing a seed of `undefined` *should* be valid and trigger
6767
// hasSeed! so don't use `seed !== undefined` checks!
6868
// For this reason, we have to check it here at the original call site
6969
// otherwise inside Operator/Subscriber we won't know if `undefined`
7070
// means they didn't provide anything or if they literally provided `undefined`
7171
if (arguments.length >= 2) {
72-
return function reduceOperatorFunctionWithSeed(source: Observable<T>): Observable<T | R> {
73-
return pipe(scan(accumulator, seed), takeLast(1), defaultIfEmpty(seed))(source);
72+
return function reduceOperatorFunctionWithSeed(source: Observable<V>): Observable<V | A> {
73+
return pipe(
74+
scan(accumulator, seed),
75+
takeLast(1),
76+
defaultIfEmpty(seed),
77+
)(source);
7478
};
7579
}
76-
return function reduceOperatorFunction(source: Observable<T>): Observable<T | R> {
80+
return function reduceOperatorFunction(source: Observable<V>): Observable<V | A> {
7781
return pipe(
78-
scan<T, T | R>((acc, value, index) => accumulator(acc, value, index + 1)),
82+
scan<V, V | A>((acc, value, index) => accumulator(acc, value, index + 1)),
7983
takeLast(1),
8084
)(source);
8185
};

src/internal/operators/scan.ts

Lines changed: 31 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { Operator } from '../Operator';
22
import { Observable } from '../Observable';
33
import { Subscriber } from '../Subscriber';
4-
import { OperatorFunction, MonoTypeOperatorFunction } from '../types';
4+
import { OperatorFunction, TeardownLogic } from '../types';
55

66
/* tslint:disable:max-line-length */
7-
export function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed: R): OperatorFunction<T, R>;
8-
export function scan<T>(accumulator: (acc: T, value: T, index: number) => T, seed?: T): MonoTypeOperatorFunction<T>;
9-
export function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R): OperatorFunction<T, R>;
7+
export function scan<V, A = V>(accumulator: (acc: A|V, value: V, index: number) => A): OperatorFunction<V, V|A>;
8+
export function scan<V, A>(accumulator: (acc: A, value: V, index: number) => A, seed: A): OperatorFunction<V, A>;
9+
export function scan<V, A, S>(accumulator: (acc: A|S, value: V, index: number) => A, seed: S): OperatorFunction<V, A>;
1010
/* tslint:enable:max-line-length */
1111

1212
/**
@@ -45,14 +45,14 @@ export function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R):
4545
* @see {@link mergeScan}
4646
* @see {@link reduce}
4747
*
48-
* @param {function(acc: R, value: T, index: number): R} accumulator
48+
* @param {function(acc: A, value: V, index: number): A} accumulator
4949
* The accumulator function called on each source value.
50-
* @param {T|R} [seed] The initial accumulation value.
51-
* @return {Observable<R>} An observable of the accumulated values.
50+
* @param {V|A} [seed] The initial accumulation value.
51+
* @return {Observable<A>} An observable of the accumulated values.
5252
* @method scan
5353
* @owner Observable
5454
*/
55-
export function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed?: T | R): OperatorFunction<T, R> {
55+
export function scan<V, A, S>(accumulator: (acc: V|A|S, value: V, index: number) => A, seed?: S): OperatorFunction<V, V|A> {
5656
let hasSeed = false;
5757
// providing a seed of `undefined` *should* be valid and trigger
5858
// hasSeed! so don't use `seed !== undefined` checks!
@@ -63,15 +63,15 @@ export function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R,
6363
hasSeed = true;
6464
}
6565

66-
return function scanOperatorFunction(source: Observable<T>): Observable<R> {
66+
return function scanOperatorFunction(source: Observable<V>) {
6767
return source.lift(new ScanOperator(accumulator, seed, hasSeed));
6868
};
6969
}
7070

71-
class ScanOperator<T, R> implements Operator<T, R> {
72-
constructor(private accumulator: (acc: R, value: T, index: number) => R, private seed?: T | R, private hasSeed: boolean = false) {}
71+
class ScanOperator<V, A, S> implements Operator<V, A> {
72+
constructor(private accumulator: (acc: V|A|S, value: V, index: number) => A, private seed?: S, private hasSeed: boolean = false) {}
7373

74-
call(subscriber: Subscriber<R>, source: any): any {
74+
call(subscriber: Subscriber<A>, source: any): TeardownLogic {
7575
return source.subscribe(new ScanSubscriber(subscriber, this.accumulator, this.seed, this.hasSeed));
7676
}
7777
}
@@ -81,41 +81,31 @@ class ScanOperator<T, R> implements Operator<T, R> {
8181
* @ignore
8282
* @extends {Ignored}
8383
*/
84-
class ScanSubscriber<T, R> extends Subscriber<T> {
84+
class ScanSubscriber<V, A> extends Subscriber<V> {
8585
private index: number = 0;
8686

87-
get seed(): T | R {
88-
return this._seed;
89-
}
90-
91-
set seed(value: T | R) {
92-
this.hasSeed = true;
93-
this._seed = value;
94-
}
95-
96-
constructor(destination: Subscriber<R>, private accumulator: (acc: R, value: T, index: number) => R, private _seed: T | R,
97-
private hasSeed: boolean) {
87+
constructor(destination: Subscriber<A>, private accumulator: (acc: V|A, value: V, index: number) => A, private _state: any,
88+
private _hasState: boolean) {
9889
super(destination);
9990
}
10091

101-
protected _next(value: T): void {
102-
if (!this.hasSeed) {
103-
this.seed = value;
104-
this.destination.next(value);
92+
protected _next(value: V): void {
93+
const { destination } = this;
94+
if (!this._hasState) {
95+
this._state = value;
96+
this._hasState = true;
97+
destination.next(value);
10598
} else {
106-
return this._tryNext(value);
107-
}
108-
}
109-
110-
private _tryNext(value: T): void {
111-
const index = this.index++;
112-
let result: any;
113-
try {
114-
result = this.accumulator(<R>this.seed, value, index);
115-
} catch (err) {
116-
this.destination.error(err);
99+
const index = this.index++;
100+
let result: A;
101+
try {
102+
result = this.accumulator(this._state, value, index);
103+
} catch (err) {
104+
destination.error(err);
105+
return;
106+
}
107+
this._state = result;
108+
destination.next(result);
117109
}
118-
this.seed = result;
119-
this.destination.next(result);
120110
}
121111
}

src/internal/operators/toArray.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { reduce } from './reduce';
22
import { OperatorFunction } from '../types';
33

4-
function toArrayReducer<T>(arr: T[], item: T, index: number) {
4+
function toArrayReducer<T>(arr: T[], item: T, index: number): T[] {
55
if (index === 0) {
66
return [item];
77
}

0 commit comments

Comments
 (0)