| 1 |  |  | import 'dart:async'; | 
                
                
                    | 2 |  |  | import 'dart:collection'; | 
                
                
                    | 3 |  |  |  | 
                
                
                    | 4 |  |  | import 'package:logger/web.dart'; | 
                
                
                    | 5 |  |  | import 'package:using/using.dart'; | 
                
                
                    | 6 |  |  |  | 
                
                
                    | 7 |  |  | import '../concurrency_settings.dart'; | 
                
                
                    | 8 |  |  | import '../exceptions/exception_manager.dart'; | 
                
                
                    | 9 |  |  | import '../exceptions/squadron_error.dart'; | 
                
                
                    | 10 |  |  | import '../exceptions/squadron_exception.dart'; | 
                
                
                    | 11 |  |  | import '../exceptions/task_terminated_exception.dart'; | 
                
                
                    | 12 |  |  | import '../exceptions/worker_exception.dart'; | 
                
                
                    | 13 |  |  | import '../iworker.dart'; | 
                
                
                    | 14 |  |  | import '../stats/perf_counter.dart'; | 
                
                
                    | 15 |  |  | import '../stats/worker_stat.dart'; | 
                
                
                    | 16 |  |  | import '../typedefs.dart'; | 
                
                
                    | 17 |  |  | import '../worker/worker.dart'; | 
                
                
                    | 18 |  |  | import '../worker_service.dart'; | 
                
                
                    | 19 |  |  | import '_pool_worker.dart'; | 
                
                
                    | 20 |  |  | import '_worker_stream_task.dart'; | 
                
                
                    | 21 |  |  | import '_worker_task.dart'; | 
                
                
                    | 22 |  |  | import '_worker_value_task.dart'; | 
                
                
                    | 23 |  |  | import 'stream_task.dart'; | 
                
                
                    | 24 |  |  | import 'task.dart'; | 
                
                
                    | 25 |  |  | import 'value_task.dart'; | 
                
                
                    | 26 |  |  |  | 
                
                
                    /// Signature of the callback used by a [WorkerPool] to instantiate a worker
                    /// of type [W].
                    ///
                    /// The callback receives an [ExceptionManager] as its sole argument.
                    typedef WorkerFactory<W> = W Function(ExceptionManager exceptionManager);
                
                
                    | 28 |  |  |  | 
                
                
                    | 29 |  |  | /// Worker pool responsible for instantiating, starting and stopping workers running in parallel. | 
                
                
                    | 30 |  |  | /// A [WorkerPool] is also responsible for creating and assigning [WorkerTask]s to [Worker]s. | 
                
                
                    | 31 |  |  | abstract class WorkerPool<W extends Worker> | 
                
                
                    | 32 |  |  |     with Releasable | 
                
                
                    | 33 |  |  |     implements WorkerService, IWorker { | 
                
                
                    /// Creates a worker pool whose workers are built by [_workerFactory].
                    ///
                    /// Workers are only instantiated as needed, within the limits set by
                    /// [concurrencySettings]: [ConcurrencySettings.minWorkers] and
                    /// [ConcurrencySettings.maxWorkers] control how many workers will live in
                    /// the pool, while [ConcurrencySettings.maxParallel] controls how many tasks
                    /// can be posted to each individual worker.
                    ///
                    /// When omitted, [concurrencySettings] and [exceptionManager] default to
                    /// newly created instances.
                    WorkerPool(
                      this._workerFactory, {
                      ConcurrencySettings? concurrencySettings,
                      ExceptionManager? exceptionManager,
                    })  : concurrencySettings = concurrencySettings ?? ConcurrencySettings(),
                          _exceptionManager = exceptionManager ?? ExceptionManager();
                
                
                    | 46 |  |  |  | 
                
                
                    /// Stops the pool by calling [stop] without a predicate, then delegates to
                    /// the inherited `release()` implementation.
                    @override
                    void release() {
                      // stop first, so the pool is flagged as stopped before being released
                      stop();
                      super.release();
                    }
                
                
                    | 52 |  |  |  | 
                
                
                    /// Factory invoked to instantiate each worker of the pool.
                    final WorkerFactory<W> _workerFactory;

                    /// Logger assigned to every worker instantiated by this pool.
                    @override
                    Logger? channelLogger;

                    /// Exception manager passed to the worker factory.
                    ///
                    /// A new instance is created on first access if none is set.
                    @override
                    ExceptionManager get exceptionManager {
                      return _exceptionManager ??= ExceptionManager();
                    }

                    ExceptionManager? _exceptionManager;

                    /// Concurrency settings.
                    final ConcurrencySettings concurrencySettings;

                    /// Workers currently registered in the pool.
                    final List<PoolWorker<W>> _workers = [];

                    /// Stats collected from workers at the time they were removed from the pool.
                    final List<WorkerStat> _deadWorkerStats = [];

                    /// Whether this pool is scheduled for stopping.
                    bool get stopped {
                      return _stopped;
                    }

                    bool _stopped = false;

                    /// Number of workers.
                    int get size {
                      return _workers.length;
                    }

                    /// Maximum number of workers, i.e. the highest worker count recorded after
                    /// a provisioning round.
                    int get maxSize {
                      return _maxSize;
                    }

                    int _maxSize = 0;

                    /// Registered worker pool listeners, keyed by their registration token.
                    final Map<Object, void Function(WorkerStat, bool)> _workerPoolListeners = {};
                
                
                    | 82 |  |  |  | 
                
                
                    /// Registers a [listener] to be invoked when a worker is added to or removed
                    /// from the pool.
                    ///
                    /// Returns an opaque token identifying the registration; it can be passed
                    /// to [unregisterWorkerPoolListener] to remove the listener.
                    Object registerWorkerPoolListener(void Function(WorkerStat, bool) listener) {
                      final registration = Object();
                      _workerPoolListeners[registration] = listener;
                      return registration;
                    }
                
                
                    | 89 |  |  |  | 
                
                
                    /// Unregisters a worker pool listener.
                    ///
                    /// When [token] is provided, the registration it identifies is removed and
                    /// [listener] is ignored. Otherwise, every registration of [listener] is
                    /// removed. Does nothing when both arguments are null.
                    void unregisterWorkerPoolListener(
                        {Function(WorkerStat, bool)? listener, Object? token}) {
                      // the token takes precedence over the listener
                      if (token != null) {
                        _workerPoolListeners.remove(token);
                        return;
                      }
                      if (listener == null) {
                        return;
                      }
                      _workerPoolListeners
                          .removeWhere((_, registered) => registered == listener);
                    }
                
                
                    | 99 |  |  |  | 
                
                
                    /// Number of workers whose startup has been initiated but has not completed
                    /// yet.
                    int _startingWorkers = 0;

                    /// Computes how many additional workers should be started to handle
                    /// [workload].
                    ///
                    /// The target worker count is [workload], raised to
                    /// [ConcurrencySettings.minWorkers] if lower, then capped to
                    /// [ConcurrencySettings.maxWorkers] when that setting is strictly positive.
                    /// The result is the target minus the workers already in the pool and those
                    /// still starting; it may be zero or negative when no provisioning is needed.
                    int _getProvisionNeeds(int workload) {
                      final settings = concurrencySettings;
                      final lowerBound = settings.minWorkers;
                      // at least minWorkers
                      var target = workload < lowerBound ? lowerBound : workload;
                      final upperBound = settings.maxWorkers;
                      // at most maxWorkers, a non-positive value meaning "no upper bound"
                      if (upperBound > 0 && target > upperBound) {
                        target = upperBound;
                      }
                      final accountedFor = _workers.length + _startingWorkers;
                      return target - accountedFor;
                    }
                
                
                    | 116 |  |  |  | 
                
                
                    /// Instantiates and starts [workload] new workers.
                    ///
                    /// Each worker is created with [_workerFactory], receives the pool's
                    /// [channelLogger] and is registered in the pool once its startup succeeds.
                    /// A worker whose startup fails is terminated and the failure is recorded.
                    ///
                    /// The returned future completes when all startups have settled. If only
                    /// some of the [workload] attempts failed, the errors are logged and the
                    /// future completes normally. If every attempt failed, the future completes
                    /// with one of the recorded errors: the first [SquadronError] if any,
                    /// otherwise the first [WorkerException], otherwise the first error.
                    Future<void> _provisionWorkers(int workload) {
                      final tasks = <Future>[], errors = [];
                      final maxParallel = concurrencySettings.maxParallel;
                      for (var i = 0; i < workload; i++) {
                        // true while [_startingWorkers] has been incremented for this worker but
                        // no completion callback is in charge of decrementing it yet
                        var pendingIncrement = false;
                        try {
                          final worker = _workerFactory(exceptionManager);
                          worker.channelLogger = channelLogger;

                          final poolWorker = PoolWorker(worker, maxParallel);
                          _startingWorkers++;
                          pendingIncrement = true;
                          final startup = poolWorker.worker.start();
                          // from now on, the whenComplete() callback below owns the decrement
                          pendingIncrement = false;
                          tasks.add(startup.whenComplete(() {
                            _startingWorkers--;
                          }).then((_) {
                            // start succeeded: register worker
                            _addWorkerAndNotify(poolWorker);
                          }).catchError((ex, st) {
                            // start failed, ensure the worker is stopped
                            poolWorker.worker.terminate();
                            errors.add(SquadronException.from(ex, st));
                          }));
                        } catch (ex, st) {
                          if (pendingIncrement) {
                            // start() threw synchronously: undo the increment, otherwise the
                            // scheduler would keep waiting for a worker that will never start
                            _startingWorkers--;
                          }
                          errors.add(SquadronException.from(ex, st));
                        }
                      }

                      return Future.wait(tasks).whenComplete(() {
                        if (_workers.length > _maxSize) {
                          _maxSize = _workers.length;
                        }
                        if (errors.isNotEmpty) {
                          // compare against the number of attempts rather than the number of
                          // started tasks: synchronous failures are recorded in [errors] but
                          // never make it into [tasks]
                          if (errors.length < workload) {
                            // some attempts failed: warn
                            channelLogger?.e(() => 'Error while provisionning workers: $errors');
                          } else {
                            // all attempts failed: throw
                            throw errors.firstWhere((e) => e is SquadronError,
                                    orElse: () => null) ??
                                errors.firstWhere((e) => e is WorkerException,
                                    orElse: () => null) ??
                                errors.first;
                          }
                        }
                      });
                    }
                
                
                    | 161 |  |  |  | 
                
                
                    /// Starts the pool, ensuring at least [ConcurrencySettings.minWorkers]
                    /// workers are started (defaulting to 1 if [ConcurrencySettings.minWorkers]
                    /// is zero).
                    ///
                    /// Clears the stopped flag, then sizes the provisioning request on the
                    /// number of queued tasks, counting an empty queue as a workload of 1.
                    /// Returns the provisioning future when workers have to be started, and
                    /// returns synchronously otherwise.
                    @override
                    FutureOr<void> start() {
                      _stopped = false;
                      final workload = _queue.isEmpty ? 1 : _queue.length;
                      final missing = _getProvisionNeeds(workload);
                      return missing > 0 ? _provisionWorkers(missing) : null;
                    }
                
                
                    | 172 |  |  |  | 
                
                
                    /// Invokes every registered worker pool listener with [stats].
                    ///
                    /// [removed] is forwarded to the listeners: callers pass `true` when the
                    /// worker has been removed from the pool and `false` when it has been added.
                    /// Errors thrown by listeners are deliberately swallowed.
                    void _notify(WorkerStat stats, {required bool removed}) {
                      if (_workerPoolListeners.isEmpty) {
                        return;
                      }
                      // Iterate over a snapshot: a listener may register or unregister listeners
                      // from within its callback, and mutating the map while iterating over its
                      // live view would throw a ConcurrentModificationError outside of the
                      // try/catch below. Listeners unregistered during a notification round are
                      // still invoked for that round.
                      final listeners = _workerPoolListeners.values.toList();
                      for (var listener in listeners) {
                        try {
                          listener(stats, removed);
                        } catch (_) {
                          // swallow error from user land
                        }
                      }
                    }
                
                
                    | 184 |  |  |  | 
                
                
                    /// Stops the worker wrapped by [poolWorker] and removes it from the pool.
                    ///
                    /// Unless [force] is `true`, nothing happens when the pool does not hold
                    /// more than [ConcurrencySettings.minWorkers] workers. When the worker is
                    /// effectively removed, its stats are appended to [_deadWorkerStats] and
                    /// listeners are notified of the removal.
                    ///
                    /// Returns 1 if the worker was removed from the pool, 0 otherwise.
                    int _removeWorkerAndNotify(PoolWorker<W> poolWorker, bool force) {
                      final removable =
                          force || _workers.length > concurrencySettings.minWorkers;
                      if (!removable) {
                        return 0;
                      }
                      final target = poolWorker.worker;
                      // the worker is stopped whether or not it is registered in the pool
                      target.stop();
                      if (!_workers.remove(poolWorker)) {
                        return 0;
                      }
                      final stats = target.getStats();
                      _deadWorkerStats.add(stats);
                      _notify(stats, removed: true);
                      return 1;
                    }
                
                
                    | 198 |  |  |  | 
                
                
                    /// Registers [poolWorker] in the pool and notifies listeners of the addition
                    /// with the worker's current stats.
                    void _addWorkerAndNotify(PoolWorker<W> poolWorker) {
                      _workers.add(poolWorker);
                      final stats = poolWorker.worker.getStats();
                      _notify(stats, removed: false);
                    }
                
                
                    | 203 |  |  |  | 
                
                
                    /// Stops idle pool workers matching the [predicate].
                    ///
                    /// If [predicate] is provided, only idle workers satisfying it are stopped,
                    /// and only as long as the pool holds more than
                    /// [ConcurrencySettings.minWorkers] workers. If [predicate] is null or not
                    /// provided, the pool is flagged as stopped and workers are stopped, except
                    /// that as many workers as there are queued tasks are kept alive to process
                    /// pending tasks.
                    ///
                    /// Stopping a worker does not interrupt or cancel processing. Workers will
                    /// complete pending tasks before shutting down. In the meantime, they will
                    /// not receive any new workload.
                    ///
                    /// Returns the number of workers that have been stopped.
                    @override
                    int stop([bool Function(W worker)? predicate]) {
                      _workers.sort(PoolWorker.compareCapacity);
                      final bool force;
                      final List<PoolWorker<W>> candidates;
                      if (predicate == null) {
                        force = true;
                        // kill workers while keeping enough workers alive to process pending tasks
                        candidates = _workers.reversed.skip(_queue.length).toList();
                        _stopped = true;
                      } else {
                        force = false;
                        // kill workers that are idle and satisfy the predicate
                        candidates = _workers
                            .where((candidate) =>
                                candidate.isIdle && predicate(candidate.worker))
                            .toList();
                      }
                      return candidates.fold<int>(
                        0,
                        (count, poolWorker) =>
                            count + _removeWorkerAndNotify(poolWorker, force),
                      );
                    }
                
                
                    | 229 |  |  |  | 
                
                
                    /// Flags the pool as stopped, then terminates every worker and removes it
                    /// from the pool.
                    ///
                    /// [ex] is forwarded as is to each worker's `terminate()` method.
                    @override
                    void terminate([TaskTerminatedException? ex]) {
                      _stopped = true;
                      // walk the list backwards, as workers get removed along the way
                      var index = _workers.length;
                      while (--index >= 0) {
                        final poolWorker = _workers[index];
                        poolWorker.worker.terminate(ex);
                        _removeWorkerAndNotify(poolWorker, true);
                      }
                    }
                
                
                    | 239 |  |  |  | 
                
                
                    /// Tasks that have been enqueued and are waiting to be handled.
                    final Queue<WorkerTask> _queue = Queue();

                    /// Set of tasks being executed.
                    // NOTE(review): presumably maintained by the task dispatching code; confirm.
                    final Set<WorkerTask> _executing = {};

                    /// Gets remaining workload, i.e. the number of tasks in the queue.
                    int get pendingWorkload {
                      return _queue.length;
                    }
                
                
                    | 245 |  |  |  | 
                
                
                    /// Appends [task] to the queue, requests a scheduling pass and returns
                    /// [task].
                    ///
                    /// Throws the error built by [SquadronErrorImpl.create] if the pool is
                    /// stopped.
                    WorkerTask<T, W> _enqueue<T>(WorkerTask<T, W> task) {
                      if (!_stopped) {
                        _queue.addLast(task);
                        _schedule();
                        return task;
                      }
                      throw SquadronErrorImpl.create(
                        'The pool cannot accept new requests because it is stopped',
                      );
                    }
                
                
                    | 256 |  |  |  | 
                
                
                    /// Registers and schedules a [task] that returns a single value.
                    ///
                    /// This is a shorthand for [scheduleValueTask] that returns the future
                    /// exposed by the resulting [ValueTask], which completes with the task's
                    /// value. The optional [counter] is passed on to [scheduleValueTask].
                    Future<T> execute<T>(Future<T> Function(W worker) task,
                        {PerfCounter? counter}) {
                      final scheduled = scheduleValueTask<T>(task, counter: counter);
                      return scheduled.value;
                    }
                
                
                    | 262 |  |  |  | 
                
                
                    /// Registers and schedules a [task] that returns a stream of values.
                    ///
                    /// This is a shorthand for [scheduleStreamTask] that returns the stream
                    /// exposed by the resulting [StreamTask], which contains the task's values.
                    /// The optional [counter] is passed on to [scheduleStreamTask].
                    Stream<T> stream<T>(Stream<T> Function(W worker) task,
                        {PerfCounter? counter}) {
                      final scheduled = scheduleStreamTask<T>(task, counter: counter);
                      return scheduled.stream;
                    }
                
                
                    | 268 |  |  |  | 
                
                
                    /// Registers and schedules a [task] that returns a single value.
                    ///
                    /// The [task] and the optional [counter] are wrapped in a
                    /// [WorkerValueTask] which is enqueued for execution.
                    ///
                    /// Returns a [ValueTask].
                    ValueTask<T> scheduleValueTask<T>(Future<T> Function(W worker) task,
                        {PerfCounter? counter}) {
                      final valueTask = WorkerValueTask<T, W>(task, counter);
                      return _enqueue<T>(valueTask) as ValueTask<T>;
                    }
                
                
                    | 274 |  |  |  | 
                
                
                    /// Registers and schedules a [task] that returns a stream of values.
                    ///
                    /// The [task] and the optional [counter] are wrapped in a
                    /// [WorkerStreamTask] which is enqueued for execution.
                    ///
                    /// Returns a [StreamTask].
                    StreamTask<T> scheduleStreamTask<T>(Stream<T> Function(W worker) task,
                        {PerfCounter? counter}) {
                      final streamTask = WorkerStreamTask<T, W>(task, counter);
                      return _enqueue<T>(streamTask) as StreamTask<T>;
                    }
                
                
                    | 280 |  |  |  | 
                
                
                    /// Timer triggering the next scheduling pass, if one has been requested.
                    Timer? _timer;

                    /// Requests a scheduling pass.
                    ///
                    /// The pass ([__schedule]) is run from a zero-duration [Timer]; nothing is
                    /// done if a previously created timer is still active.
                    void _schedule() {
                      final alreadyRequested = _timer?.isActive ?? false;
                      if (alreadyRequested) {
                        return;
                      }
                      _timer = Timer(Duration.zero, __schedule);
                    }
                
                
                    | 288 |  |  |  | 
                
                
                    /// Schedule tasks.
                    ///
                    /// A scheduling pass goes through the following steps:
                    /// * if the pool has no worker yet but some are starting, the pass is
                    ///   deferred to a later timer;
                    /// * if the queue is empty, the pool is effectively stopped when it is
                    ///   flagged as stopped and no task is executing;
                    /// * otherwise tasks are dispatched, and more workers are provisioned if
                    ///   [_getProvisionNeeds] reports a need. When provisioning succeeds, tasks
                    ///   are dispatched again. When it fails, queued tasks are cancelled if the
                    ///   pool has no worker at all; otherwise a new pass is requested.
                    void __schedule() {
                      final waitingForFirstWorker = _workers.isEmpty && _startingWorkers > 0;
                      if (waitingForFirstWorker) {
                        // workers are still starting, defer
                        _schedule();
                        return;
                      }

                      if (_queue.isEmpty) {
                        // nothing to do: effectively stop the pool if needed and return
                        if (_stopped && _executing.isEmpty) {
                          stop();
                        }
                        return;
                      }

                      // there is work to do: dispatch tasks to workers
                      _dispatchTasks();

                      // and provision more workers if possible and necessary
                      final needs = _getProvisionNeeds(_queue.length);
                      if (needs <= 0) {
                        return;
                      }
                      _provisionWorkers(needs).then(
                        (_) => _dispatchTasks(),
                        onError: (ex) {
                          channelLogger?.e(() => 'Provisionning workers failed with error $ex');
                          if (_workers.isNotEmpty) {
                            // some workers are available: try again
                            _schedule();
                            return;
                          }
                          // no worker at all: pending tasks cannot be processed
                          while (_queue.isNotEmpty) {
                            _queue.removeFirst().cancel('Provisionning workers failed');
                          }
                        },
                      );
                    }
                
                
                    | 327 |  |  |  | 
                
                
                    /// Hands queued tasks over to the workers.
                    ///
                    /// Workers are sorted with [PoolWorker.compareCapacity] and visited from
                    /// the last one to the first one. Stopped workers are removed via
                    /// [_removeWorkerAndNotify]; each other worker receives tasks from the
                    /// head of the queue while it reports a positive capacity. Tasks that
                    /// are already canceled are dropped from the queue without being run.
                    void _dispatchTasks() {
                      _workers.sort(PoolWorker.compareCapacity);
                      var pos = _workers.length;
                      while (pos > 0) {
                        pos--;
                        final worker = _workers[pos];
                        if (worker.isStopped) {
                          _removeWorkerAndNotify(worker, false);
                          continue;
                        }
                        while (_queue.isNotEmpty && worker.capacity > 0) {
                          final next = _queue.removeFirst();
                          if (next.isCanceled) {
                            // canceled while waiting: just drop it
                            continue;
                          }
                          _executing.add(next);
                          worker.run(next).whenComplete(() {
                            // whatever the outcome, the task is done: schedule again
                            _executing.remove(next);
                            _schedule();
                          });
                        }
                      }
                    }
                
                
                    | 348 |  |  |  | 
                
                
                    /// Cancels a single [task].
                    ///
                    /// The task is removed from the executing tasks and from the queue, then
                    /// canceled with the optional [message].
                    void cancel(Task task, [String? message]) {
                      _executing.remove(task);
                      _queue.removeWhere((queued) => queued == task);
                      task.cancel(message);
                    }
                
                
                    | 356 |  |  |  | 
                
                
                    /// Cancels all tasks registered with the [WorkerPool].
                    ///
                    /// Executing and queued tasks are captured first, both collections are
                    /// cleared, and each captured task is then canceled with the optional
                    /// [message].
                    void cancelAll([String? message]) {
                      // snapshot first: executing tasks, then queued ones
                      final snapshot = [..._executing, ..._queue];
                      _executing.clear();
                      _queue.clear();
                      for (final pending in snapshot) {
                        pending.cancel(message);
                      }
                    }
                
                
                    | 367 |  |  |  | 
                
                
                    /// Statistics of the workers currently held by the pool, obtained by
                    /// mapping each of them through [PoolWorker.getStats].
                    Iterable<WorkerStat> get stats {
                      return _workers.map(PoolWorker.getStats);
                    }
                
                
                    | 370 |  |  |  | 
                
                
                    /// Full worker statistics: the recorded dead-worker statistics followed
                    /// by the current [stats].
                    Iterable<WorkerStat> get fullStats {
                      return _deadWorkerStats.followedBy(stats);
                    }
                
                
                    | 373 |  |  |  | 
                
                
                    /// Worker pools do not need an [operations] map: this getter always
                    /// returns [WorkerService.noOperations].
                    @override
                    OperationsMap get operations {
                      return WorkerService.noOperations;
                    }
                
                
                    | 377 |  |  | } |