LCOV - code coverage report

Current view
top level - src/worker - worker.dart
Test
lcov.info
Date
2026-02-21
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
HitTotalCoverage
Lines728188.9%
Functions00-
Branches00-
Each row represents a line of source code
LineBranchHitsSource code
1import 'dart:async';
2
3import 'package:cancelation_token/cancelation_token.dart';
4import 'package:logger/web.dart';
5import 'package:using/using.dart';
6
7import '../_impl/xplat/_forward_completer.dart';
8import '../_impl/xplat/_forward_stream_controller.dart';
9import '../_impl/xplat/_time_stamp.dart';
10import '../channel.dart';
11import '../exceptions/exception_manager.dart';
12import '../exceptions/squadron_exception.dart';
13import '../exceptions/task_terminated_exception.dart';
14import '../exceptions/worker_exception.dart';
15import '../invoker.dart';
16import '../iworker.dart';
17import '../stats/worker_stat.dart';
18import '../tokens/_squadron_cancelation_token.dart';
19import '../typedefs.dart';
20import '../worker/worker_request.dart';
21import '../worker_service.dart';
22
23part 'worker.stats.dart';
24
25/// Base worker class.
26///
27/// This base class takes care of creating the [Channel] and firing up the
28/// worker. Typically, derived classes should add proxy methods sending
29/// [WorkerRequest]s to the worker.
30abstract class Worker
31 with Releasable
32 implements WorkerService, IWorker, Invoker {
33 /// Creates a [Worker] with the specified entrypoint.
3411 Worker(this._entryPoint,
35 {PlatformThreadHook? threadHook, ExceptionManager? exceptionManager})
36 : _threadHook = threadHook,
37 _exceptionManager = exceptionManager {
3822 _stats = _Stats(this);
39 }
40
4111 @override
42 void release() {
4311 stop();
4411 super.release();
45 }
46
47 /// The [Worker]'s entry point; typically, a top-level function in native
48 /// world or the Uri to a JavaScript file in browser world.
49 final EntryPoint _entryPoint;
50
51 @override
52 Logger? channelLogger;
53
5411 @override
55 ExceptionManager get exceptionManager =>
5621 (_exceptionManager ??= ExceptionManager());
57 ExceptionManager? _exceptionManager;
58
59 final PlatformThreadHook? _threadHook;
60
61 /// The [Worker]'s start arguments.
62 List? getStartArgs();
63
64 late final _Stats _stats;
65
6633 bool get isStopped => _stats.isStopped;
67
68 /// [Worker] statistics.
690 @Deprecated('Use getStats()')
700 WorkerStat get stats => _stats.snapshot;
71
72 /// [Worker] statistics.
7318 WorkerStat getStats() => _stats.snapshot;
74
75 /// Returns true if the [Worker] is connected i.e., it has a valid [Channel].
76 /// Returns false otherwise.
772 bool get isConnected => _channel != null;
78
79 /// Shared [Channel] that can be used to communicate with the worker.
806 Channel? getSharedChannel() => _channel?.share();
81
82 Channel? _channel;
83 Future<Channel>? _openChannel;
84
85 /// Sends a workload to the worker.
8610 @override
87 Future<dynamic> send(
88 int command, {
89 List args = const [],
90 CancelationToken? token,
91 bool inspectRequest = false,
92 bool inspectResponse = false,
93 }) async {
940 token?.throwIfCanceled();
95
96 // get the channel, start the worker if necessary
9718 final channel = _channel ?? await start();
98
9910 final completer = ForwardCompleter();
100
1010 final squadronToken = token?.wrap();
1020 squadronToken?.onCanceled.then((ex) {
1030 _channel?.cancelToken(squadronToken);
1040 completer.failure(SquadronException.from(ex, null, command));
105 });
106
10720 _stats.beginWork();
10850 completer.future.whenComplete(_stats.endWork).ignore();
109
110 try {
11110 final res = await channel.sendRequest(
112 command,
113 args,
114 token: squadronToken,
115 inspectRequest: inspectRequest,
116 inspectResponse: inspectResponse,
117 );
11810 completer.success(res);
119 } catch (ex, st) {
1206 _stats.failed();
1216 completer.failure(SquadronException.from(ex, st, command));
122 }
123
12410 return completer.future;
125 }
126
127 /// Sends a streaming workload to the worker.
1285 @override
129 Stream<dynamic> stream(
130 int command, {
131 List args = const [],
132 CancelationToken? token,
133 bool inspectRequest = false,
134 bool inspectResponse = false,
135 }) {
1362 final squadronToken = token?.wrap();
137
138 late final ForwardStreamController controller;
139
1406 squadronToken?.onCanceled.then((ex) {
1412 if (!controller.isClosed) {
1424 controller.subscription?.cancel();
1434 controller.safeAddError(SquadronException.from(ex, null, command));
1442 controller.close();
145 }
1464 _channel?.cancelToken(squadronToken);
147 });
148
14910 controller = ForwardStreamController(onListen: () async {
150 try {
1515 if (controller.isClosed) return;
1522 squadronToken?.throwIfCanceled();
1538 final channel = _channel ?? await start();
1545 if (controller.isClosed) return;
155
15610 _stats.beginWork();
15725 controller.done.whenComplete(_stats.endWork).ignore();
158
1595 controller.attachSubscription(channel
1605 .sendStreamingRequest(
161 command,
162 args,
163 token: squadronToken,
164 inspectRequest: inspectRequest,
165 inspectResponse: inspectResponse,
166 )
1675 .listen(
1685 controller.safeAdd,
1693 onError: (ex, st) => controller
1706 .safeAddError(SquadronException.from(ex, st, command)),
1715 onDone: controller.close,
172 cancelOnError: false,
173 ));
174 } catch (ex, st) {
1752 _stats.failed();
1761 controller.subscription?.cancel();
1772 controller.safeAddError(SquadronException.from(ex, st, command));
1781 controller.close();
179 }
180 });
181
1825 return controller.stream;
183 }
184
185 /// Creates a [Channel] and starts the worker using the [_entryPoint].
18611 @override
187 Future<Channel> start() {
18811 if (isStopped) {
1891 throw WorkerException('Invalid state: worker is stopped');
190 }
191
19211 if (_channel != null) {
1930 return Future.value(_channel!);
194 }
195
19622 return _openChannel ??= Channel.open(
19711 exceptionManager,
19811 channelLogger,
19911 _entryPoint,
20011 getStartArgs() ?? const [],
20111 _threadHook,
20221 ).then((channel) {
20310 _channel = channel;
20420 _stats.start();
205 return channel;
206 });
207 }
208
209 /// Stops this worker.
21011 @override
211 void stop() {
21211 if (!isStopped) {
21314 channelLogger?.d('Stop worker');
21422 _stats.stop();
21511 _openChannel = null;
21621 _channel?.close();
21711 _channel = null;
218 }
219 }
220
221 /// Terminates this worker.
2222 @override
223 void terminate([TaskTerminatedException? ex]) {
224 // terminate channel and stop worker
2252 ex ??= TaskTerminatedException('Worker has been terminated');
2264 _channel?.terminate(ex);
2272 stop();
228 }
229
230 /// Workers do not need an [operations] map.
2310 @override
232 OperationsMap get operations => WorkerService.noOperations;
233}
Choose Features