LCOV - code coverage report

Current view
top level - src/worker - worker.dart
Test
lcov.info
Date
2025-03-26
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
HitTotalCoverage
Lines697690.8%
Functions00-
Branches00-
Each row represents a line of source code
LineBranchHitsSource code
1import 'dart:async';
2
3import 'package:cancelation_token/cancelation_token.dart';
4import 'package:logger/web.dart';
5import 'package:using/using.dart';
6
7import '../_impl/xplat/_forward_completer.dart';
8import '../_impl/xplat/_forward_stream_controller.dart';
9import '../_impl/xplat/_time_stamp.dart';
10import '../channel.dart';
11import '../exceptions/exception_manager.dart';
12import '../exceptions/squadron_exception.dart';
13import '../exceptions/task_terminated_exception.dart';
14import '../exceptions/worker_exception.dart';
15import '../invoker.dart';
16import '../iworker.dart';
17import '../stats/worker_stat.dart';
18import '../tokens/_squadron_cancelation_token.dart';
19import '../typedefs.dart';
20import '../worker/worker_request.dart';
21import '../worker_service.dart';
22
23part 'worker.stats.dart';
24
25/// Base worker class.
26///
27/// This base class takes care of creating the [Channel] and firing up the
28/// worker. Typically, derived classes should add proxy methods sending
29/// [WorkerRequest]s to the worker.
30abstract class Worker
31 with Releasable
32 implements WorkerService, IWorker, Invoker {
33 /// Creates a [Worker] with the specified entrypoint.
3411 Worker(this._entryPoint,
35 {PlatformThreadHook? threadHook, ExceptionManager? exceptionManager})
36 : _threadHook = threadHook,
37 _exceptionManager = exceptionManager;
38
3911 @override
40 void release() {
4111 stop();
4211 super.release();
43 }
44
45 /// The [Worker]'s entry point; typically, a top-level function in native
46 /// world or the Uri to a JavaScript file in browser world.
47 final EntryPoint _entryPoint;
48
49 @override
50 Logger? channelLogger;
51
5211 @override
53 ExceptionManager get exceptionManager =>
5421 (_exceptionManager ??= ExceptionManager());
55 ExceptionManager? _exceptionManager;
56
57 final PlatformThreadHook? _threadHook;
58
59 /// The [Worker]'s start arguments.
60 List? getStartArgs();
61
62 /// [Worker] statistics.
6333 WorkerStat get stats => _stats.snapshot;
64
6522 late final _stats = _Stats(this);
66
67 /// Returns true if the [Worker] is connected i.e., it has a valid [Channel].
68 /// Returns false otherwise.
692 bool get isConnected => _channel != null;
70
71 /// Shared [Channel] that can be used to communicate with the worker.
726 Channel? getSharedChannel() => _channel?.share();
73
74 Channel? _channel;
75 Future<Channel>? _openChannel;
76
77 /// Sends a workload to the worker.
7810 @override
79 Future<dynamic> send(
80 int command, {
81 List args = const [],
82 CancelationToken? token,
83 bool inspectRequest = false,
84 bool inspectResponse = false,
85 }) async {
860 token?.throwIfCanceled();
87
88 // get the channel, start the worker if necessary
8918 final channel = _channel ?? await start();
90
9110 final completer = ForwardCompleter();
92
930 final squadronToken = token?.wrap();
940 squadronToken?.onCanceled.then((ex) {
950 _channel?.cancelToken(squadronToken);
960 completer.failure(SquadronException.from(ex, null, command));
97 });
98
9920 _stats.beginWork();
10050 completer.future.whenComplete(_stats.endWork).ignore();
101
102 try {
10310 final res = await channel.sendRequest(
104 command,
105 args,
106 token: squadronToken,
107 inspectRequest: inspectRequest,
108 inspectResponse: inspectResponse,
109 );
11010 completer.success(res);
111 } catch (ex, st) {
1126 _stats.failed();
1136 completer.failure(SquadronException.from(ex, st, command));
114 }
115
11610 return completer.future;
117 }
118
119 /// Sends a streaming workload to the worker.
1205 @override
121 Stream<dynamic> stream(
122 int command, {
123 List args = const [],
124 CancelationToken? token,
125 bool inspectRequest = false,
126 bool inspectResponse = false,
127 }) {
1282 final squadronToken = token?.wrap();
129
130 late final ForwardStreamController controller;
131
1326 squadronToken?.onCanceled.then((ex) {
1332 if (!controller.isClosed) {
1344 controller.subscription?.cancel();
1354 controller.addError(SquadronException.from(ex, null, command));
1362 controller.close();
137 }
1384 _channel?.cancelToken(squadronToken);
139 });
140
14110 controller = ForwardStreamController(onListen: () async {
142 try {
1435 if (controller.isClosed) return;
1442 squadronToken?.throwIfCanceled();
1458 final channel = _channel ?? await start();
1465 if (controller.isClosed) return;
147
14810 _stats.beginWork();
14925 controller.done.whenComplete(_stats.endWork).ignore();
150
1515 controller.attachSubscription(channel
1525 .sendStreamingRequest(
153 command,
154 args,
155 token: squadronToken,
156 inspectRequest: inspectRequest,
157 inspectResponse: inspectResponse,
158 )
1595 .listen(
1605 controller.add,
1613 onError: (ex, st) =>
1626 controller.addError(SquadronException.from(ex, st, command)),
1635 onDone: controller.close,
164 cancelOnError: false,
165 ));
166 } catch (ex, st) {
1672 _stats.failed();
1681 controller.subscription?.cancel();
1692 controller.addError(SquadronException.from(ex, st, command));
1701 controller.close();
171 }
172 });
173
1745 return controller.stream;
175 }
176
177 /// Creates a [Channel] and starts the worker using the [_entryPoint].
17811 @override
179 Future<Channel> start() async {
18022 if (stats.isStopped) {
1811 throw WorkerException('Invalid state: worker is stopped');
182 }
183
18411 final args = getStartArgs() ?? const [];
18522 _openChannel ??= Channel.open(
18644 exceptionManager, channelLogger, _entryPoint, args, _threadHook);
18722 final channel = _channel ?? await _openChannel;
18810 if (_channel == null) {
18910 _channel = channel;
19020 _stats.start();
191 }
19210 return _channel!;
193 }
194
195 /// Stops this worker.
19611 @override
197 void stop() {
19822 if (!_stats.isStopped) {
19914 channelLogger?.d('Stop worker');
20022 _stats.stop();
20111 _openChannel = null;
20221 _channel?.close();
20311 _channel = null;
204 }
205 }
206
207 /// Terminates this worker.
2082 @override
209 void terminate([TaskTerminatedException? ex]) {
210 // terminate channel and stop worker
2112 ex ??= TaskTerminatedException('Worker has been terminated');
2123 _channel?.terminate(ex);
2132 stop();
214 }
215
216 /// Workers do not need an [operations] map.
2170 @override
2180 OperationsMap get operations => WorkerService.noOperations;
219}
Choose Features