LCOV - code coverage report

Current view
top level - src/worker - worker.dart
Test
lcov.info
Date
2025-07-05
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
HitTotalCoverage
Lines707988.6%
Functions00-
Branches00-
Each row represents a line of source code
LineBranchHitsSource code
1import 'dart:async';
2
3import 'package:cancelation_token/cancelation_token.dart';
4import 'package:logger/web.dart';
5import 'package:using/using.dart';
6
7import '../_impl/xplat/_forward_completer.dart';
8import '../_impl/xplat/_forward_stream_controller.dart';
9import '../_impl/xplat/_time_stamp.dart';
10import '../channel.dart';
11import '../exceptions/exception_manager.dart';
12import '../exceptions/squadron_exception.dart';
13import '../exceptions/task_terminated_exception.dart';
14import '../exceptions/worker_exception.dart';
15import '../invoker.dart';
16import '../iworker.dart';
17import '../stats/worker_stat.dart';
18import '../tokens/_squadron_cancelation_token.dart';
19import '../typedefs.dart';
20import '../worker/worker_request.dart';
21import '../worker_service.dart';
22
23part 'worker.stats.dart';
24
25/// Base worker class.
26///
27/// This base class takes care of creating the [Channel] and firing up the
28/// worker. Typically, derived classes should add proxy methods sending
29/// [WorkerRequest]s to the worker.
30abstract class Worker
31 with Releasable
32 implements WorkerService, IWorker, Invoker {
33 /// Creates a [Worker] with the specified entrypoint.
3411 Worker(this._entryPoint,
35 {PlatformThreadHook? threadHook, ExceptionManager? exceptionManager})
36 : _threadHook = threadHook,
37 _exceptionManager = exceptionManager;
38
3911 @override
40 void release() {
4111 stop();
4211 super.release();
43 }
44
45 /// The [Worker]'s entry point; typically, a top-level function in native
46 /// world or the Uri to a JavaScript file in browser world.
47 final EntryPoint _entryPoint;
48
49 @override
50 Logger? channelLogger;
51
5211 @override
53 ExceptionManager get exceptionManager =>
5421 (_exceptionManager ??= ExceptionManager());
55 ExceptionManager? _exceptionManager;
56
57 final PlatformThreadHook? _threadHook;
58
59 /// The [Worker]'s start arguments.
60 List? getStartArgs();
61
6222 late final _stats = _Stats(this);
63
6433 bool get isStopped => _stats.isStopped;
65
66 /// [Worker] statistics.
670 @Deprecated('Use getStats()')
680 WorkerStat get stats => _stats.snapshot;
69
70 /// [Worker] statistics.
7118 WorkerStat getStats() => _stats.snapshot;
72
73 /// Returns true if the [Worker] is connected i.e., it has a valid [Channel].
74 /// Returns false otherwise.
752 bool get isConnected => _channel != null;
76
77 /// Shared [Channel] that can be used to communicate with the worker.
786 Channel? getSharedChannel() => _channel?.share();
79
80 Channel? _channel;
81 Future<Channel>? _openChannel;
82
83 /// Sends a workload to the worker.
8410 @override
85 Future<dynamic> send(
86 int command, {
87 List args = const [],
88 CancelationToken? token,
89 bool inspectRequest = false,
90 bool inspectResponse = false,
91 }) async {
920 token?.throwIfCanceled();
93
94 // get the channel, start the worker if necessary
9518 final channel = _channel ?? await start();
96
9710 final completer = ForwardCompleter();
98
990 final squadronToken = token?.wrap();
1000 squadronToken?.onCanceled.then((ex) {
1010 _channel?.cancelToken(squadronToken);
1020 completer.failure(SquadronException.from(ex, null, command));
103 });
104
10520 _stats.beginWork();
10650 completer.future.whenComplete(_stats.endWork).ignore();
107
108 try {
10910 final res = await channel.sendRequest(
110 command,
111 args,
112 token: squadronToken,
113 inspectRequest: inspectRequest,
114 inspectResponse: inspectResponse,
115 );
11610 completer.success(res);
117 } catch (ex, st) {
1186 _stats.failed();
1196 completer.failure(SquadronException.from(ex, st, command));
120 }
121
12210 return completer.future;
123 }
124
125 /// Sends a streaming workload to the worker.
1265 @override
127 Stream<dynamic> stream(
128 int command, {
129 List args = const [],
130 CancelationToken? token,
131 bool inspectRequest = false,
132 bool inspectResponse = false,
133 }) {
1342 final squadronToken = token?.wrap();
135
136 late final ForwardStreamController controller;
137
1386 squadronToken?.onCanceled.then((ex) {
1392 if (!controller.isClosed) {
1404 controller.subscription?.cancel();
1414 controller.addError(SquadronException.from(ex, null, command));
1422 controller.close();
143 }
1444 _channel?.cancelToken(squadronToken);
145 });
146
14710 controller = ForwardStreamController(onListen: () async {
148 try {
1495 if (controller.isClosed) return;
1502 squadronToken?.throwIfCanceled();
1518 final channel = _channel ?? await start();
1525 if (controller.isClosed) return;
153
15410 _stats.beginWork();
15525 controller.done.whenComplete(_stats.endWork).ignore();
156
1575 controller.attachSubscription(channel
1585 .sendStreamingRequest(
159 command,
160 args,
161 token: squadronToken,
162 inspectRequest: inspectRequest,
163 inspectResponse: inspectResponse,
164 )
1655 .listen(
1665 controller.add,
1673 onError: (ex, st) =>
1686 controller.addError(SquadronException.from(ex, st, command)),
1695 onDone: controller.close,
170 cancelOnError: false,
171 ));
172 } catch (ex, st) {
1732 _stats.failed();
1741 controller.subscription?.cancel();
1752 controller.addError(SquadronException.from(ex, st, command));
1761 controller.close();
177 }
178 });
179
1805 return controller.stream;
181 }
182
183 /// Creates a [Channel] and starts the worker using the [_entryPoint].
18411 @override
185 Future<Channel> start() async {
18611 if (isStopped) {
1871 throw WorkerException('Invalid state: worker is stopped');
188 }
189
19011 final args = getStartArgs() ?? const [];
19122 _openChannel ??= Channel.open(
19244 exceptionManager, channelLogger, _entryPoint, args, _threadHook);
19322 final channel = _channel ?? await _openChannel;
19410 if (_channel == null) {
19510 _channel = channel;
19620 _stats.start();
197 }
19810 return _channel!;
199 }
200
201 /// Stops this worker.
20211 @override
203 void stop() {
20411 if (!isStopped) {
20514 channelLogger?.d('Stop worker');
20622 _stats.stop();
20711 _openChannel = null;
20821 _channel?.close();
20911 _channel = null;
210 }
211 }
212
213 /// Terminates this worker.
2142 @override
215 void terminate([TaskTerminatedException? ex]) {
216 // terminate channel and stop worker
2172 ex ??= TaskTerminatedException('Worker has been terminated');
2184 _channel?.terminate(ex);
2192 stop();
220 }
221
222 /// Workers do not need an [operations] map.
2230 @override
2240 OperationsMap get operations => WorkerService.noOperations;
225}
Choose Features