LCOV - code coverage report

Current view
top level - src/pool - worker_pool.dart
Test
lcov.info
Date
2026-02-21
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
Hit / Total / Coverage
Lines: 129 / 150 / 86.0%
Functions: 0 / 0 / -
Branches: 0 / 0 / -
Each row represents a line of source code
Line | Branch | Hits | Source code
1import 'dart:async';
2import 'dart:collection';
3
4import 'package:logger/web.dart';
5import 'package:using/using.dart';
6
7import '../concurrency_settings.dart';
8import '../exceptions/exception_manager.dart';
9import '../exceptions/squadron_error.dart';
10import '../exceptions/squadron_exception.dart';
11import '../exceptions/task_terminated_exception.dart';
12import '../exceptions/worker_exception.dart';
13import '../iworker.dart';
14import '../stats/perf_counter.dart';
15import '../stats/worker_stat.dart';
16import '../typedefs.dart';
17import '../worker/worker.dart';
18import '../worker_service.dart';
19import '_pool_worker.dart';
20import '_worker_stream_task.dart';
21import '_worker_task.dart';
22import '_worker_value_task.dart';
23import 'stream_task.dart';
24import 'task.dart';
25import 'value_task.dart';
26
27typedef WorkerFactory<W> = W Function(ExceptionManager);
28
29/// Worker pool responsible for instantiating, starting and stopping workers running in parallel.
30/// A [WorkerPool] is also responsible for creating and assigning [WorkerTask]s to [Worker]s.
31abstract class WorkerPool<W extends Worker>
32 with Releasable
33 implements WorkerService, IWorker {
34 /// Create a worker pool.
35 ///
36 /// Workers are instantiated using the provided [_workerFactory].
37 /// The pool will only instantiate workers as needed, depending on [concurrencySettings].
38 /// The [ConcurrencySettings.minWorkers] and [ConcurrencySettings.maxWorkers] settings control
39 /// how many workers will live in the pool. The [ConcurrencySettings.maxParallel] setting
40 /// controls how many tasks can be posted to each individual worker in the pool.
414 WorkerPool(this._workerFactory,
42 {ConcurrencySettings? concurrencySettings,
43 ExceptionManager? exceptionManager})
440 : concurrencySettings = concurrencySettings ?? ConcurrencySettings(),
453 _exceptionManager = exceptionManager ?? ExceptionManager();
46
474 @override
48 void release() {
494 stop();
504 super.release();
51 }
52
53 final WorkerFactory<W> _workerFactory;
54
55 @override
56 Logger? channelLogger;
57
584 @override
59 ExceptionManager get exceptionManager =>
604 (_exceptionManager ??= ExceptionManager());
61 ExceptionManager? _exceptionManager;
62
63 /// Concurrency settings.
64 final ConcurrencySettings concurrencySettings;
65
66 final _workers = <PoolWorker<W>>[];
67
68 final _deadWorkerStats = <WorkerStat>[];
69
70 /// Whether this pool is scheduled for stopping.
712 bool get stopped => _stopped;
72 bool _stopped = false;
73
74 /// Number of workers.
753 int get size => _workers.length;
76
77 /// Maximum number of workers.
780 int get maxSize => _maxSize;
79 int _maxSize = 0;
80
81 final _workerPoolListeners = <Object, void Function(WorkerStat, bool)>{};
82
83 /// Registers a callback to be invoked when a worker thread is added or removed from the pool.
840 Object registerWorkerPoolListener(void Function(WorkerStat, bool) listener) {
850 final token = Object();
860 _workerPoolListeners[token] = listener;
87 return token;
88 }
89
90 /// Unregisters a callback.
910 void unregisterWorkerPoolListener(
92 {Function(WorkerStat, bool)? listener, Object? token}) {
93 if (token != null) {
940 _workerPoolListeners.remove(token);
95 } else if (listener != null) {
960 _workerPoolListeners.removeWhere((key, value) => value == listener);
97 }
98 }
99
100 int _startingWorkers = 0;
101
1024 int _getProvisionNeeds(int workload) {
1038 final minWorkers = concurrencySettings.minWorkers;
1044 if (workload < minWorkers) {
105 // at least minWorkers
106 workload = minWorkers;
107 }
1088 final maxWorkers = concurrencySettings.maxWorkers;
1098 if (maxWorkers > 0 && workload > maxWorkers) {
110 // at most maxWorkers if > 0
111 workload = maxWorkers;
112 }
113 // adjust by _workers.length and _startingWorkers
11420 return workload - _workers.length - _startingWorkers;
115 }
116
1174 Future<void> _provisionWorkers(int workload) {
1188 final tasks = <Future>[], errors = [];
1198 final maxParallel = concurrencySettings.maxParallel;
1208 for (var i = 0; i < workload; i++) {
121 try {
12212 final worker = _workerFactory(exceptionManager);
1238 worker.channelLogger = channelLogger;
124
1254 final poolWorker = PoolWorker(worker, maxParallel);
1268 _startingWorkers++;
12720 tasks.add(poolWorker.worker.start().whenComplete(() {
1288 _startingWorkers--;
1298 }).then((_) {
130 // start succeeded: register worker
1314 _addWorkerAndNotify(poolWorker);
1325 }).catchError((ex, st) {
133 // start failed, ensure the worker is stopped
1342 poolWorker.worker.terminate();
1352 errors.add(SquadronException.from(ex, st));
136 }));
137 } catch (ex, st) {
1380 errors.add(SquadronException.from(ex, st));
139 }
140 }
141
14212 return Future.wait(tasks).whenComplete(() {
14316 if (_workers.length > _maxSize) {
14412 _maxSize = _workers.length;
145 }
1464 if (errors.isNotEmpty) {
1473 if (errors.length < tasks.length) {
148 // some tasks failed: warn
1490 channelLogger?.e(() => 'Error while provisionning workers: $errors');
150 } else {
151 // all tasks failed: throw
1523 throw errors.firstWhere((e) => e is SquadronError,
1531 orElse: () => null) ??
1543 errors.firstWhere((e) => e is WorkerException,
1550 orElse: () => null) ??
1560 errors.first;
157 }
158 }
159 });
160 }
161
162 /// Ensure at least [ConcurrencySettings.minWorkers] workers are started
163 /// (defaulting to 1 if [ConcurrencySettings.minWorkers] is zero).
1642 @override
165 Future<void> start() {
1662 _stopped = false;
1676 final needs = _getProvisionNeeds(_queue.isEmpty ? 1 : _queue.length);
1684 return (needs > 0) ? _provisionWorkers(needs) : Future.value();
169 }
170
1714 void _notify(WorkerStat stats, {required bool removed}) {
1728 if (_workerPoolListeners.isNotEmpty) {
1730 for (var listener in _workerPoolListeners.values) {
174 try {
1750 listener(stats, removed);
176 } catch (ex) {
177 // swallow error from user land
178 }
179 }
180 }
181 }
182
1834 int _removeWorkerAndNotify(PoolWorker<W> poolWorker, bool force) {
1845 if (force || _workers.length > concurrencySettings.minWorkers) {
1854 final worker = poolWorker.worker;
1864 worker.stop();
1878 if (_workers.remove(poolWorker)) {
1884 final stats = worker.getStats();
1898 _deadWorkerStats.add(stats);
1904 _notify(stats, removed: true);
191 return 1;
192 }
193 }
194 return 0;
195 }
196
1974 void _addWorkerAndNotify(PoolWorker<W> poolWorker) {
1988 _workers.add(poolWorker);
19912 _notify(poolWorker.worker.getStats(), removed: false);
200 }
201
202 /// Stop idle pool workers matching the [predicate].
203 /// If [predicate] is null or not provided, all workers will be stopped.
204 /// Stopping a worker does not interrupt or cancel processing. Workers will
205 /// complete pending tasks before shutting down. In the meantime, they will
206 /// not receive any new workload.
207 /// Returns the number of workers that have been stopped.
2084 @override
209 int stop([bool Function(W worker)? predicate]) {
210 List<PoolWorker<W>> targets;
211 bool force = (predicate == null);
2128 _workers.sort(PoolWorker.compareCapacity);
213 if (force) {
214 // kill workers while keeping enough workers alive to process pending tasks
21524 targets = _workers.reversed.skip(_queue.length).toList();
2164 _stopped = true;
217 } else {
218 // kill workers that are idle and satisfy the predicate
2197 targets = _workers.where((w) => w.isIdle && predicate(w.worker)).toList();
220 }
221 var stopped = 0;
2228 for (var poolWorker in targets) {
2238 stopped += _removeWorkerAndNotify(poolWorker, force);
224 }
225 return stopped;
226 }
227
2281 @override
229 void terminate([TaskTerminatedException? ex]) {
2301 _stopped = true;
2315 for (var i = _workers.length - 1; i >= 0; i--) {
2322 final w = _workers[i];
2332 w.worker.terminate(ex);
2341 _removeWorkerAndNotify(w, true);
235 }
236 }
237
238 final _queue = Queue<WorkerTask>();
239 final _executing = <WorkerTask>{};
240
241 /// Gets remaining workload
2426 int get pendingWorkload => _queue.length;
243
2444 WorkerTask<T, W> _enqueue<T>(WorkerTask<T, W> task) {
2454 if (_stopped) {
2461 throw SquadronErrorImpl.create(
247 'The pool cannot accept new requests because it is stopped',
248 );
249 }
2508 _queue.addLast(task);
2514 _schedule();
252 return task;
253 }
254
255 /// Registers and schedules a [task] that returns a single value.
256 /// Returns a future that completes with the task's value.
2574 Future<T> execute<T>(Future<T> Function(W worker) task,
258 {PerfCounter? counter}) =>
2598 scheduleValueTask(task, counter: counter).value;
260
261 /// Registers and schedules a [task] that returns a stream of values.
262 /// Returns a stream containing the task's values.
2634 Stream<T> stream<T>(Stream<T> Function(W worker) task,
264 {PerfCounter? counter}) =>
2658 scheduleStreamTask(task, counter: counter).stream;
266
267 /// Registers and schedules a [task] that returns a single value.
268 /// Returns a [ValueTask].
2694 ValueTask<T> scheduleValueTask<T>(Future<T> Function(W worker) task,
270 {PerfCounter? counter}) =>
2718 _enqueue<T>(WorkerValueTask<T, W>(task, counter)) as ValueTask<T>;
272
273 /// Registers and schedules a [task] that returns a stream of values.
274 /// Returns a [StreamTask].
2754 StreamTask<T> scheduleStreamTask<T>(Stream<T> Function(W worker) task,
276 {PerfCounter? counter}) =>
2778 _enqueue<T>(WorkerStreamTask<T, W>(task, counter)) as StreamTask<T>;
278
279 Timer _timer = const _InactiveTimer();
280
2814 void _schedule() {
2828 if (!_timer.isActive) {
28312 _timer = Timer(Duration.zero, __schedule);
284 }
285 }
286
287 /// Schedule tasks.
2884 void __schedule() {
28916 if (_workers.isEmpty && _startingWorkers > 0) {
290 // workers are still starting, defer
2911 _schedule();
292 return;
293 }
294
295 // any work to do?
2968 if (_queue.isEmpty) {
297 // no: effectively stop the pool if needed and return
29812 if (_stopped && _executing.isEmpty) {
2994 stop();
300 }
301 return;
302 }
303
304 // yes: dispatch tasks to workers
3054 _dispatchTasks();
306
307 // and provision more workers if possible and necessary
30812 final needs = _getProvisionNeeds(_queue.length);
3094 if (needs > 0) {
3108 _provisionWorkers(needs).then(
3118 (_) => _dispatchTasks(),
3121 onError: (ex) {
3131 channelLogger?.e(() => 'Provisionning workers failed with error $ex');
3142 if (_workers.isEmpty) {
3152 while (_queue.isNotEmpty) {
3163 _queue.removeFirst().cancel('Provisionning workers failed');
317 }
318 } else {
3190 _schedule();
320 }
321 },
322 );
323 }
324 }
325
3264 void _dispatchTasks() {
3278 _workers.sort(PoolWorker.compareCapacity);
32820 for (var idx = _workers.length - 1; idx >= 0; idx--) {
3298 final w = _workers[idx];
3304 if (w.isStopped) {
3310 _removeWorkerAndNotify(w, false);
332 continue;
333 }
33416 while (_queue.isNotEmpty && w.capacity > 0) {
3358 final task = _queue.removeFirst();
3364 if (!task.isCanceled) {
3378 _executing.add(task);
33812 w.run(task).whenComplete(() {
3398 _executing.remove(task);
3404 _schedule();
341 });
342 }
343 }
344 }
345 }
346
347 /// Task cancelation. If a specific [task] is provided, only this task will be canceled.
348 /// Otherwise, all tasks registered with the [WorkerPool] are canceled.
3491 void cancel(Task task, [String? message]) {
3502 _executing.remove(task);
3514 _queue.removeWhere((t) => t == task);
3521 task.cancel(message);
353 }
354
355 /// Task cancelation. If a specific [task] is provided, only this task will be canceled.
356 /// Otherwise, all tasks registered with the [WorkerPool] are canceled.
3571 void cancelAll([String? message]) {
3584 final toBeCanceled = _executing.followedBy(_queue).toList();
3592 _executing.clear();
3602 _queue.clear();
3612 for (var task in toBeCanceled) {
3621 task.cancel(message);
363 }
364 }
365
366 /// Worker statistics.
3673 Iterable<WorkerStat> get stats => _workers.map(PoolWorker.getStats);
368
369 /// Full worker statistics.
3700 Iterable<WorkerStat> get fullStats => _deadWorkerStats.followedBy(stats);
371
372 /// Worker pools do not need an [operations] map.
3730 @override
374 OperationsMap get operations => WorkerService.noOperations;
375}
376
377extension ConcurrencySettingsExt on WorkerPool {
3783 int get minWorkers => concurrencySettings.minWorkers;
3793 int get maxWorkers => concurrencySettings.maxWorkers;
3800 int get maxParallel => concurrencySettings.maxParallel;
3819 int get maxConcurrency => concurrencySettings.maxConcurrency;
382}
383
384class _InactiveTimer implements Timer {
38512 const _InactiveTimer();
386
3874 @override
388 bool get isActive => false;
389
3900 @override
391 int get tick => 0;
392
3930 @override
394 void cancel() {}
395}
Choose Features