LCOV - code coverage report

Current view: top level - src/pool - worker_pool.dart
Test:         lcov.info
Date:         2024-11-13

Legend:  Lines: hit | not hit    Branches: taken | not taken | # not executed

Coverage summary:
             Hit    Total    Coverage
  Lines      163    185      88.1%
  Functions  0      0        -
  Branches   0      0        -

Each row represents a line of source code: Line | Branch | Hits | Source code
1import 'dart:async';
2import 'dart:collection';
3
4import 'package:logger/web.dart';
5import 'package:using/using.dart';
6
7import '../concurrency_settings.dart';
8import '../exceptions/exception_manager.dart';
9import '../exceptions/squadron_error.dart';
10import '../exceptions/squadron_exception.dart';
11import '../exceptions/worker_exception.dart';
12import '../iworker.dart';
13import '../stats/perf_counter.dart';
14import '../stats/worker_stat.dart';
15import '../worker/worker.dart';
16import '../worker_service.dart';
17import '_pool_worker.dart';
18import '_worker_stream_task.dart';
19import '_worker_task.dart';
20import '_worker_value_task.dart';
21import 'stream_task.dart';
22import 'task.dart';
23import 'value_task.dart';
24
25typedef WorkerFactory<W> = W Function(ExceptionManager);
26
27/// Worker pool responsible for instantiating, starting and stopping workers running in parallel.
28/// A [WorkerPool] is also responsible for creating and assigning [WorkerTask]s to [Worker]s.
29abstract class WorkerPool<W extends Worker>
30 with Releasable
31 implements WorkerService, IWorker {
32 /// Create a worker pool.
33 ///
34 /// Workers are instantiated using the provided [_workerFactory].
35 /// The pool will only instantiate workers as needed, depending on [concurrencySettings].
36 /// The [ConcurrencySettings.minWorkers] and [ConcurrencySettings.maxWorkers] settings control
37 /// how many workers will live in the pool. The [ConcurrencySettings.maxParallel] setting
38 /// controls how many tasks can be posted to each individual worker in the pool.
394 WorkerPool(this._workerFactory,
40 {ConcurrencySettings? concurrencySettings,
41 ExceptionManager? exceptionManager})
420 : concurrencySettings = concurrencySettings ?? ConcurrencySettings(),
436 _exceptionManager = exceptionManager ?? ExceptionManager();
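For illustration, a minimal sketch of wiring a concrete pool to this constructor. HelloWorker, HelloWorkerPool and the named ConcurrencySettings parameters are assumptions for this example (real projects typically generate an equivalent class for a concrete worker service); only the WorkerPool constructor shape is taken from the code above.

  // Hypothetical hand-written pool built on the constructor above.
  class HelloWorkerPool extends WorkerPool<HelloWorker> {
    HelloWorkerPool({ConcurrencySettings? concurrencySettings})
        : super((exceptionManager) => HelloWorker(exceptionManager),
              concurrencySettings: concurrencySettings);
  }

  // Inside an async function:
  final pool = HelloWorkerPool(
    // Keep 1 to 4 workers alive, at most 2 tasks per worker at a time.
    concurrencySettings:
        ConcurrencySettings(minWorkers: 1, maxWorkers: 4, maxParallel: 2),
  );
  await pool.start();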
44
453 @override
463 void release() {
476 stop();
486 super.release();
493 }
50
51 final WorkerFactory<W> _workerFactory;
52
53 @override
54 Logger? channelLogger;
55
564 @override
571 ExceptionManager get exceptionManager =>
588 (_exceptionManager ??= ExceptionManager());
59 ExceptionManager? _exceptionManager;
60
61 /// Concurrency settings.
62 final ConcurrencySettings concurrencySettings;
63
64 /// Minimum workers.
6516 int get minWorkers => concurrencySettings.minWorkers;
66
67 /// Maximum workers.
6816 int get maxWorkers => concurrencySettings.maxWorkers;
69
70 /// Maximum tasks per worker.
7115 int get maxParallel => concurrencySettings.maxParallel;
72
73 /// Maximum running tasks.
7412 int get maxConcurrency => concurrencySettings.maxConcurrency;
75
764 final _workers = <PoolWorker<W>>[];
77
784 final _deadWorkerStats = <WorkerStat>[];
79
80 /// Whether this pool is scheduled for stopping.
813 bool get stopped => _stopped;
82 bool _stopped = false;
83
84 /// Number of workers.
854 int get size => _workers.length;
86
87 /// Maximum number of workers.
880 int get maxSize => _maxSize;
89 int _maxSize = 0;
90
91 /// Current workload.
920 int get workload => stats.fold<int>(0, (p, w) => p + w.workload);
93
94 /// Maximum workload.
950 int get maxWorkload => fullStats.fold<int>(
960 0, (p, s) => (p >= s.maxWorkload) ? p : s.maxWorkload);
97
98 /// Total workload.
990 int get totalWorkload =>
1000 fullStats.fold<int>(0, (p, s) => p + s.totalWorkload);
101
102 /// Number of errors.
1030 int get totalErrors => fullStats.fold<int>(0, (p, s) => p + s.totalErrors);
104
105 final _workerPoolListeners =
1064 <Object, void Function(W worker, bool removed)>{};
107
108 /// Registers a callback to be invoked when a worker thread is added or removed from the pool.
1090 Object registerWorkerPoolListener(
110 void Function(W worker, bool removed) listener) {
1110 final token = Object();
1120 _workerPoolListeners[token] = listener;
113 return token;
114 }
115
116 /// Unregisters a callback.
1170 void unregisterWorkerPoolListener(
118 {Function(W worker, bool removed)? listener, Object? token}) {
119 if (token != null) {
1200 _workerPoolListeners.remove(token);
121 } else if (listener != null) {
1220 _workerPoolListeners.removeWhere((key, value) => value == listener);
123 }
124 }
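A sketch of the listener API above; the callback body is illustrative only.

  // Keep the token so the listener can be removed later.
  final token = pool.registerWorkerPoolListener((worker, removed) {
    print(removed ? 'worker removed from the pool' : 'worker added to the pool');
  });
  // ... later:
  pool.unregisterWorkerPoolListener(token: token);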
125
126 int _startingWorkers = 0;
127
1288 int _getProvisionNeeds(int workload) {
12912 if (workload < minWorkers) {
130 // at least minWorkers
1314 workload = minWorkers;
132 }
13320 if (maxWorkers > 0 && workload > maxWorkers) {
134 // at most maxWorkers if > 0
1353 workload = maxWorkers;
136 }
137 // adjust by _workers.length and _startingWorkers
13824 return workload - _workers.length - _startingWorkers;
1394 }
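A worked example of the arithmetic above: with minWorkers = 2 and maxWorkers = 4, a requested workload of 1 is raised to 2 and a workload of 10 is capped at 4; the result is then reduced by the workers already alive or starting, so a capped workload of 4 with one live worker and one starting worker yields 2 additional workers to provision (a non-positive result means no provisioning is needed).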
140
1418 Future<void> _provisionWorkers(int workload) {
1428 final tasks = <Future>[];
1434 final errors = [];
14412 for (var i = 0; i < workload; i++) {
1450 try {
14616 final worker = _workerFactory(exceptionManager);
14712 worker.channelLogger = channelLogger;
148
14912 final poolWorker = PoolWorker(worker, maxParallel);
1508 _startingWorkers++;
15124 tasks.add(poolWorker.worker.start().whenComplete(() {
15212 _startingWorkers--;
15312 }).then((_) {
154 // start succeeded: register worker
1558 _addWorkerAndNotify(poolWorker);
1569 }).catchError((ex, st) {
157 // start failed, ensure the worker is stopped
1583 poolWorker.worker.stop();
1593 errors.add(SquadronException.from(ex, st));
1601 }));
1610 } catch (ex, st) {
1620 errors.add(SquadronException.from(ex, st));
163 }
164 }
165
16616 return Future.wait(tasks).whenComplete(() {
16720 if (_workers.length > _maxSize) {
16816 _maxSize = _workers.length;
169 }
1708 if (errors.isNotEmpty) {
1714 if (errors.length < tasks.length) {
172 // some tasks failed: warn
1730 channelLogger?.e(() => 'Error while provisioning workers: $errors');
174 } else {
175 // all tasks failed: throw
1767 throw errors.firstWhere((e) => e is SquadronError,
1772 orElse: () => null) ??
1784 errors.firstWhere((e) => e is WorkerException,
1791 orElse: () => null) ??
1801 errors.first;
181 }
182 }
1834 });
1844 }
185
186 /// Ensure at least [ConcurrencySettings.minWorkers] workers are started
187 /// (defaulting to 1 if [ConcurrencySettings.minWorkers] is zero).
1883 @override
1891 FutureOr<void> start() {
1906 _stopped = false;
19112 final needs = _getProvisionNeeds(_queue.isEmpty ? 1 : _queue.length);
1924 if (needs > 0) {
1936 return _provisionWorkers(needs);
194 }
1951 }
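Illustrative lifecycle, reusing the hypothetical pool from the earlier sketch. Note that start() may be called again after stop(), since it resets the stopped flag before provisioning.

  await pool.start();  // provisions at least minWorkers (or 1) worker(s)
  // ... schedule work ...
  pool.stop();         // no new requests accepted; pending tasks still complete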
196
1978 void _notify(W worker, {required bool removed}) {
19812 for (var listener in _workerPoolListeners.values) {
199 try {
2000 listener(worker, removed);
201 } catch (ex) {
202 // swallow error from user land
203 }
204 }
2054 }
206
2078 void _removeWorkerAndNotify(PoolWorker<W> poolWorker) {
20812 _workers.remove(poolWorker);
20912 _notify(poolWorker.worker, removed: true);
2104 }
211
2124 void _addWorkerAndNotify(PoolWorker<W> poolWorker) {
21312 _workers.add(poolWorker);
21412 _notify(poolWorker.worker, removed: false);
215 }
216
2178 int _removeWorker(PoolWorker<W> poolWorker, bool force) {
2189 if (force || _workers.length > concurrencySettings.minWorkers) {
2198 final worker = poolWorker.worker;
2208 worker.stop();
22116 _deadWorkerStats.add(worker.stats);
2228 _removeWorkerAndNotify(poolWorker);
2234 return 1;
224 } else {
2251 return 0;
226 }
2274 }
228
229 /// Stop idle pool workers matching the [predicate].
230 /// If [predicate] is null or not provided, all workers will be stopped.
231 /// Stopping a worker does not interrupt or cancel processing. Workers will
232 /// complete pending tasks before shutting down. In the meantime, they will
233 /// not receive any new workload.
234 /// Returns the number of workers that have been stopped.
2354 @override
2364 int stop([bool Function(W worker)? predicate]) {
2371 List<PoolWorker<W>> targets;
2381 bool force = (predicate == null);
2391 if (force) {
240 // kill workers while keeping enough workers alive to process pending tasks
24124 targets = _workers.skip(_queue.length).toList();
2428 _stopped = true;
243 } else {
244 // kill workers that are idle and satisfy the predicate
2458 targets = _workers.where((w) => w.isIdle && predicate(w.worker)).toList();
246 }
247 var stopped = 0;
24812 for (var poolWorker in targets) {
24912 stopped += _removeWorker(poolWorker, force);
250 }
2514 return stopped;
2524 }
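A sketch of selective stopping based on the per-worker statistics exposed by this class; the threshold is arbitrary.

  // Retire idle workers that have already processed a lot of work.
  final retired = pool.stop((w) => w.stats.totalWorkload > 1000);
  print('$retired worker(s) stopped');

  // Stop the whole pool; queued and running tasks still complete.
  pool.stop();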
253
2544 final _queue = Queue<WorkerTask>();
2554 final _executing = <WorkerTask>{};
256
257 /// Gets remaining workload
2588 int get pendingWorkload => _queue.length;
259
2607 WorkerTask<T, W> _enqueue<T>(WorkerTask<T, W> task) {
2618 if (_stopped) {
2622 throw SquadronErrorExt.create(
263 'The pool cannot accept new requests because it is stopped',
264 );
265 }
26612 _queue.addLast(task);
2678 _schedule();
2683 return task;
2693 }
270
271 /// Registers and schedules a [task] that returns a single value.
272 /// Returns a future that completes with the task's value.
2735 Future<T> execute<T>(Future<T> Function(W worker) task,
274 {PerfCounter? counter}) =>
2759 scheduleValueTask(task, counter: counter).value;
276
277 /// Registers and schedules a [task] that returns a stream of values.
278 /// Returns a stream containing the task's values.
2796 Stream<T> stream<T>(Stream<T> Function(W worker) task,
280 {PerfCounter? counter}) =>
28112 scheduleStreamTask(task, counter: counter).stream;
282
283 /// Registers and schedules a [task] that returns a single value.
284 /// Returns a [ValueTask]<T>.
2854 ValueTask<T> scheduleValueTask<T>(Future<T> Function(W worker) task,
286 {PerfCounter? counter}) =>
2879 _enqueue<T>(WorkerValueTask<T, W>(task, counter)) as ValueTask<T>;
288
289 /// Registers and schedules a [task] that returns a stream of values.
290 /// Returns a [StreamTask]<T>.
2916 StreamTask<T> scheduleStreamTask<T>(Stream<T> Function(W worker) task,
292 {PerfCounter? counter}) =>
29312 _enqueue<T>(WorkerStreamTask<T, W>(task, counter)) as StreamTask<T>;
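For illustration, a sketch of the scheduling API above, assuming the hypothetical HelloWorker exposes a hello() method returning a Future<String> and a ticker() method returning a Stream<int> (both members are assumptions for this example).

  // Single value.
  final greeting = await pool.execute((w) => w.hello('world'));

  // Stream of values.
  await for (final tick in pool.stream((w) => w.ticker(count: 3))) {
    print(tick);
  }

  // Task handles, useful when the caller may need to cancel later.
  final task = pool.scheduleValueTask((w) => w.hello('again'));
  final result = await task.value;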
294
295 /// Schedule tasks.
2968 void _schedule() {
29716 if (_workers.isEmpty && _startingWorkers > 0) {
298 // workers are still starting, defer
2996 Future(_schedule);
3002 return;
301 }
302
303 // remove dead workers
3044 _workers
3058 .where(PoolWorker.isStopped)
3068 .toList() // take a copy
30712 .forEach(_removeWorkerAndNotify);
308
309 // remove canceled tasks
31020 _queue.removeWhere((t) => t.isCanceled);
311
312 // any work to do?
3138 if (_queue.isEmpty) {
314 // no: effectively stop the pool if needed and return
31512 if (_stopped && _executing.isEmpty) {
3164 stop();
317 }
3184 return;
319 }
320
321 // yes: dispatch tasks to workers
3228 _dispatchTasks();
323
324 // and provision more workers if possible and necessary
32516 final needs = _getProvisionNeeds(_queue.length);
3268 if (needs > 0) {
3276 _provisionWorkers(needs).then(
3288 (_) => _dispatchTasks(),
3293 onError: (ex) {
3302 channelLogger?.e(() => 'Provisioning workers failed with error $ex');
3313 while (_queue.isNotEmpty) {
3324 _queue.removeFirst().cancel('Provisioning workers failed');
333 }
3341 },
335 );
336 }
3374 }
338
3394 int _sortAndGetMaxCapacity() {
34012 _workers.sort(PoolWorker.compareCapacityDesc);
34124 return _workers.isEmpty ? 0 : _workers.first.capacity;
342 }
343
3448 void _dispatchTasks() {
345 int maxCapacity;
34620 while (_queue.isNotEmpty && (maxCapacity = _sortAndGetMaxCapacity()) > 0) {
3478 maxCapacity -= 1;
34820 for (var idx = 0; idx < _workers.length; idx++) {
34912 final w = _workers[idx];
35028 if (_queue.isEmpty || w.capacity == 0 || w.capacity < maxCapacity) {
3513 break;
352 }
35312 final task = _queue.removeFirst();
35412 _executing.add(task);
35516 w.run(task).whenComplete(() {
35612 _executing.remove(task);
3578 _schedule();
3584 });
359 }
360 }
3614 }
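To make the dispatch loop above concrete: workers are sorted by remaining capacity in descending order, and each pass hands queued tasks only to workers whose capacity is within one of the current maximum, so the load spreads evenly instead of saturating a single worker. For example, with maxParallel = 2 and two workers running 0 and 1 tasks respectively, a single queued task goes to the idle worker (capacity 2) rather than the busier one (capacity 1).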
362
363 /// Task cancellation. If a specific [task] is provided, only this task will be canceled.
364 /// Otherwise, all tasks registered with the [WorkerPool] are canceled.
3652 void cancel([Task? task, String? message]) {
3661 if (task != null) {
3673 _executing.remove(task);
3685 _queue.removeWhere((t) => t == task);
3692 task.cancel(message);
370 } else {
3714 final toBeCanceled = _executing.followedBy(_queue).toList();
3723 _executing.clear();
3733 _queue.clear();
3743 for (var task in toBeCanceled) {
3752 task.cancel(message);
376 }
377 }
3781 }
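A sketch of cancellation using the task-handle API shown earlier (ticker() is the same assumed worker method as above).

  final task = pool.scheduleStreamTask((w) => w.ticker(count: 1000));
  // Cancel this specific task...
  pool.cancel(task, 'result no longer needed');
  // ...or cancel every queued and running task in the pool.
  pool.cancel(null, 'shutting down');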
379
380 /// Worker statistics.
3814 Iterable<WorkerStat> get stats => _workers.map(PoolWorker.getStats);
382
383 /// Full worker statistics.
3840 Iterable<WorkerStat> get fullStats => _deadWorkerStats.followedBy(stats);
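For illustration, the pool-level counters derived from these statistics:

  print('live workers: ${pool.size} (max ever: ${pool.maxSize})');
  print('current workload: ${pool.workload} / max ${pool.maxConcurrency}');
  print('processed: ${pool.totalWorkload} task(s), errors: ${pool.totalErrors}');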
385
386 /// Worker pools do not need an [operations] map.
3870 @override
3880 Map<int, CommandHandler> get operations => WorkerService.noOperations;
389}