Modern applications often need to perform multiple operations concurrently to improve responsiveness and efficiency. This guide explores how to leverage the parallel execution capabilities within the "Effectively" library to manage complex asynchronous workflows. We'll cover everything from simple parallel calls to advanced streaming and scheduling techniques.
- Why Run Tasks in Parallel?
- Getting Started: Simple Parallel Execution
- Controlling Parallelism:
ParallelOptionsconcurrency: Limiting Simultaneous Taskspriority: Guiding the Schedulerbatching&batchSize: Processing in GroupspreserveOrder: Result Order vs. Completion Ordersignal: Custom Cancellation
- Structured Parallelism:
forkJoinandallTupleforkJoin: Named Parallel TasksallTuple: Ordered Parallel Tasks with Typed Tuples- "Settled" Variants:
forkJoinSettledandallTupleSettled
- Advanced Result Handling
some: Collecting Only Successful Resultsrace: Getting the First Task to Settleany: Getting the First Task to Succeed
- Streaming Parallelism: Handling Large or Dynamic Task Sets
stream: Core Streaming EnginestreamAll,streamSome,streamAny,raceFrom(for Iterables)
- Conditional Parallelism with
ift - The Scheduler Backend
- Native Scheduler vs. Promise-Based Fallback
- Using
scheduler.postTaskDirectly
- Best Practices & Performance Considerations
- Troubleshooting Common Issues
Many applications involve operations that can happen independently without waiting for each other. For example:
- Fetching user profile data, user settings, and user activity simultaneously for a dashboard.
- Processing multiple uploaded files.
- Making several API calls to different microservices.
Running these tasks in parallel, rather than one after another (sequentially), can dramatically reduce the total time taken, leading to faster load times and a more responsive user experience.
The "Effectively" library provides several utilities to run an array of tasks concurrently. All these utilities expect an array of Task<C, V, R> functions and a common input value V that will be passed to each task.
This is often the most robust starting point. allSettled executes all provided tasks in parallel and waits for all of them to complete, regardless of whether they succeed or fail. It never rejects its own promise (unless there's an unexpected internal error). Instead, it resolves with an array detailing the outcome of each individual task.
API: allSettled<C, V, R>(tasks, value, options?, providedContext?): Promise<ParallelResult<R>[]>
tasks: ReadonlyArray<Task<C, V, R>>: An array of your tasks.value: V: The common input value passed to each task.options?: ParallelOptions: Optional configuration (see Controlling Parallelism).providedContext?: C: Optional explicit context. Defaults to ambient context.
ParallelResult<R> Structure:
Each element in the resolved array will be an object:
{ status: 'fulfilled'; value: R; index: number }if the task succeeded.{ status: 'rejected'; reason: unknown; index: number }if the task failed.indexrefers to the original position of the task in the inputtasksarray, especially useful whenpreserveOrder: false.
Example:
import { allSettled, defineTask, run, createContext, type BaseContext } from 'effectively'; // Adjust path
interface MyContext extends BaseContext { /* ... */ }
const { run: appRun, defineTask: appDefineTask } = createContext<MyContext>({ /* ... */ });
const fetchUserData = appDefineTask(async (ctx, userId: string) => {
console.log(`Fetching user ${userId}`);
// Simulate API call
await new Promise(res => setTimeout(res, 100));
if (userId === 'user-error') throw new Error('Failed to fetch user');
return { id: userId, name: `User ${userId}` };
});
const fetchUserOrders = appDefineTask(async (ctx, userId: string) => {
console.log(`Fetching orders for ${userId}`);
await new Promise(res => setTimeout(res, 150));
return [{ orderId: 'o1', amount: 100 }, { orderId: 'o2', amount: 50 }];
});
const loadAllData = appDefineTask(async (ctx, userId: string) => {
const results = await allSettled(
[fetchUserData, fetchUserOrders], // Array of Tasks
userId, // Common input value for both tasks
{ concurrency: 2 } // Options
// `ctx` is implicitly passed if appDefineTask is used from the same ContextTools as run
);
const userData = results[0].status === 'fulfilled' ? results[0].value : null;
const ordersData = results[1].status === 'fulfilled' ? results[1].value : [];
if (results[0].status === 'rejected') {
console.error('User fetch failed:', results[0].reason);
}
return { user: userData, orders: ordersData };
});
async function main() {
let data = await appRun(loadAllData, 'user-123');
console.log('Data for user-123:', data);
data = await appRun(loadAllData, 'user-error'); // This will show one failure
console.log('Data for user-error:', data);
}
main();When to use allSettled:
- When you need to know the outcome of every task, even if some fail.
- When one task failing shouldn't necessarily stop others or fail the overall operation.
- Ideal for dashboards where you want to display as much data as possible, showing errors for parts that failed.
If you need all tasks to succeed and want the entire operation to fail if any single task fails, use all. This behaves like the standard Promise.all.
API: all<C, V, R>(tasks, value, options?, providedContext?): Promise<R[]>
- Returns a
Promisethat resolves to an array of resultsR[]if all tasks succeed. The order of results matches the inputtasksarray. - If any task rejects, the promise returned by
allimmediately rejects with the reason of the first task that failed. Other tasks might still be running but their results/errors will be ignored.
Example:
import { all, defineTask, run, createContext, type BaseContext } from 'effectively';
// Assuming fetchCriticalConfig and fetchEssentialService are defined Tasks
// const fetchCriticalConfig = appDefineTask(async (ctx, serviceName: string) => { /* ... */ });
// const fetchEssentialService = appDefineTask(async (ctx, serviceName: string) => { /* ... */ });
const initializeApp = appDefineTask(async (ctx, appName: string) => {
try {
// Both results are needed, so use `all`
const [config, serviceStatus] = await all(
[fetchCriticalConfig, fetchEssentialService],
appName, // Common input (e.g., appName or null if tasks don't need it)
{ priority: 'user-blocking' }
);
// Both `config` and `serviceStatus` are available here if successful
return { config, serviceStatus };
} catch (error) {
console.error(`Critical initialization failed for ${appName}:`, error);
throw error; // Re-throw to fail the `initializeApp` task
}
});
// async function main() {
// try {
// const initResult = await appRun(initializeApp, "my-app");
// console.log("App initialized:", initResult);
// } catch (e) {
// console.log("App failed to initialize fully.");
// }
// }
// main();When to use all:
- When all parallel operations are critical, and the failure of one means the overall operation cannot succeed.
- When you want behavior identical to
Promise.allbut with the added benefits of the "Effectively" scheduler (priority, concurrency, cancellation).
All array-based parallel utilities (allSettled, all, some, any) accept an optional options object of type ParallelOptions:
export interface ParallelOptions {
concurrency?: number; // Default: Infinity
priority?: TaskPriority; // Default: 'user-visible' ('user-blocking' | 'user-visible' | 'background')
batching?: boolean; // Default: false
batchSize?: number; // Default: 10 (only if batching is true)
preserveOrder?: boolean; // Default: true (for allSettled, results match input task order)
signal?: AbortSignal; // Default: context.scope.signal (for overall cancellation)
}-
concurrency: number- Limits how many tasks run simultaneously. For example,
{ concurrency: 5 }will run at most 5 tasks at a time. - Crucial for I/O-bound tasks (like network requests) to avoid overwhelming the browser, server, or rate limits.
- For CPU-bound tasks, setting concurrency close to
navigator.hardwareConcurrencycan be optimal. - If
batchingis true,concurrencyis typically not applied directly byexecuteBatched(batches run sequentially, parallelism is within a batch viascheduler.postTaskwhich respects priority).
- Limits how many tasks run simultaneously. For example,
-
priority: TaskPriority- Guides the underlying scheduler (
scheduler.postTask). 'user-blocking': Highest priority (e.g., usequeueMicrotaskin fallback). For critical UI updates, input responses.'user-visible': Default. For tasks that update visible content but aren't immediately blocking.'background': Lowest priority. For deferrable work like logging, analytics, pre-fetching.- The native browser scheduler uses these hints for better resource allocation. The fallback scheduler approximates this.
- Guides the underlying scheduler (
-
batching: booleanandbatchSize: number- When
batching: true, tasks are grouped into batches ofbatchSize. - Batches are processed sequentially. Tasks within each batch are posted to the scheduler and can run in parallel according to their priority and available resources.
- Useful when there's an overhead per "operation dispatch" or if tasks within a batch can share resources or context more efficiently.
- When
batchingis true, theconcurrencyoption inParallelOptionsis not directly used by theexecuteBatchedstrategy; concurrency is managed at thescheduler.postTasklevel for tasks within a batch.
- When
-
preserveOrder: boolean(Mainly forallSettled,any)- If
true(default forallSettled), the array ofParallelResultobjects will be in the same order as the inputtasksarray. EachParallelResultincludes anindexproperty corresponding to its original position. - If
false, results may arrive in completion order, which can be slightly more performant if you don't need the original order and process results as they come. allalways preserves order implicitly.someeffectively usespreserveOrder: false.
- If
-
signal: AbortSignal- Allows you to provide a custom
AbortSignalto cancel the entire parallel operation. - If not provided, it defaults to
context.scope.signalfrom the context in which the parallel utility is run. - The scheduler utilities also listen to
context.scope.signaland will aggregate it with this provided signal.
- Allows you to provide a custom
These are pipeable operators, usually used within createWorkflow (from @doeixd/effectively/utils) or with a pipe function, to manage parallel tasks whose results contribute to a structured output.
Executes an object of tasks in parallel, where each key in the object maps to a task. The result is an object with the same keys, but with the resolved values of the tasks.
API: forkJoin<C, V, T extends Record<string, Task<C, V, any>>>(tasks: T): Task<C, V, { [K in keyof T]: T[K] extends Task<C, V, infer R> ? R : never }>
Example:
import { forkJoin /*, createWorkflow, fromValue, map (from utils) */ } from 'effectively'; // utils & parallel
// Assuming:
// const fetchUser = appDefineTask(async (ctx, userId: string) => ({ id: userId, name: 'Alice' }));
// const fetchUserPreferences = appDefineTask(async (ctx, userId: string) => ({ theme: 'dark' }));
const getUserDashboardData = createWorkflow(
fromValue('user-123'), // Input: userId
forkJoin({
profile: fetchUser, // profile will be User object
settings: fetchUserPreferences // settings will be Prefs object
}),
// Output of forkJoin: { profile: User, settings: Prefs }
map(dashboardData => {
return `User: ${dashboardData.profile.name}, Theme: ${dashboardData.settings.theme}`;
})
);
// const dashboardMessage = await appRun(getUserDashboardData, undefined);
// console.log(dashboardMessage); // "User: Alice, Theme: dark"forkJoinusesPromise.allsemantics (fail-fast).
Executes an array (or tuple) of tasks in parallel and returns their results in a typed tuple. For best type inference, use as const with your input task array.
API: allTuple<C, V, T extends ReadonlyArray<Task<C, V, any>>>(tasks: T): Task<C, V, { -readonly [K in keyof T]: T[K] extends Task<C, V, infer R> ? R : never }>
Example:
import { allTuple /*, createWorkflow, fromValue (from utils) */ } from 'effectively';
// Assuming:
// const dbConnect = appDefineTask(async (ctx, _:null) => ({ type: 'db' as const }));
// const cacheConnect = appDefineTask(async (ctx, _:null) => ({ type: 'cache' as const }));
const initializeServices = createWorkflow(
fromValue(null), // No specific input needed for these connection tasks
allTuple([
dbConnect,
cacheConnect
] as const), // `as const` is crucial for tuple type inference!
// Output: [DbConnection, CacheClient]
map(([db, cache]) => { // Destructure with types!
console.log('DB Type:', db.type);
console.log('Cache Type:', cache.type);
return 'Services initialized';
})
);
// const initStatus = await appRun(initializeServices, undefined);
// console.log(initStatus);allTuplealso usesPromise.allsemantics (fail-fast).
Just like allSettled for arrays, these variants execute all tasks and return their outcomes (success or failure) without the main task rejecting.
forkJoinSettled: ReturnsTask<..., { [K in keyof T]: SettledTaskResult<R> }>allTupleSettled: ReturnsTask<..., { -readonly [K in keyof T]: SettledTaskResult<R> }>
Where SettledTaskResult<R> is Result<R, unknown> from neverthrow.
Example (forkJoinSettled):
import { forkJoinSettled, type SettledTaskResult /*, ... */ } from 'effectively';
// Assuming fetchPrimaryData might fail, fetchSecondaryData is more reliable
const fetchCombinedData = createWorkflow(
fromValue('input'),
forkJoinSettled({
primary: fetchPrimaryData, // Task<Ctx, string, PrimaryData>
secondary: fetchSecondaryData // Task<Ctx, string, SecondaryData>
}),
map(results => {
const primary = results.primary.isOk() ? results.primary.value : null;
if (results.primary.isErr()) {
console.warn("Primary data failed to load:", results.primary.error);
}
const secondary = results.secondary.unwrapOr({ default: true }); // Use neverthrow's unwrapOr
return { primary, secondary };
})
);Beyond all and allSettled, this module provides helpers mirroring other Promise combinators:
-
some<C, V, R>(tasks, value, options?, ctx?): Promise<R[]>- Executes all tasks in parallel.
- Resolves with an array containing only the values from successfully fulfilled tasks.
- Rejections are ignored. The order of results is based on completion.
- Useful when you want any available data and failures are acceptable for individual items.
-
race<C, V, R>(tasks, value, options?, ctx?): Promise<R>- Like
Promise.race. Resolves or rejects with the outcome (value or reason) of the first task to settle (either fulfill or reject). - Other tasks are signalled to cancel (via the internal
raceController). optionsexcludepreserveOrderandbatching.
- Like
-
any<C, V, R>(tasks, value, options?, ctx?): Promise<R>- Like
Promise.any. Resolves with the value of the first task to fulfill. - If all tasks reject, it rejects with an
AggregateErrorcontaining all rejection reasons. - The "first" successful task is determined by iterating through the
allSettledresults (order depends onpreserveOrderinoptions). For temporally first, seestreamAny.
- Like
When dealing with a very large number of tasks, or when tasks are generated dynamically (e.g., from a paginated API), loading them all into an array for allSettled can be memory-intensive. Streaming utilities process tasks from an Iterable or AsyncIterable, yielding results as they complete with controlled concurrency.
Options: Omit<ParallelOptions, 'preserveOrder' | 'batching'> (these don't apply to unordered streams).
stream<C, V, R>(tasksIterable, value, options?, ctx?): AsyncIterable<ParallelResult<R>>- The core streaming engine. Takes an iterable of tasks.
- Executes tasks with specified
concurrencyandpriority. - Yields
ParallelResult<R>objects (likeallSettled) as each task completes. - Highly memory efficient.
Example (stream):
import { stream, defineTask, run, createContext, type BaseContext } from 'effectively';
// async function* generateTasks(count: number, inputValue: string): AsyncIterable<Task<MyContext, string, string>> {
// for (let i = 0; i < count; i++) {
// await new Promise(r => setTimeout(r, 10)); // Simulate dynamic task generation
// yield appDefineTask(async (ctx, val: string) => {
// await new Promise(r => setTimeout(r, Math.random() * 100));
// return `Result from ${val} - task ${i}`;
// });
// }
// }
// const processStreamedResults = appDefineTask(async (ctx, { count, inputValue }) => {
// const taskIterable = generateTasks(count, inputValue);
// let successful = 0;
// for await (const result of stream(taskIterable, inputValue, { concurrency: 3 }, ctx)) {
// if (result.status === 'fulfilled') {
// console.log('Streamed result:', result.value);
// successful++;
// } else {
// console.error('Streamed task failed:', result.reason);
// }
// }
// return `Processed ${count} tasks, ${successful} succeeded.`;
// });
// await appRun(processStreamedResults, { count: 100, inputValue: 'stream-input' });Derived Streaming Helpers:
streamAll<C, V, R>(...): AsyncIterable<R>: Yields successful results; throws on the first task failure (fail-fast).streamSome<C, V, R>(...): AsyncIterable<R>: Yields successful results; ignores task failures.streamAny<C, V, R>(...): Promise<R>: Resolves with the value of the temporally first task to fulfill from the iterable; rejects withAggregateErrorif all fail.raceFrom<C, V, R>(...): Promise<R>: Resolves/rejects with the outcome of the temporally first task to settle from the iterable.
While not strictly a parallel execution utility itself, ift (if-then-else) is a crucial control flow operator from parallel-utils.ts (or your control flow module) that can determine which parallel (or sequential) workflow to execute.
API: ift<C, V, R1, R2>(predicate, onTrueTask, onFalseTask): Task<C, V, R1 | R2>
Example:
// import { ift, all, fromValue, createWorkflow } from 'effectively';
// const complexProcessing = appDefineTask(async (ctx, data) => { /* ... */ return "complex"; });
// const simpleProcessing = appDefineTask(async (ctx, data) => { /* ... */ return "simple"; });
// const routeData = appDefineTask(async (ctx, initialData) => {
// return createWorkflow(
// fromValue(initialData),
// ift(
// (data) => data.isComplex, // Predicate
// complexProcessing, // Task to run if true
// simpleProcessing // Task to run if false
// )
// )(ctx, initialData); // Immediately invoke the created workflow task
// });
// const result = await appRun(routeData, { isComplex: true, payload: "..." }); // result is "complex"This module features a sophisticated scheduler that:
- Detects Native Support: Checks for
globalThis.scheduler.postTask(from the W3C Scheduling APIs proposal). - Uses Native Scheduler: If available, tasks are posted using the browser's own optimized scheduler, respecting priorities like
'user-blocking','user-visible', and'background'. - Provides Fallback: If the native API is absent (e.g., older browsers, some Node.js versions without specific flags/polyfills):
'user-blocking'tasks are scheduled viaqueueMicrotaskfor immediate execution after the current task.'user-visible'tasks are scheduled viasetTimeout(fn, 0).'background'tasks are scheduled viasetTimeout(fn, 4)(or a slightly higher configurable delay) to yield to more critical work.
- Handles Cancellation: All tasks posted via
scheduler.postTask(both native and fallback) respectAbortSignals.
You can inspect which scheduler is active:
import { scheduler } from 'effectively'; // Assuming scheduler is exported from the main parallel module
if (scheduler.isNative) {
console.log('Using native environment scheduler.');
} else {
console.log('Using Promise-based fallback scheduler.');
}You can also use scheduler.postTask directly for custom, low-level scheduling needs if the higher-level utilities don't fit your exact use case.
- Concurrency Limits:
- For CPU-bound tasks (heavy calculations), set
concurrencyaroundnavigator.hardwareConcurrency(if available) or a small number (e.g., 2-4) to avoid thrashing. - For I/O-bound tasks (network requests, file system operations), you can often use higher concurrency (e.g., 5-10, or even more for many fast API calls), but be mindful of:
- Browser connection limits per domain (typically 6-8).
- Server rate limits.
- Database connection pool sizes.
- For CPU-bound tasks (heavy calculations), set
- Task Priority:
- Use
'user-blocking'sparingly for only the most critical operations needed for immediate UI responsiveness. 'user-visible'is a good default for most work that affects what the user sees.'background'is excellent for deferring non-critical work (analytics, logging, pre-fetching offscreen data).
- Use
- Batching:
- Consider
batching: trueif:- You have many small, quick tasks where the overhead of scheduling each individually is significant.
- There's a setup/teardown cost per group of operations (e.g., opening/closing a connection for a batch).
- The underlying API you're calling supports batch requests (e.g.,
GET /users?ids=1,2,3).
- Consider
preserveOrder: falseforallSettled/some: If you are processing results as they come in and don't need them in the original task order, settingpreserveOrder: falsecan sometimes allow for slightly faster processing of the first available results.- Streaming (
streamand variants): For very large datasets or when tasks are generated dynamically, streaming utilities are vastly more memory-efficient than array-based ones as they don't require all tasks or all results to be in memory at once. - Error Handling Choice:
- Use
all/forkJoin/allTuple/streamAll(fail-fast) when all parallel operations must succeed. - Use
allSettled/forkJoinSettled/allTupleSettled/stream(settled) when you need to handle individual successes and failures and the overall operation should proceed.
- Use
- Cancellation: Ensure your individual
Taskimplementations respect theircontext.scope.signalto make them truly cancellable, allowing the parallel utilities to terminate work early.
-
Tasks Not Running in Parallel / Slow Performance:
- Check
concurrencyoption. If it's too low (e.g., 1), tasks will run sequentially. - Ensure your tasks are truly asynchronous (
asyncfunctions thatawaitI/O or usescheduler.postTaskfor CPU work). A long-running synchronous task will block one of your concurrency "slots." - Browser DevTools (Performance tab) can help identify bottlenecks.
- Check
-
"Too many requests" / Rate Limiting / Server Errors:
- Reduce the
concurrencyfor I/O-bound tasks. - Implement exponential backoff and retry logic within your individual tasks (e.g., using
withRetryfrom@doeixd/effectively/utils) if appropriate, or at a higher level around the parallel call.
- Reduce the
-
AggregateErrorwhen usinganyorstreamAny:- This is expected if all tasks provided to
anyorstreamAnyreject. Inspect theerrorsproperty of theAggregateErrorto see individual rejection reasons.
- This is expected if all tasks provided to
-
Incorrect Result Order:
- If using
allSettled, ensurepreserveOrderistrue(default) if you rely on result order matching input task order. someand streaming utilities (by default, as they yield on completion) do not guarantee input order.
- If using
-
Cancellation Not Working:
- Verify the
AbortSignal(either fromcontext.scope.signalor passed inoptions.signal) is actually being aborted when expected. - Ensure your individual
Taskimplementations checkcontext.scope.signal.abortedat appropriate points, especially within loops or before expensive operations. The scheduler will stop posting new tasks on abort, but already running tasks need to self-terminate.
- Verify the
-
getContext()Issues in Tasks:- If tasks run by the scheduler use the global
getContext(), ensure theruncall that initiated the parallel operation is fromContextToolscreated bycreateContext. This sets upcurrentUnctxInstancecorrectly. - Using
providedContextoption in scheduler functions can explicitly pass the desired context.
- If tasks run by the scheduler use the global