LCOV - code coverage report

Current view: top level - src/pool - worker_pool.dart
Test:         lcov.info
Date:         2025-07-05
Coverage summary:

              Hit    Total    Coverage
  Lines       125    144      86.8%
  Functions     0      0      -
  Branches      0      0      -
Source listing (src/pool/worker_pool.dart):
import 'dart:async';
import 'dart:collection';

import 'package:logger/web.dart';
import 'package:using/using.dart';

import '../concurrency_settings.dart';
import '../exceptions/exception_manager.dart';
import '../exceptions/squadron_error.dart';
import '../exceptions/squadron_exception.dart';
import '../exceptions/task_terminated_exception.dart';
import '../exceptions/worker_exception.dart';
import '../iworker.dart';
import '../stats/perf_counter.dart';
import '../stats/worker_stat.dart';
import '../typedefs.dart';
import '../worker/worker.dart';
import '../worker_service.dart';
import '_pool_worker.dart';
import '_worker_stream_task.dart';
import '_worker_task.dart';
import '_worker_value_task.dart';
import 'stream_task.dart';
import 'task.dart';
import 'value_task.dart';

typedef WorkerFactory<W> = W Function(ExceptionManager);

/// Worker pool responsible for instantiating, starting and stopping workers running in parallel.
/// A [WorkerPool] is also responsible for creating and assigning [WorkerTask]s to [Worker]s.
abstract class WorkerPool<W extends Worker>
    with Releasable
    implements WorkerService, IWorker {
  /// Create a worker pool.
  ///
  /// Workers are instantiated using the provided [_workerFactory].
  /// The pool will only instantiate workers as needed, depending on [concurrencySettings].
  /// The [ConcurrencySettings.minWorkers] and [ConcurrencySettings.maxWorkers] settings control
  /// how many workers will live in the pool. The [ConcurrencySettings.maxParallel] setting
  /// controls how many tasks can be posted to each individual worker in the pool.
  WorkerPool(this._workerFactory,
      {ConcurrencySettings? concurrencySettings,
      ExceptionManager? exceptionManager})
      : concurrencySettings = concurrencySettings ?? ConcurrencySettings(),
        _exceptionManager = exceptionManager ?? ExceptionManager();
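
  // Example (editor's sketch, not part of the original source): constructing a
  // pool with explicit concurrency settings. `SampleWorkerPool` is a
  // hypothetical concrete subclass of [WorkerPool]; the named parameters of
  // [ConcurrencySettings] are assumed to match the fields referenced above.
  //
  //   final pool = SampleWorkerPool(
  //     concurrencySettings: ConcurrencySettings(
  //       minWorkers: 1,   // keep at least one worker alive
  //       maxWorkers: 4,   // never run more than four workers
  //       maxParallel: 2,  // each worker accepts up to two concurrent tasks
  //     ),
  //   );
  //   await pool.start();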

  @override
  void release() {
    stop();
    super.release();
  }

  final WorkerFactory<W> _workerFactory;

  @override
  Logger? channelLogger;

  @override
  ExceptionManager get exceptionManager =>
      (_exceptionManager ??= ExceptionManager());
  ExceptionManager? _exceptionManager;

  /// Concurrency settings.
  final ConcurrencySettings concurrencySettings;

  final _workers = <PoolWorker<W>>[];

  final _deadWorkerStats = <WorkerStat>[];

  /// Whether this pool is scheduled for stopping.
  bool get stopped => _stopped;
  bool _stopped = false;

  /// Number of workers.
  int get size => _workers.length;

  /// Maximum number of workers.
  int get maxSize => _maxSize;
  int _maxSize = 0;

  final _workerPoolListeners = <Object, void Function(WorkerStat, bool)>{};

  /// Registers a callback to be invoked when a worker thread is added or removed from the pool.
  Object registerWorkerPoolListener(void Function(WorkerStat, bool) listener) {
    final token = Object();
    _workerPoolListeners[token] = listener;
    return token;
  }

  /// Unregisters a callback.
  void unregisterWorkerPoolListener(
      {Function(WorkerStat, bool)? listener, Object? token}) {
    if (token != null) {
      _workerPoolListeners.remove(token);
    } else if (listener != null) {
      _workerPoolListeners.removeWhere((key, value) => value == listener);
    }
  }
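
  // Example (editor's sketch): observing pool membership through the listener
  // API above. `pool` is assumed to be a running pool instance; the whole
  // [WorkerStat] object is printed because its fields are not shown in this file.
  //
  //   final token = pool.registerWorkerPoolListener((stats, removed) {
  //     print('worker ${removed ? 'removed' : 'added'}: $stats');
  //   });
  //   // ... later, stop observing:
  //   pool.unregisterWorkerPoolListener(token: token);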

  int _startingWorkers = 0;

  int _getProvisionNeeds(int workload) {
    final minWorkers = concurrencySettings.minWorkers;
    if (workload < minWorkers) {
      // at least minWorkers
      workload = minWorkers;
    }
    final maxWorkers = concurrencySettings.maxWorkers;
    if (maxWorkers > 0 && workload > maxWorkers) {
      // at most maxWorkers if > 0
      workload = maxWorkers;
    }
    // adjust by _workers.length and _startingWorkers
    return workload - _workers.length - _startingWorkers;
  }

  Future<void> _provisionWorkers(int workload) {
    final tasks = <Future>[], errors = [];
    final maxParallel = concurrencySettings.maxParallel;
    for (var i = 0; i < workload; i++) {
      try {
        final worker = _workerFactory(exceptionManager);
        worker.channelLogger = channelLogger;

        final poolWorker = PoolWorker(worker, maxParallel);
        _startingWorkers++;
        tasks.add(poolWorker.worker.start().whenComplete(() {
          _startingWorkers--;
        }).then((_) {
          // start succeeded: register worker
          _addWorkerAndNotify(poolWorker);
        }).catchError((ex, st) {
          // start failed, ensure the worker is stopped
          poolWorker.worker.terminate();
          errors.add(SquadronException.from(ex, st));
        }));
      } catch (ex, st) {
        errors.add(SquadronException.from(ex, st));
      }
    }

    return Future.wait(tasks).whenComplete(() {
      if (_workers.length > _maxSize) {
        _maxSize = _workers.length;
      }
      if (errors.isNotEmpty) {
        if (errors.length < tasks.length) {
          // some tasks failed: warn
          channelLogger?.e(() => 'Error while provisioning workers: $errors');
        } else {
          // all tasks failed: throw
          throw errors.firstWhere((e) => e is SquadronError,
                  orElse: () => null) ??
              errors.firstWhere((e) => e is WorkerException,
                  orElse: () => null) ??
              errors.first;
        }
      }
    });
  }

  /// Ensure at least [ConcurrencySettings.minWorkers] workers are started
  /// (defaulting to 1 if [ConcurrencySettings.minWorkers] is zero).
  @override
  FutureOr<void> start() {
    _stopped = false;
    final needs = _getProvisionNeeds(_queue.isEmpty ? 1 : _queue.length);
    if (needs > 0) {
      return _provisionWorkers(needs);
    }
  }

  void _notify(WorkerStat stats, {required bool removed}) {
    if (_workerPoolListeners.isNotEmpty) {
      for (var listener in _workerPoolListeners.values) {
        try {
          listener(stats, removed);
        } catch (ex) {
          // swallow error from user land
        }
      }
    }
  }

  int _removeWorkerAndNotify(PoolWorker<W> poolWorker, bool force) {
    if (force || _workers.length > concurrencySettings.minWorkers) {
      final worker = poolWorker.worker;
      worker.stop();
      if (_workers.remove(poolWorker)) {
        final stats = worker.getStats();
        _deadWorkerStats.add(stats);
        _notify(stats, removed: true);
        return 1;
      }
    }
    return 0;
  }

  void _addWorkerAndNotify(PoolWorker<W> poolWorker) {
    _workers.add(poolWorker);
    _notify(poolWorker.worker.getStats(), removed: false);
  }

  /// Stop idle pool workers matching the [predicate].
  /// If [predicate] is null or not provided, all workers will be stopped.
  /// Stopping a worker does not interrupt or cancel processing. Workers will
  /// complete pending tasks before shutting down. In the meantime, they will
  /// not receive any new workload.
  /// Returns the number of workers that have been stopped.
  @override
  int stop([bool Function(W worker)? predicate]) {
    List<PoolWorker<W>> targets;
    bool force = (predicate == null);
    _workers.sort(PoolWorker.compareCapacity);
    if (force) {
      // kill workers while keeping enough workers alive to process pending tasks
      targets = _workers.reversed.skip(_queue.length).toList();
      _stopped = true;
    } else {
      // kill workers that are idle and satisfy the predicate
      targets = _workers.where((w) => w.isIdle && predicate(w.worker)).toList();
    }
    var stopped = 0;
    for (var poolWorker in targets) {
      stopped += _removeWorkerAndNotify(poolWorker, force);
    }
    return stopped;
  }
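
  // Example (editor's sketch): trimming the pool versus stopping it entirely,
  // using the [stop] overloads above. `pool` is assumed to exist; with a
  // predicate, only idle workers are removed and the pool never shrinks below
  // [ConcurrencySettings.minWorkers].
  //
  //   // stop idle workers that match a condition on the worker:
  //   final trimmed = pool.stop((w) => true);
  //
  //   // stop the whole pool; pending tasks still complete first:
  //   pool.stop();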

  @override
  void terminate([TaskTerminatedException? ex]) {
    _stopped = true;
    for (var i = _workers.length - 1; i >= 0; i--) {
      final w = _workers[i];
      w.worker.terminate(ex);
      _removeWorkerAndNotify(w, true);
    }
  }

  final _queue = Queue<WorkerTask>();
  final _executing = <WorkerTask>{};

  /// Gets remaining workload
  int get pendingWorkload => _queue.length;

  WorkerTask<T, W> _enqueue<T>(WorkerTask<T, W> task) {
    if (_stopped) {
      throw SquadronErrorImpl.create(
        'The pool cannot accept new requests because it is stopped',
      );
    }
    _queue.addLast(task);
    _schedule();
    return task;
  }

  /// Registers and schedules a [task] that returns a single value.
  /// Returns a future that completes with the task's value.
  Future<T> execute<T>(Future<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      scheduleValueTask(task, counter: counter).value;

  /// Registers and schedules a [task] that returns a stream of values.
  /// Returns a stream containing the task's values.
  Stream<T> stream<T>(Stream<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      scheduleStreamTask(task, counter: counter).stream;

  /// Registers and schedules a [task] that returns a single value.
  /// Returns a [ValueTask].
  ValueTask<T> scheduleValueTask<T>(Future<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      _enqueue<T>(WorkerValueTask<T, W>(task, counter)) as ValueTask<T>;

  /// Registers and schedules a [task] that returns a stream of values.
  /// Returns a [StreamTask].
  StreamTask<T> scheduleStreamTask<T>(Stream<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      _enqueue<T>(WorkerStreamTask<T, W>(task, counter)) as StreamTask<T>;
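
  // Example (editor's sketch): posting work to the pool with [execute] and
  // [stream]. The worker methods `fibonacci` and `ticker` are hypothetical
  // service calls; only `execute` and `stream` come from the API above.
  //
  //   final value = await pool.execute((w) => w.fibonacci(30));
  //
  //   await for (final n in pool.stream((w) => w.ticker(count: 5))) {
  //     print(n);
  //   }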

  Timer? _timer;

  void _schedule() {
    if (_timer?.isActive != true) {
      _timer = Timer(Duration.zero, __schedule);
    }
  }

  /// Schedule tasks.
  void __schedule() {
    if (_workers.isEmpty && _startingWorkers > 0) {
      // workers are still starting, defer
      _schedule();
      return;
    }

    // any work to do?
    if (_queue.isEmpty) {
      // no: effectively stop the pool if needed and return
      if (_stopped && _executing.isEmpty) {
        stop();
      }
      return;
    }

    // yes: dispatch tasks to workers
    _dispatchTasks();

    // and provision more workers if possible and necessary
    final needs = _getProvisionNeeds(_queue.length);
    if (needs > 0) {
      _provisionWorkers(needs).then(
        (_) => _dispatchTasks(),
        onError: (ex) {
          channelLogger?.e(() => 'Provisioning workers failed with error $ex');
          if (_workers.isEmpty) {
            while (_queue.isNotEmpty) {
              _queue.removeFirst().cancel('Provisioning workers failed');
            }
          } else {
            _schedule();
          }
        },
      );
    }
  }

  void _dispatchTasks() {
    _workers.sort(PoolWorker.compareCapacity);
    for (var idx = _workers.length - 1; idx >= 0; idx--) {
      final w = _workers[idx];
      if (w.isStopped) {
        _removeWorkerAndNotify(w, false);
        continue;
      }
      while (_queue.isNotEmpty && w.capacity > 0) {
        final task = _queue.removeFirst();
        if (!task.isCanceled) {
          _executing.add(task);
          w.run(task).whenComplete(() {
            _executing.remove(task);
            _schedule();
          });
        }
      }
    }
  }

  /// Task cancellation. Cancels the specified [task]: it is removed from the
  /// pool's pending queue and executing set, then canceled with the optional [message].
  void cancel(Task task, [String? message]) {
    _executing.remove(task);
    _queue.removeWhere((t) => t == task);
    task.cancel(message);
  }

  /// Task cancellation. Cancels all tasks registered with the [WorkerPool],
  /// whether pending or currently executing, with the optional [message].
  void cancelAll([String? message]) {
    final toBeCanceled = _executing.followedBy(_queue).toList();
    _executing.clear();
    _queue.clear();
    for (var task in toBeCanceled) {
      task.cancel(message);
    }
  }
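
  // Example (editor's sketch): scheduling a task explicitly so it can be
  // canceled later. `slowQuery` is a hypothetical service method; the returned
  // [ValueTask] is assumed to be accepted by [cancel] as a [Task].
  //
  //   final task = pool.scheduleValueTask((w) => w.slowQuery());
  //   // ... the result is no longer needed:
  //   pool.cancel(task, 'no longer needed');
  //
  //   // or drop everything that is queued or running:
  //   pool.cancelAll('shutting down');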

  /// Worker statistics.
  Iterable<WorkerStat> get stats => _workers.map(PoolWorker.getStats);

  /// Full worker statistics.
  Iterable<WorkerStat> get fullStats => _deadWorkerStats.followedBy(stats);
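
  // Example (editor's sketch): inspecting pool statistics. [fullStats] also
  // includes stats captured from workers that have already been removed.
  //
  //   for (final s in pool.fullStats) {
  //     print(s);
  //   }
  //   print('pool size: ${pool.size} (max ever: ${pool.maxSize})');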

  /// Worker pools do not need an [operations] map.
  @override
  OperationsMap get operations => WorkerService.noOperations;
}