import 'dart:async';
import 'dart:collection';

import 'package:logger/web.dart';
import 'package:using/using.dart';

import '../concurrency_settings.dart';
import '../exceptions/exception_manager.dart';
import '../exceptions/squadron_error.dart';
import '../exceptions/squadron_exception.dart';
import '../exceptions/task_terminated_exception.dart';
import '../exceptions/worker_exception.dart';
import '../iworker.dart';
import '../stats/perf_counter.dart';
import '../stats/worker_stat.dart';
import '../typedefs.dart';
import '../worker/worker.dart';
import '../worker_service.dart';
import '_pool_worker.dart';
import '_worker_stream_task.dart';
import '_worker_task.dart';
import '_worker_value_task.dart';
import 'stream_task.dart';
import 'task.dart';
import 'value_task.dart';

typedef WorkerFactory<W> = W Function(ExceptionManager);

/// Worker pool responsible for instantiating, starting and stopping workers running in parallel.
/// A [WorkerPool] is also responsible for creating and assigning [WorkerTask]s to [Worker]s.
abstract class WorkerPool<W extends Worker>
    with Releasable
    implements WorkerService, IWorker {
  /// Create a worker pool.
  ///
  /// Workers are instantiated using the provided [_workerFactory].
  /// The pool will only instantiate workers as needed, depending on [concurrencySettings].
  /// The [ConcurrencySettings.minWorkers] and [ConcurrencySettings.maxWorkers] settings control
  /// how many workers will live in the pool. The [ConcurrencySettings.maxParallel] setting
  /// controls how many tasks can be posted to each individual worker in the pool.
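  ///
  /// A minimal sketch of a concrete pool (assuming a hypothetical
  /// `HelloWorker` implementation of [Worker], and default values for any
  /// [ConcurrencySettings] option not shown):
  ///
  /// ```dart
  /// class HelloWorkerPool extends WorkerPool<HelloWorker> {
  ///   HelloWorkerPool()
  ///       : super(
  ///           (exceptionManager) => HelloWorker(exceptionManager),
  ///           concurrencySettings: ConcurrencySettings(
  ///             minWorkers: 1, // keep at least one worker alive
  ///             maxWorkers: 4, // never run more than 4 workers
  ///             maxParallel: 2, // at most 2 concurrent tasks per worker
  ///           ),
  ///         );
  /// }
  /// ```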
  WorkerPool(this._workerFactory,
      {ConcurrencySettings? concurrencySettings,
      ExceptionManager? exceptionManager})
      : concurrencySettings = concurrencySettings ?? ConcurrencySettings(),
        _exceptionManager = exceptionManager ?? ExceptionManager();

  @override
  void release() {
    stop();
    super.release();
  }

  final WorkerFactory<W> _workerFactory;

  @override
  Logger? channelLogger;

  @override
  ExceptionManager get exceptionManager =>
      (_exceptionManager ??= ExceptionManager());
  ExceptionManager? _exceptionManager;

  /// Concurrency settings.
  final ConcurrencySettings concurrencySettings;

  final _workers = <PoolWorker<W>>[];

  final _deadWorkerStats = <WorkerStat>[];

  /// Whether this pool is scheduled for stopping.
  bool get stopped => _stopped;
  bool _stopped = false;

  /// Number of workers.
  int get size => _workers.length;

  /// Maximum number of workers.
  int get maxSize => _maxSize;
  int _maxSize = 0;

  final _workerPoolListeners = <Object, void Function(WorkerStat, bool)>{};

  /// Registers a callback to be invoked when a worker thread is added or removed from the pool.
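  ///
  /// Returns a token that can later be passed to [unregisterWorkerPoolListener].
  /// A sketch of typical usage:
  ///
  /// ```dart
  /// final token = pool.registerWorkerPoolListener((stats, removed) {
  ///   print(removed ? 'worker removed: $stats' : 'worker added: $stats');
  /// });
  /// // ... later, when notifications are no longer needed:
  /// pool.unregisterWorkerPoolListener(token: token);
  /// ```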
  Object registerWorkerPoolListener(void Function(WorkerStat, bool) listener) {
    final token = Object();
    _workerPoolListeners[token] = listener;
    return token;
  }

  /// Unregisters a callback.
  void unregisterWorkerPoolListener(
      {void Function(WorkerStat, bool)? listener, Object? token}) {
    if (token != null) {
      _workerPoolListeners.remove(token);
    } else if (listener != null) {
      _workerPoolListeners.removeWhere((key, value) => value == listener);
    }
  }

  int _startingWorkers = 0;

  int _getProvisionNeeds(int workload) {
    final minWorkers = concurrencySettings.minWorkers;
    if (workload < minWorkers) {
      // at least minWorkers
      workload = minWorkers;
    }
    final maxWorkers = concurrencySettings.maxWorkers;
    if (maxWorkers > 0 && workload > maxWorkers) {
      // at most maxWorkers if > 0
      workload = maxWorkers;
    }
    // adjust by _workers.length and _startingWorkers
    return workload - _workers.length - _startingWorkers;
  }

  Future<void> _provisionWorkers(int workload) {
    final tasks = <Future>[], errors = [];
    final maxParallel = concurrencySettings.maxParallel;
    for (var i = 0; i < workload; i++) {
      try {
        final worker = _workerFactory(exceptionManager);
        worker.channelLogger = channelLogger;

        final poolWorker = PoolWorker(worker, maxParallel);
        _startingWorkers++;
        tasks.add(poolWorker.worker.start().whenComplete(() {
          _startingWorkers--;
        }).then((_) {
          // start succeeded: register worker
          _addWorkerAndNotify(poolWorker);
        }).catchError((ex, st) {
          // start failed, ensure the worker is stopped
          poolWorker.worker.terminate();
          errors.add(SquadronException.from(ex, st));
        }));
      } catch (ex, st) {
        errors.add(SquadronException.from(ex, st));
      }
    }

    return Future.wait(tasks).whenComplete(() {
      if (_workers.length > _maxSize) {
        _maxSize = _workers.length;
      }
      if (errors.isNotEmpty) {
        if (errors.length < tasks.length) {
          // some tasks failed: warn
          channelLogger?.e(() => 'Error while provisioning workers: $errors');
        } else {
          // all tasks failed: throw
          throw errors.firstWhere((e) => e is SquadronError,
                  orElse: () => null) ??
              errors.firstWhere((e) => e is WorkerException,
                  orElse: () => null) ??
              errors.first;
        }
      }
    });
  }

  /// Ensure at least [ConcurrencySettings.minWorkers] workers are started
  /// (defaulting to 1 if [ConcurrencySettings.minWorkers] is zero).
  @override
  FutureOr<void> start() {
    _stopped = false;
    final needs = _getProvisionNeeds(_queue.isEmpty ? 1 : _queue.length);
    if (needs > 0) {
      return _provisionWorkers(needs);
    }
  }

  void _notify(WorkerStat stats, {required bool removed}) {
    if (_workerPoolListeners.isNotEmpty) {
      for (var listener in _workerPoolListeners.values) {
        try {
          listener(stats, removed);
        } catch (ex) {
          // swallow error from user land
        }
      }
    }
  }

  int _removeWorkerAndNotify(PoolWorker<W> poolWorker, bool force) {
    if (force || _workers.length > concurrencySettings.minWorkers) {
      final worker = poolWorker.worker;
      worker.stop();
      if (_workers.remove(poolWorker)) {
        final stats = worker.getStats();
        _deadWorkerStats.add(stats);
        _notify(stats, removed: true);
        return 1;
      }
    }
    return 0;
  }

  void _addWorkerAndNotify(PoolWorker<W> poolWorker) {
    _workers.add(poolWorker);
    _notify(poolWorker.worker.getStats(), removed: false);
  }

  /// Stop idle pool workers matching the [predicate].
  /// If [predicate] is null or not provided, all workers will be stopped.
  /// Stopping a worker does not interrupt or cancel processing. Workers will
  /// complete pending tasks before shutting down. In the meantime, they will
  /// not receive any new workload.
  /// Returns the number of workers that have been stopped.
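  ///
  /// A sketch of typical usage:
  ///
  /// ```dart
  /// // stop idle workers in excess of ConcurrencySettings.minWorkers:
  /// final stopped = pool.stop((worker) => true);
  ///
  /// // stop the whole pool; pending tasks still complete first:
  /// pool.stop();
  /// ```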
  @override
  int stop([bool Function(W worker)? predicate]) {
    List<PoolWorker<W>> targets;
    bool force = (predicate == null);
    _workers.sort(PoolWorker.compareCapacity);
    if (force) {
      // kill workers while keeping enough workers alive to process pending tasks
      targets = _workers.reversed.skip(_queue.length).toList();
      _stopped = true;
    } else {
      // kill workers that are idle and satisfy the predicate
      targets = _workers.where((w) => w.isIdle && predicate(w.worker)).toList();
    }
    var stopped = 0;
    for (var poolWorker in targets) {
      stopped += _removeWorkerAndNotify(poolWorker, force);
    }
    return stopped;
  }

  @override
  void terminate([TaskTerminatedException? ex]) {
    _stopped = true;
    for (var i = _workers.length - 1; i >= 0; i--) {
      final w = _workers[i];
      w.worker.terminate(ex);
      _removeWorkerAndNotify(w, true);
    }
  }

  final _queue = Queue<WorkerTask>();
  final _executing = <WorkerTask>{};

  /// Gets remaining workload
  int get pendingWorkload => _queue.length;

  WorkerTask<T, W> _enqueue<T>(WorkerTask<T, W> task) {
    if (_stopped) {
      throw SquadronErrorImpl.create(
        'The pool cannot accept new requests because it is stopped',
      );
    }
    _queue.addLast(task);
    _schedule();
    return task;
  }

  /// Registers and schedules a [task] that returns a single value.
  /// Returns a future that completes with the task's value.
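  ///
  /// For example (a sketch, assuming a hypothetical `hello()` service method
  /// on the worker):
  ///
  /// ```dart
  /// final greeting = await pool.execute((worker) => worker.hello('World'));
  /// print(greeting);
  /// ```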
  Future<T> execute<T>(Future<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      scheduleValueTask(task, counter: counter).value;

  /// Registers and schedules a [task] that returns a stream of values.
  /// Returns a stream containing the task's values.
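  ///
  /// For example (a sketch, assuming a hypothetical `countDown()` service
  /// method returning a stream of integers):
  ///
  /// ```dart
  /// await for (final n in pool.stream((worker) => worker.countDown(10))) {
  ///   print(n);
  /// }
  /// ```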
  Stream<T> stream<T>(Stream<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      scheduleStreamTask(task, counter: counter).stream;

  /// Registers and schedules a [task] that returns a single value.
  /// Returns a [ValueTask].
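  ///
  /// Unlike [execute], the returned [ValueTask] gives the caller a handle to
  /// the task itself, e.g. to cancel it (a sketch, reusing the hypothetical
  /// `hello()` method):
  ///
  /// ```dart
  /// final task = pool.scheduleValueTask((worker) => worker.hello('World'));
  /// // give up if the task has not completed after 5 seconds:
  /// final timeout = Timer(Duration(seconds: 5), () => task.cancel());
  /// final greeting = await task.value;
  /// timeout.cancel();
  /// ```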
  ValueTask<T> scheduleValueTask<T>(Future<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      _enqueue<T>(WorkerValueTask<T, W>(task, counter)) as ValueTask<T>;

  /// Registers and schedules a [task] that returns a stream of values.
  /// Returns a [StreamTask].
  StreamTask<T> scheduleStreamTask<T>(Stream<T> Function(W worker) task,
          {PerfCounter? counter}) =>
      _enqueue<T>(WorkerStreamTask<T, W>(task, counter)) as StreamTask<T>;

  Timer? _timer;

  void _schedule() {
    if (_timer?.isActive != true) {
      _timer = Timer(Duration.zero, __schedule);
    }
  }

  /// Schedule tasks.
  void __schedule() {
    if (_workers.isEmpty && _startingWorkers > 0) {
      // workers are still starting, defer
      _schedule();
      return;
    }

    // any work to do?
    if (_queue.isEmpty) {
      // no: effectively stop the pool if needed and return
      if (_stopped && _executing.isEmpty) {
        stop();
      }
      return;
    }

    // yes: dispatch tasks to workers
    _dispatchTasks();

    // and provision more workers if possible and necessary
    final needs = _getProvisionNeeds(_queue.length);
    if (needs > 0) {
      _provisionWorkers(needs).then(
        (_) => _dispatchTasks(),
        onError: (ex) {
          channelLogger?.e(() => 'Provisioning workers failed with error $ex');
          if (_workers.isEmpty) {
            while (_queue.isNotEmpty) {
              _queue.removeFirst().cancel('Provisioning workers failed');
            }
          } else {
            _schedule();
          }
        },
      );
    }
  }

  void _dispatchTasks() {
    _workers.sort(PoolWorker.compareCapacity);
    for (var idx = _workers.length - 1; idx >= 0; idx--) {
      final w = _workers[idx];
      if (w.isStopped) {
        _removeWorkerAndNotify(w, false);
        continue;
      }
      while (_queue.isNotEmpty && w.capacity > 0) {
        final task = _queue.removeFirst();
        if (!task.isCanceled) {
          _executing.add(task);
          w.run(task).whenComplete(() {
            _executing.remove(task);
            _schedule();
          });
        }
      }
    }
  }

  /// Task cancelation. Cancels the provided [task] and removes it from the
  /// pool's queue and set of executing tasks.
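  ///
  /// For example (a sketch):
  ///
  /// ```dart
  /// final task = pool.scheduleValueTask((worker) => worker.hello('World'));
  /// pool.cancel(task, 'greeting no longer needed');
  /// ```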
  void cancel(Task task, [String? message]) {
    _executing.remove(task);
    _queue.removeWhere((t) => t == task);
    task.cancel(message);
  }

  /// Task cancelation. Cancels all tasks registered with the [WorkerPool],
  /// whether they are currently executing or still waiting in the queue.
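  ///
  /// For example, to drain the pool before shutting it down (a sketch):
  ///
  /// ```dart
  /// pool.cancelAll('shutting down');
  /// pool.stop();
  /// ```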
  void cancelAll([String? message]) {
    final toBeCanceled = _executing.followedBy(_queue).toList();
    _executing.clear();
    _queue.clear();
    for (var task in toBeCanceled) {
      task.cancel(message);
    }
  }

  /// Worker statistics.
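  ///
  /// For example (a sketch; see also [fullStats] which includes workers that
  /// have already been stopped):
  ///
  /// ```dart
  /// print('live workers: ${pool.stats.length}');
  /// ```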
  Iterable<WorkerStat> get stats => _workers.map(PoolWorker.getStats);

  /// Full worker statistics.
  Iterable<WorkerStat> get fullStats => _deadWorkerStats.followedBy(stats);

  /// Worker pools do not need an [operations] map.
  @override
  OperationsMap get operations => WorkerService.noOperations;
}

extension ConcurrencySettingsExt on WorkerPool {
  int get minWorkers => concurrencySettings.minWorkers;
  int get maxWorkers => concurrencySettings.maxWorkers;
  int get maxParallel => concurrencySettings.maxParallel;
  int get maxConcurrency => concurrencySettings.maxConcurrency;
}