@@ -3,9 +3,10 @@ import { identity } from '../util/identity.js';
33import { wrapWithAbort } from './operators/withabort.js' ;
44import { throwIfAborted } from '../aborterror.js' ;
55import { safeRace } from '../util/safeRace.js' ;
6+ import { returnAsyncIterators } from '../util/returniterator.js' ;
67
78// eslint-disable-next-line @typescript-eslint/no-empty-function
8- const NEVER_PROMISE = new Promise ( ( ) => { } ) ;
9+ const NEVER_PROMISE = new Promise < never > ( ( ) => { } ) ;
910
1011type MergeResult < T > = { value : T ; index : number } ;
1112
@@ -28,39 +29,42 @@ export class CombineLatestAsyncIterable<TSource> extends AsyncIterableX<TSource[
2829 const length = this . _sources . length ;
2930 const iterators = new Array < AsyncIterator < TSource > > ( length ) ;
3031 const nexts = new Array < Promise < MergeResult < IteratorResult < TSource > > > > ( length ) ;
31- let hasValueAll = false ;
32- const values = new Array < TSource > ( length ) ;
33- const hasValues = new Array < boolean > ( length ) ;
34- let active = length ;
3532
36- hasValues . fill ( false ) ;
33+ let active = length ;
34+ let allValuesAvailable = false ;
35+ const values = new Array < TSource > ( length ) ;
36+ const hasValues = new Array < boolean > ( length ) . fill ( false ) ;
3737
3838 for ( let i = 0 ; i < length ; i ++ ) {
3939 const iterator = wrapWithAbort ( this . _sources [ i ] , signal ) [ Symbol . asyncIterator ] ( ) ;
4040 iterators [ i ] = iterator ;
4141 nexts [ i ] = wrapPromiseWithIndex ( iterator . next ( ) , i ) ;
4242 }
4343
44- while ( active > 0 ) {
45- const next = safeRace ( nexts ) ;
46- const {
47- value : { value : value$ , done : done$ } ,
48- index,
49- } = await next ;
50- if ( done$ ) {
51- nexts [ index ] = < Promise < MergeResult < IteratorResult < TSource > > > > NEVER_PROMISE ;
52- active -- ;
53- } else {
54- values [ index ] = value$ ;
55- hasValues [ index ] = true ;
44+ try {
45+ while ( active > 0 ) {
46+ const {
47+ value : { value, done } ,
48+ index,
49+ } = await safeRace ( nexts ) ;
50+
51+ if ( done ) {
52+ nexts [ index ] = NEVER_PROMISE ;
53+ active -- ;
54+ } else {
55+ values [ index ] = value ;
56+ hasValues [ index ] = true ;
57+ allValuesAvailable = allValuesAvailable || hasValues . every ( identity ) ;
5658
57- const iterator$ = iterators [ index ] ;
58- nexts [ index ] = wrapPromiseWithIndex ( iterator$ . next ( ) , index ) ;
59+ nexts [ index ] = wrapPromiseWithIndex ( iterators [ index ] . next ( ) , index ) ;
5960
60- if ( hasValueAll || ( hasValueAll = hasValues . every ( identity ) ) ) {
61- yield values ;
61+ if ( allValuesAvailable ) {
62+ yield values ;
63+ }
6264 }
6365 }
66+ } finally {
67+ await returnAsyncIterators ( iterators ) ;
6468 }
6569 }
6670}
@@ -176,5 +180,5 @@ export function combineLatest<T, T2, T3, T4, T5, T6>(
176180 */
177181export function combineLatest < T > ( ...sources : AsyncIterable < T > [ ] ) : AsyncIterableX < T [ ] > ;
178182export function combineLatest < T > ( ...sources : any [ ] ) : AsyncIterableX < T [ ] > {
179- return new CombineLatestAsyncIterable < T > ( sources ) ;
183+ return new CombineLatestAsyncIterable ( sources ) ;
180184}
0 commit comments