LCOV - code coverage report

Current view
top level - src/pool - _worker_stream_task.dart
Test
lcov.info
Date
2025-09-28
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
HitTotalCoverage
Lines3030100.0%
Functions00-
Branches00-
Each row represents a line of source code
LineBranchHitsSource code
1import 'dart:async';
2
3import '../_impl/xplat/_forward_stream_controller.dart';
4import '../exceptions/squadron_exception.dart';
5import '../stats/perf_counter.dart';
6import '../worker/worker.dart';
7import '_worker_task.dart';
8import 'stream_task.dart';
9
10final class WorkerStreamTask<T, W extends Worker> extends WorkerTask<T, W>
11 implements StreamTask<T> {
12 /// Creates a new [StreamTask].
138 WorkerStreamTask(this._producer, PerfCounter? counter) : super(counter) {
1412 _controller = ForwardStreamController<T>(onListen: () async {
15 try {
168 final worker = await _worker.future;
174 if (canceledException != null || worker == null) {
18 // the task was canceled
192 throw canceledException!;
20 } else {
21 // otherwise, forward data from the worker
2220 _controller.attachSubscription(_producer(worker).listen(
234 _onData,
244 onError: _onError,
258 onDone: _controller.close,
26 cancelOnError: false,
27 ));
28 }
29 } catch (ex, st) {
304 _closeWithError(SquadronException.from(ex, st));
31 }
32 });
33 }
34
35 final Stream<T> Function(W worker) _producer;
36 final _worker = Completer<W?>();
37
38 late final ForwardStreamController<T> _controller;
39
404 @override
418 Stream<T> get stream => _controller.stream;
42
432 void _closeWithError(SquadronException ex) {
444 _controller.addError(ex);
454 _controller.close();
46 }
47
4812 void _onData(T data) => _controller.add(data);
49
503 void _onError(Object ex, [StackTrace? st]) =>
519 _controller.addError(SquadronException.from(ex, st));
52
532 @override
54 void cancel([String? message]) {
552 super.cancel(message);
564 if (!_worker.isCompleted) {
57 // task canceled before it got scheduled
584 _worker.complete(null);
59 }
604 if (_controller.subscription != null) {
61 // task canceled after a listener subscribed to the stream
622 _closeWithError(canceledException!);
63 }
64 }
65
664 @override
67 Future<bool> execute(W worker) {
684 if (canceledException == null) {
69 // run with worker
708 _worker.complete(worker);
71 }
728 return _controller.done
7312 .then((_) => canceledException == null)
744 .catchError((_) => false);
75 }
76}
Choose Features