1 | | | import 'dart:async'; |
2 | | | import 'dart:collection'; |
3 | | |
|
4 | | | import 'package:logger/web.dart'; |
5 | | | import 'package:using/using.dart'; |
6 | | |
|
7 | | | import '../concurrency_settings.dart'; |
8 | | | import '../exceptions/exception_manager.dart'; |
9 | | | import '../exceptions/squadron_error.dart'; |
10 | | | import '../exceptions/squadron_exception.dart'; |
11 | | | import '../exceptions/worker_exception.dart'; |
12 | | | import '../iworker.dart'; |
13 | | | import '../stats/perf_counter.dart'; |
14 | | | import '../stats/worker_stat.dart'; |
15 | | | import '../worker/worker.dart'; |
16 | | | import '../worker_service.dart'; |
17 | | | import '_pool_worker.dart'; |
18 | | | import '_worker_stream_task.dart'; |
19 | | | import '_worker_task.dart'; |
20 | | | import '_worker_value_task.dart'; |
21 | | | import 'stream_task.dart'; |
22 | | | import 'task.dart'; |
23 | | | import 'value_task.dart'; |
24 | | |
|
/// Signature of the callback used by a pool to instantiate a worker of type
/// [W]; it receives the [ExceptionManager] the new worker should use.
typedef WorkerFactory<W> = W Function(ExceptionManager exceptionManager);
26 | | |
|
/// Worker pool responsible for instantiating, starting and stopping workers
/// running in parallel.
///
/// A [WorkerPool] is also responsible for creating [WorkerTask]s and assigning
/// them to [Worker]s.
abstract class WorkerPool<W extends Worker>
    with Releasable
    implements WorkerService, IWorker {
  /// Creates a worker pool.
  ///
  /// Workers are instantiated using the provided [_workerFactory].
  /// The pool will only instantiate workers as needed, depending on
  /// [concurrencySettings]. The [ConcurrencySettings.minWorkers] and
  /// [ConcurrencySettings.maxWorkers] settings control how many workers will
  /// live in the pool. The [ConcurrencySettings.maxParallel] setting controls
  /// how many tasks can be posted to each individual worker in the pool.
  ///
  /// Default [ConcurrencySettings] and a fresh [ExceptionManager] are used
  /// when the corresponding argument is omitted.
  WorkerPool(this._workerFactory,
      {ConcurrencySettings? concurrencySettings,
      ExceptionManager? exceptionManager})
      : concurrencySettings = concurrencySettings ?? ConcurrencySettings(),
        _exceptionManager = exceptionManager ?? ExceptionManager();

  /// Stops the pool (see [stop]) and then releases this instance.
  @override
  void release() {
    stop();
    super.release();
  }

  /// Factory invoked once for each worker the pool needs to create.
  final WorkerFactory<W> _workerFactory;

  /// Logger assigned to every worker created by this pool; also used to
  /// report worker provisioning errors.
  @override
  Logger? channelLogger;

  /// Exception manager handed to the worker factory.
  @override
  ExceptionManager get exceptionManager =>
      (_exceptionManager ??= ExceptionManager());
  ExceptionManager? _exceptionManager;

  /// Concurrency settings.
  final ConcurrencySettings concurrencySettings;

  /// Minimum workers.
  int get minWorkers => concurrencySettings.minWorkers;

  /// Maximum workers.
  int get maxWorkers => concurrencySettings.maxWorkers;

  /// Maximum tasks per worker.
  int get maxParallel => concurrencySettings.maxParallel;

  /// Maximum running tasks.
  int get maxConcurrency => concurrencySettings.maxConcurrency;

  /// Workers currently registered with the pool.
  final _workers = <PoolWorker<W>>[];

  /// Statistics of workers that have been removed from the pool.
  final _deadWorkerStats = <WorkerStat>[];

  /// Whether this pool is scheduled for stopping.
  ///
  /// Set by [stop] when called without a predicate, reset by [start].
  bool get stopped => _stopped;
  bool _stopped = false;

  /// Number of workers.
  int get size => _workers.length;

  /// Largest number of workers recorded at the end of a provisioning round.
  int get maxSize => _maxSize;
  int _maxSize = 0;

  /// Current workload, summed over the workers currently in the pool.
  int get workload => stats.fold<int>(0, (p, w) => p + w.workload);

  /// Maximum workload, i.e. the highest `maxWorkload` found in [fullStats].
  int get maxWorkload => fullStats.fold<int>(
      0, (p, s) => (p >= s.maxWorkload) ? p : s.maxWorkload);

  /// Total workload, summed over [fullStats].
  int get totalWorkload =>
      fullStats.fold<int>(0, (p, s) => p + s.totalWorkload);

  /// Number of errors, summed over [fullStats].
  int get totalErrors => fullStats.fold<int>(0, (p, s) => p + s.totalErrors);

  /// Registered pool listeners, keyed by the token returned on registration.
  final _workerPoolListeners =
      <Object, void Function(W worker, bool removed)>{};

  /// Registers a callback to be invoked when a worker thread is added or
  /// removed from the pool.
  ///
  /// Returns a token that can be passed to [unregisterWorkerPoolListener].
  Object registerWorkerPoolListener(
      void Function(W worker, bool removed) listener) {
    final token = Object();
    _workerPoolListeners[token] = listener;
    return token;
  }

  /// Unregisters a callback.
  ///
  /// When [token] is provided, only the registration made with that token is
  /// removed and [listener] is ignored. Otherwise, every registration whose
  /// callback equals [listener] is removed. Does nothing if both are null.
  void unregisterWorkerPoolListener(
      {Function(W worker, bool removed)? listener, Object? token}) {
    if (token != null) {
      _workerPoolListeners.remove(token);
    } else if (listener != null) {
      _workerPoolListeners.removeWhere((key, value) => value == listener);
    }
  }

  /// Number of workers whose startup has been initiated but not yet settled.
  int _startingWorkers = 0;

  /// Returns how many additional workers should be started to serve
  /// [workload].
  ///
  /// The workload is first clamped to `[minWorkers, maxWorkers]` (the upper
  /// bound only applies when [maxWorkers] is positive), then reduced by the
  /// number of live and starting workers. The result may be zero or negative
  /// when no additional worker is needed.
  int _getProvisionNeeds(int workload) {
    if (workload < minWorkers) {
      // at least minWorkers
      workload = minWorkers;
    }
    if (maxWorkers > 0 && workload > maxWorkers) {
      // at most maxWorkers if > 0
      workload = maxWorkers;
    }
    // adjust by _workers.length and _startingWorkers
    return workload - _workers.length - _startingWorkers;
  }

  /// Creates and starts [workload] workers.
  ///
  /// Workers that start successfully are added to the pool and listeners are
  /// notified. Workers that fail to start are stopped and the error is
  /// recorded. Once all startups have settled, [maxSize] is updated and:
  ///  * if only some of the attempts failed, the errors are logged;
  ///  * if every attempt failed, the returned future completes with one of
  ///    the recorded errors ([SquadronError] first, then [WorkerException],
  ///    then whatever error came first).
  Future<void> _provisionWorkers(int workload) {
    final tasks = <Future>[];
    final errors = [];
    for (var i = 0; i < workload; i++) {
      try {
        final worker = _workerFactory(exceptionManager);
        worker.channelLogger = channelLogger;

        final poolWorker = PoolWorker(worker, maxParallel);
        _startingWorkers++;
        // Future.sync converts a synchronous throw from start() into a failed
        // future, so the counter is always decremented by whenComplete() and
        // the worker is always stopped by catchError().
        final startup = Future.sync(() => poolWorker.worker.start());
        tasks.add(startup.whenComplete(() {
          _startingWorkers--;
        }).then((_) {
          // start succeeded: register worker
          _addWorkerAndNotify(poolWorker);
        }).catchError((ex, st) {
          // start failed, ensure the worker is stopped
          poolWorker.worker.stop();
          errors.add(SquadronException.from(ex, st));
        }));
      } catch (ex, st) {
        // the worker could not even be instantiated
        errors.add(SquadronException.from(ex, st));
      }
    }

    return Future.wait(tasks).whenComplete(() {
      if (_workers.length > _maxSize) {
        _maxSize = _workers.length;
      }
      if (errors.isNotEmpty) {
        // Each attempt records at most one error, so comparing with the number
        // of attempts (rather than the number of started futures) also
        // accounts for workers that failed before their startup began.
        if (errors.length < workload) {
          // some attempts failed: warn
          channelLogger?.e(() => 'Error while provisionning workers: $errors');
        } else {
          // all attempts failed: throw
          throw errors.firstWhere((e) => e is SquadronError,
                  orElse: () => null) ??
              errors.firstWhere((e) => e is WorkerException,
                  orElse: () => null) ??
              errors.first;
        }
      }
    });
  }

  /// Ensures at least [ConcurrencySettings.minWorkers] workers are started
  /// (defaulting to 1 if [ConcurrencySettings.minWorkers] is zero).
  ///
  /// Also clears the [stopped] flag. If tasks are already queued, the queue
  /// length is used as the target workload. Returns a future only when
  /// workers actually need to be provisioned.
  @override
  FutureOr<void> start() {
    _stopped = false;
    final needs = _getProvisionNeeds(_queue.isEmpty ? 1 : _queue.length);
    if (needs > 0) {
      return _provisionWorkers(needs);
    }
  }

  /// Invokes every registered listener with [worker] and [removed].
  ///
  /// Listeners registered or unregistered during a notification round only
  /// take effect from the next round.
  void _notify(W worker, {required bool removed}) {
    // Iterate over a snapshot: a listener may register or unregister
    // listeners from within its callback, which would otherwise raise a
    // ConcurrentModificationError in the middle of the round.
    for (final listener in _workerPoolListeners.values.toList()) {
      try {
        listener(worker, removed);
      } catch (_) {
        // swallow error from user land
      }
    }
  }

  /// Removes [poolWorker] from the pool and notifies listeners.
  void _removeWorkerAndNotify(PoolWorker<W> poolWorker) {
    _workers.remove(poolWorker);
    _notify(poolWorker.worker, removed: true);
  }

  /// Adds [poolWorker] to the pool and notifies listeners.
  void _addWorkerAndNotify(PoolWorker<W> poolWorker) {
    _workers.add(poolWorker);
    _notify(poolWorker.worker, removed: false);
  }

  /// Stops [poolWorker] and removes it from the pool.
  ///
  /// Unless [force] is true, the worker is left untouched when removing it
  /// would bring the pool below [ConcurrencySettings.minWorkers]. Returns 1
  /// if the worker was removed, 0 otherwise.
  int _removeWorker(PoolWorker<W> poolWorker, bool force) {
    if (force || _workers.length > concurrencySettings.minWorkers) {
      final worker = poolWorker.worker;
      worker.stop();
      // keep the stats so they still show up in fullStats
      _deadWorkerStats.add(worker.stats);
      _removeWorkerAndNotify(poolWorker);
      return 1;
    } else {
      return 0;
    }
  }

  /// Stops idle pool workers matching the [predicate], as long as the pool
  /// holds more than [ConcurrencySettings.minWorkers] workers.
  ///
  /// If [predicate] is null or not provided, the pool is marked as [stopped]
  /// and no longer accepts new tasks. Workers are stopped immediately, except
  /// that as many workers as there are queued tasks are kept alive to process
  /// them; the pool stops again once the queue has drained and no task is
  /// executing.
  ///
  /// Stopping a worker does not interrupt or cancel processing. Workers will
  /// complete pending tasks before shutting down. In the meantime, they will
  /// not receive any new workload.
  ///
  /// Returns the number of workers that have been stopped by this call.
  @override
  int stop([bool Function(W worker)? predicate]) {
    List<PoolWorker<W>> targets;
    bool force = (predicate == null);
    if (force) {
      // kill workers while keeping enough workers alive to process pending tasks
      targets = _workers.skip(_queue.length).toList();
      _stopped = true;
    } else {
      // kill workers that are idle and satisfy the predicate
      targets = _workers.where((w) => w.isIdle && predicate(w.worker)).toList();
    }
    var stopped = 0;
    for (var poolWorker in targets) {
      stopped += _removeWorker(poolWorker, force);
    }
    return stopped;
  }

  /// Tasks waiting to be assigned to a worker.
  final _queue = Queue<WorkerTask>();

  /// Tasks that have been handed to a worker and have not completed yet.
  final _executing = <WorkerTask>{};

  /// Gets remaining workload, i.e. the number of queued tasks.
  int get pendingWorkload => _queue.length;

  /// Appends [task] to the queue, triggers scheduling and returns [task].
  ///
  /// Throws if the pool is [stopped].
  WorkerTask<T, W> _enqueue<T>(WorkerTask<T, W> task) {
    if (_stopped) {
      throw SquadronErrorExt.create(
        'The pool cannot accept new requests because it is stopped',
      );
    }
    _queue.addLast(task);
    _schedule();
    return task;
  }

  /// Registers and schedules a [task] that returns a single value.
  /// Returns a future that completes with the task's value.
  Future<T> execute<T>(Future<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      scheduleValueTask(task, counter: counter).value;

  /// Registers and schedules a [task] that returns a stream of values.
  /// Returns a stream containing the task's values.
  Stream<T> stream<T>(Stream<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      scheduleStreamTask(task, counter: counter).stream;

  /// Registers and schedules a [task] that returns a single value.
  /// Returns a [ValueTask]<T>.
  ValueTask<T> scheduleValueTask<T>(Future<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      _enqueue<T>(WorkerValueTask<T, W>(task, counter)) as ValueTask<T>;

  /// Registers and schedules a [task] that returns a stream of values.
  /// Returns a [StreamTask]<T>.
  StreamTask<T> scheduleStreamTask<T>(Stream<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      _enqueue<T>(WorkerStreamTask<T, W>(task, counter)) as StreamTask<T>;

  /// Schedules tasks: cleans up stopped workers and canceled tasks,
  /// dispatches queued tasks to the available workers and provisions more
  /// workers when needed.
  void _schedule() {
    if (_workers.isEmpty && _startingWorkers > 0) {
      // workers are still starting, defer
      Future(_schedule);
      return;
    }

    // remove dead workers (iterating over a copy), keeping their stats so
    // they remain part of fullStats just like workers removed via stop()
    for (final dead in _workers.where(PoolWorker.isStopped).toList()) {
      _deadWorkerStats.add(dead.worker.stats);
      _removeWorkerAndNotify(dead);
    }

    // remove canceled tasks
    _queue.removeWhere((t) => t.isCanceled);

    // any work to do?
    if (_queue.isEmpty) {
      // no: effectively stop the pool if needed and return
      if (_stopped && _executing.isEmpty) {
        stop();
      }
      return;
    }

    // yes: dispatch tasks to workers
    _dispatchTasks();

    // and provision more workers if possible and necessary
    final needs = _getProvisionNeeds(_queue.length);
    if (needs > 0) {
      _provisionWorkers(needs).then(
        (_) => _dispatchTasks(),
        onError: (ex) {
          // no worker could be started: queued tasks cannot be served
          channelLogger?.e(() => 'Provisionning workers failed with error $ex');
          while (_queue.isNotEmpty) {
            _queue.removeFirst().cancel('Provisionning workers failed');
          }
        },
      );
    }
  }

  /// Sorts the workers with [PoolWorker.compareCapacityDesc] and returns the
  /// capacity of the first worker after sorting, or 0 if the pool is empty.
  int _sortAndGetMaxCapacity() {
    _workers.sort(PoolWorker.compareCapacityDesc);
    return _workers.isEmpty ? 0 : _workers.first.capacity;
  }

  /// Assigns queued tasks to workers for as long as there are tasks in the
  /// queue and the first worker after sorting has capacity left.
  ///
  /// Each round walks the workers in sort order and hands one task to each,
  /// stopping at the first worker that has no capacity or whose capacity is
  /// below the round's maximum capacity minus one. Scheduling is triggered
  /// again each time a task completes.
  void _dispatchTasks() {
    int maxCapacity;
    while (_queue.isNotEmpty && (maxCapacity = _sortAndGetMaxCapacity()) > 0) {
      maxCapacity -= 1;
      for (var idx = 0; idx < _workers.length; idx++) {
        final w = _workers[idx];
        if (_queue.isEmpty || w.capacity == 0 || w.capacity < maxCapacity) {
          break;
        }
        final task = _queue.removeFirst();
        _executing.add(task);
        w.run(task).whenComplete(() {
          _executing.remove(task);
          _schedule();
        });
      }
    }
  }

  /// Task cancelation. If a specific [task] is provided, only this task will
  /// be canceled. Otherwise, all tasks registered with the [WorkerPool] are
  /// canceled, whether they are executing or still queued.
  ///
  /// The optional [message] is forwarded to each canceled task.
  void cancel([Task? task, String? message]) {
    if (task != null) {
      _executing.remove(task);
      _queue.removeWhere((t) => t == task);
      task.cancel(message);
    } else {
      // take a copy before clearing the collections
      final toBeCanceled = _executing.followedBy(_queue).toList();
      _executing.clear();
      _queue.clear();
      for (var task in toBeCanceled) {
        task.cancel(message);
      }
    }
  }

  /// Worker statistics, for the workers currently in the pool.
  Iterable<WorkerStat> get stats => _workers.map(PoolWorker.getStats);

  /// Full worker statistics: removed workers first, then current workers.
  Iterable<WorkerStat> get fullStats => _deadWorkerStats.followedBy(stats);

  /// Worker pools do not need an [operations] map.
  @override
  Map<int, CommandHandler> get operations => WorkerService.noOperations;
}