LCOV - code coverage report

Current view: top level - src/pool - worker_pool.dart
Test:         lcov.info
Date:         2025-03-26
            Hit    Total    Coverage
Lines       128    146      87.7%
Functions   0      0        -
Branches    0      0        -
Source code:
import 'dart:async';
import 'dart:collection';

import 'package:logger/web.dart';
import 'package:using/using.dart';

import '../concurrency_settings.dart';
import '../exceptions/exception_manager.dart';
import '../exceptions/squadron_error.dart';
import '../exceptions/squadron_exception.dart';
import '../exceptions/task_terminated_exception.dart';
import '../exceptions/worker_exception.dart';
import '../iworker.dart';
import '../stats/perf_counter.dart';
import '../stats/worker_stat.dart';
import '../typedefs.dart';
import '../worker/worker.dart';
import '../worker_service.dart';
import '_pool_worker.dart';
import '_worker_stream_task.dart';
import '_worker_task.dart';
import '_worker_value_task.dart';
import 'stream_task.dart';
import 'task.dart';
import 'value_task.dart';

typedef WorkerFactory<W> = W Function(ExceptionManager);

/// Worker pool responsible for instantiating, starting and stopping workers running in parallel.
/// A [WorkerPool] is also responsible for creating and assigning [WorkerTask]s to [Worker]s.
abstract class WorkerPool<W extends Worker>
    with Releasable
    implements WorkerService, IWorker {
  /// Create a worker pool.
  ///
  /// Workers are instantiated using the provided [_workerFactory].
  /// The pool will only instantiate workers as needed, depending on [concurrencySettings].
  /// The [ConcurrencySettings.minWorkers] and [ConcurrencySettings.maxWorkers] settings control
  /// how many workers will live in the pool. The [ConcurrencySettings.maxParallel] setting
  /// controls how many tasks can be posted to each individual worker in the pool.
  WorkerPool(this._workerFactory,
      {ConcurrencySettings? concurrencySettings,
      ExceptionManager? exceptionManager})
      : concurrencySettings = concurrencySettings ?? ConcurrencySettings(),
        _exceptionManager = exceptionManager ?? ExceptionManager();

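  // Illustration only (not part of the covered file): a hypothetical subclass
  // `SampleWorkerPool` forwarding a worker factory to this constructor could
  // be configured as follows. The named parameters are assumed to match the
  // ConcurrencySettings fields referenced in the doc comment above.
  //
  //   final pool = SampleWorkerPool(
  //     concurrencySettings: ConcurrencySettings(
  //       minWorkers: 1,  // keep at least one worker alive
  //       maxWorkers: 4,  // never grow the pool beyond four workers
  //       maxParallel: 2, // each worker accepts up to two concurrent tasks
  //     ),
  //   );
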
  @override
  void release() {
    stop();
    super.release();
  }

  final WorkerFactory<W> _workerFactory;

  @override
  Logger? channelLogger;

  @override
  ExceptionManager get exceptionManager =>
      (_exceptionManager ??= ExceptionManager());
  ExceptionManager? _exceptionManager;

  /// Concurrency settings.
  final ConcurrencySettings concurrencySettings;

  final _workers = <PoolWorker<W>>[];

  final _deadWorkerStats = <WorkerStat>[];

  /// Whether this pool is scheduled for stopping.
  bool get stopped => _stopped;
  bool _stopped = false;

  /// Number of workers.
  int get size => _workers.length;

  /// Maximum number of workers.
  int get maxSize => _maxSize;
  int _maxSize = 0;

  final _workerPoolListeners = <Object, void Function(WorkerStat, bool)>{};

  /// Registers a callback to be invoked when a worker thread is added or removed from the pool.
  Object registerWorkerPoolListener(void Function(WorkerStat, bool) listener) {
    final token = Object();
    _workerPoolListeners[token] = listener;
    return token;
  }

  /// Unregisters a callback.
  void unregisterWorkerPoolListener(
      {Function(WorkerStat, bool)? listener, Object? token}) {
    if (token != null) {
      _workerPoolListeners.remove(token);
    } else if (listener != null) {
      _workerPoolListeners.removeWhere((key, value) => value == listener);
    }
  }

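  // Illustration only (not part of the covered file): a pool listener might be
  // used as follows, assuming `pool` is a started WorkerPool instance and that
  // WorkerStat exposes an `id` field.
  //
  //   final token = pool.registerWorkerPoolListener((stats, removed) {
  //     print(removed
  //         ? 'worker ${stats.id} left the pool'
  //         : 'worker ${stats.id} joined the pool');
  //   });
  //   // ... later, stop receiving notifications:
  //   pool.unregisterWorkerPoolListener(token: token);
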
  int _startingWorkers = 0;

  int _getProvisionNeeds(int workload) {
    final minWorkers = concurrencySettings.minWorkers;
    if (workload < minWorkers) {
      // at least minWorkers
      workload = minWorkers;
    }
    final maxWorkers = concurrencySettings.maxWorkers;
    if (maxWorkers > 0 && workload > maxWorkers) {
      // at most maxWorkers if > 0
      workload = maxWorkers;
    }
    // adjust by _workers.length and _startingWorkers
    return workload - _workers.length - _startingWorkers;
  }

  Future<void> _provisionWorkers(int workload) {
    final tasks = <Future>[], errors = [];
    final maxParallel = concurrencySettings.maxParallel;
    for (var i = 0; i < workload; i++) {
      try {
        final worker = _workerFactory(exceptionManager);
        worker.channelLogger = channelLogger;

        final poolWorker = PoolWorker(worker, maxParallel);
        _startingWorkers++;
        tasks.add(poolWorker.worker.start().whenComplete(() {
          _startingWorkers--;
        }).then((_) {
          // start succeeded: register worker
          _addWorkerAndNotify(poolWorker);
        }).catchError((ex, st) {
          // start failed, ensure the worker is stopped
          poolWorker.worker.stop();
          errors.add(SquadronException.from(ex, st));
        }));
      } catch (ex, st) {
        errors.add(SquadronException.from(ex, st));
      }
    }

    return Future.wait(tasks).whenComplete(() {
      if (_workers.length > _maxSize) {
        _maxSize = _workers.length;
      }
      if (errors.isNotEmpty) {
        if (errors.length < tasks.length) {
          // some tasks failed: warn
          channelLogger?.e(() => 'Error while provisioning workers: $errors');
        } else {
          // all tasks failed: throw
          throw errors.firstWhere((e) => e is SquadronError,
                  orElse: () => null) ??
              errors.firstWhere((e) => e is WorkerException,
                  orElse: () => null) ??
              errors.first;
        }
      }
    });
  }

  /// Ensure at least [ConcurrencySettings.minWorkers] workers are started
  /// (defaulting to 1 if [ConcurrencySettings.minWorkers] is zero).
  @override
  FutureOr<void> start() {
    _stopped = false;
    final needs = _getProvisionNeeds(_queue.isEmpty ? 1 : _queue.length);
    if (needs > 0) {
      return _provisionWorkers(needs);
    }
  }

  void _notify(W worker, {required bool removed}) {
    if (_workerPoolListeners.isNotEmpty) {
      final stats = worker.stats;
      for (var listener in _workerPoolListeners.values) {
        try {
          listener(stats, removed);
        } catch (ex) {
          // swallow error from user land
        }
      }
    }
  }

  void _removeWorkerAndNotify(PoolWorker<W> poolWorker) {
    _workers.remove(poolWorker);
    _notify(poolWorker.worker, removed: true);
  }

  void _addWorkerAndNotify(PoolWorker<W> poolWorker) {
    _workers.add(poolWorker);
    _notify(poolWorker.worker, removed: false);
  }

  int _removeWorker(PoolWorker<W> poolWorker, bool force) {
    if (force || _workers.length > concurrencySettings.minWorkers) {
      final worker = poolWorker.worker;
      worker.stop();
      _deadWorkerStats.add(worker.stats);
      _removeWorkerAndNotify(poolWorker);
      return 1;
    } else {
      return 0;
    }
  }

  /// Stop idle pool workers matching the [predicate].
  /// If [predicate] is null or not provided, all workers will be stopped.
  /// Stopping a worker does not interrupt or cancel processing. Workers will
  /// complete pending tasks before shutting down. In the meantime, they will
  /// not receive any new workload.
  /// Returns the number of workers that have been stopped.
  @override
  int stop([bool Function(W worker)? predicate]) {
    List<PoolWorker<W>> targets;
    bool force = (predicate == null);
    if (force) {
      // kill workers while keeping enough workers alive to process pending tasks
      targets = _workers.skip(_queue.length).toList();
      _stopped = true;
    } else {
      // kill workers that are idle and satisfy the predicate
      targets = _workers.where((w) => w.isIdle && predicate(w.worker)).toList();
    }
    var stopped = 0;
    for (var poolWorker in targets) {
      stopped += _removeWorker(poolWorker, force);
    }
    return stopped;
  }

  @override
  void terminate([TaskTerminatedException? ex]) {
    _stopped = true;
    final targets = _workers.toList();
    for (var poolWorker in targets) {
      _removeWorker(poolWorker, true);
      poolWorker.worker.terminate(ex);
    }
  }

  final _queue = Queue<WorkerTask>();
  final _executing = <WorkerTask>{};

  /// Gets remaining workload
  int get pendingWorkload => _queue.length;

  WorkerTask<T, W> _enqueue<T>(WorkerTask<T, W> task) {
    if (_stopped) {
      throw SquadronErrorImpl.create(
        'The pool cannot accept new requests because it is stopped',
      );
    }
    _queue.addLast(task);
    _schedule();
    return task;
  }

  /// Registers and schedules a [task] that returns a single value.
  /// Returns a future that completes with the task's value.
  Future<T> execute<T>(Future<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      scheduleValueTask(task, counter: counter).value;

  /// Registers and schedules a [task] that returns a stream of values.
  /// Returns a stream containing the task's values.
  Stream<T> stream<T>(Stream<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      scheduleStreamTask(task, counter: counter).stream;

  /// Registers and schedules a [task] that returns a single value.
  /// Returns a [ValueTask].
  ValueTask<T> scheduleValueTask<T>(Future<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      _enqueue<T>(WorkerValueTask<T, W>(task, counter)) as ValueTask<T>;

  /// Registers and schedules a [task] that returns a stream of values.
  /// Returns a [StreamTask].
  StreamTask<T> scheduleStreamTask<T>(Stream<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      _enqueue<T>(WorkerStreamTask<T, W>(task, counter)) as StreamTask<T>;

  /// Schedule tasks.
  void _schedule() {
    if (_workers.isEmpty && _startingWorkers > 0) {
      // workers are still starting, defer
      Future(_schedule);
      return;
    }

    // remove dead workers
    _workers
        .where(PoolWorker.isStopped)
        .toList() // take a copy
        .forEach(_removeWorkerAndNotify);

    // remove canceled tasks
    _queue.removeWhere((t) => t.isCanceled);

    // any work to do?
    if (_queue.isEmpty) {
      // no: effectively stop the pool if needed and return
      if (_stopped && _executing.isEmpty) {
        stop();
      }
      return;
    }

    // yes: dispatch tasks to workers
    _dispatchTasks();

    // and provision more workers if possible and necessary
    final needs = _getProvisionNeeds(_queue.length);
    if (needs > 0) {
      _provisionWorkers(needs).then(
        (_) => _dispatchTasks(),
        onError: (ex) {
          channelLogger?.e(() => 'Provisioning workers failed with error $ex');
          while (_queue.isNotEmpty) {
            _queue.removeFirst().cancel('Provisioning workers failed');
          }
        },
      );
    }
  }

  int _sortAndGetMaxCapacity() {
    _workers.sort(PoolWorker.compareCapacityDesc);
    return _workers.isEmpty ? 0 : _workers.first.capacity;
  }

  void _dispatchTasks() {
    int maxCapacity;
    while (_queue.isNotEmpty && (maxCapacity = _sortAndGetMaxCapacity()) > 0) {
      maxCapacity -= 1;
      for (var idx = 0; idx < _workers.length; idx++) {
        final w = _workers[idx];
        if (_queue.isEmpty || w.capacity == 0 || w.capacity < maxCapacity) {
          break;
        }
        final task = _queue.removeFirst();
        _executing.add(task);
        w.run(task).whenComplete(() {
          _executing.remove(task);
          _schedule();
        });
      }
    }
  }

  /// Task cancellation. Cancels the specified [task]: it is removed from the
  /// pool's queue and [Task.cancel] is called with the optional [message].
  void cancel(Task task, [String? message]) {
    _executing.remove(task);
    _queue.removeWhere((t) => t == task);
    task.cancel(message);
  }

  /// Task cancellation. Cancels all tasks registered with the [WorkerPool],
  /// whether they are queued or already executing.
  void cancelAll([String? message]) {
    final toBeCanceled = _executing.followedBy(_queue).toList();
    _executing.clear();
    _queue.clear();
    for (var task in toBeCanceled) {
      task.cancel(message);
    }
  }

  /// Worker statistics.
  Iterable<WorkerStat> get stats => _workers.map(PoolWorker.getStats);

  /// Full worker statistics.
  Iterable<WorkerStat> get fullStats => _deadWorkerStats.followedBy(stats);

  /// Worker pools do not need an [operations] map.
  @override
  OperationsMap get operations => WorkerService.noOperations;
}
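
For context, here is a minimal usage sketch of the pool API shown above (start, execute, stream, stop). It is not part of worker_pool.dart: SampleWorkerPool, SampleWorker and its fib/fibStream commands are hypothetical placeholders for an application-specific worker and its pool subclass, and ConcurrencySettings is assumed to accept minWorkers/maxWorkers as named parameters.

Future<void> main() async {
  // Hypothetical pool over a hypothetical SampleWorker service.
  final pool = SampleWorkerPool(
    concurrencySettings: ConcurrencySettings(minWorkers: 1, maxWorkers: 4),
  );

  // Provision at least minWorkers workers (or 1 if minWorkers is zero).
  await pool.start();

  // Single-value task: execute() wraps scheduleValueTask(...).value.
  final result = await pool.execute((w) => w.fib(30));
  print('fib(30) = $result');

  // Streaming task: stream() wraps scheduleStreamTask(...).stream.
  await for (final value in pool.stream((w) => w.fibStream(0, 10))) {
    print(value);
  }

  // Stop the pool: pending tasks complete, then the workers shut down.
  pool.stop();
}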