LCOV - code coverage report

Current view
top level - src/pool - _worker_stream_task.dart
Test
lcov.info
Date
2026-02-21
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
            Hit   Total   Coverage
Lines       30    30      100.0%
Functions   0     0       -
Branches    0     0       -
Each row represents a line of source code
Line | Branch | Hits | Source code
1import 'dart:async';
2
3import '../_impl/xplat/_forward_stream_controller.dart';
4import '../exceptions/squadron_exception.dart';
5import '../stats/perf_counter.dart';
6import '../worker/worker.dart';
7import '_worker_task.dart';
8import 'stream_task.dart';
9
/// A [StreamTask] whose data is produced by a worker assigned later on.
///
/// The task exposes the stream of a [ForwardStreamController]. When a listener
/// subscribes, the task waits until [execute] hands it a worker, then forwards
/// every event of the producer's stream to that controller. If a cancellation
/// has been recorded (via `canceledException`, which is declared outside this
/// class), the controller is closed with that exception instead.
final class WorkerStreamTask<T, W extends Worker> extends WorkerTask<T, W>
    implements StreamTask<T> {
  /// Creates a new [StreamTask] that will obtain its data by calling
  /// [_producer] with the worker passed to [execute].
  ///
  /// The [counter] is handed to the superclass constructor unchanged.
  WorkerStreamTask(this._producer, PerfCounter? counter) : super(counter) {
    _controller = ForwardStreamController<T>(onListen: _forwardFromWorker);
  }

  /// Builds the source stream for a given worker.
  final Stream<T> Function(W worker) _producer;

  /// Completes with the worker given to [execute], or with `null` when the
  /// task is canceled before a worker was provided.
  final _worker = Completer<W?>();

  /// The controller backing [stream]; assigned in the constructor body.
  late final ForwardStreamController<T> _controller;

  /// The stream through which the worker's events are delivered.
  @override
  Stream<T> get stream {
    return _controller.stream;
  }

  /// Listen callback of [_controller]: waits for a worker and starts
  /// forwarding the producer's events.
  ///
  /// Any error raised along the way (including the cancellation exception) is
  /// converted with [SquadronException.from] and used to close the controller.
  Future<void> _forwardFromWorker() async {
    try {
      // The task may already be canceled when the listener subscribes.
      if (canceledException != null) {
        throw canceledException!;
      }

      final assigned = await _worker.future;

      // The task may also have been canceled while waiting for a worker; a
      // `null` worker is what [cancel] completes the completer with.
      if (canceledException != null || assigned == null) {
        throw canceledException!;
      }

      // Not canceled: relay data, errors and completion from the worker.
      final subscription = _producer(assigned).listen(
        _controller.safeAdd,
        onError: (Object error, StackTrace trace) =>
            _controller.safeAddError(SquadronException.from(error, trace)),
        onDone: _controller.close,
        cancelOnError: false,
      );
      _controller.attachSubscription(subscription);
    } catch (error, trace) {
      _closeWithError(SquadronException.from(error, trace));
    }
  }

  /// Reports [ex] on the controller and then closes it.
  void _closeWithError(SquadronException ex) {
    _controller
      ..safeAddError(ex)
      ..close();
  }

  /// Cancels the task, with an optional [message] passed to the superclass.
  ///
  /// If no worker has been provided yet, the pending worker future is
  /// completed with `null`. If the controller already has a subscription
  /// attached, the controller is closed with the cancellation exception.
  @override
  void cancel([String? message]) {
    super.cancel(message);

    // Canceled before being scheduled: release anyone awaiting the worker.
    if (!_worker.isCompleted) {
      _worker.complete(null);
    }

    // Nothing more to do unless a subscription is attached to the controller.
    if (_controller.subscription == null) {
      return;
    }
    _closeWithError(canceledException!);
  }

  /// Provides the [worker] to the task unless it has been canceled.
  ///
  /// Returns a future tied to the controller's `done` future: it yields `true`
  /// when `done` completes and no cancellation has been recorded, and `false`
  /// when the task was canceled or `done` completed with an error.
  @override
  Future<bool> execute(W worker) {
    if (canceledException == null) {
      // Hand the worker over to the pending listen callback.
      _worker.complete(worker);
    }

    final finished = _controller.done;
    return finished
        .then((_) => canceledException == null)
        .catchError((_) => false);
  }
}
Choose Features