LCOV - code coverage report

Current view
top level - src/_impl/xplat - _worker_runner.dart
Test
lcov.info
Date
2025-03-26
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
          | Hit | Total | Coverage
Lines     | 127 | 142   | 89.4%
Functions | 0   | 0     | -
Branches  | 0   | 0     | -
Each row represents a line of source code
Line | Branch | Hits | Source code
1import 'dart:async';
2
3import 'package:logger/web.dart';
4
5import '../../exceptions/squadron_error.dart';
6import '../../exceptions/squadron_exception.dart';
7import '../../local_worker/local_worker.dart';
8import '../../service_installer.dart';
9import '../../tokens/_cancelation_token_ref.dart';
10import '../../tokens/_squadron_cancelation_token.dart';
11import '../../typedefs.dart';
12import '../../worker/worker_channel.dart';
13import '../../worker/worker_request.dart';
14import '../../worker_service.dart';
15import '_internal_logger.dart';
16
/// Runs a [WorkerService] on behalf of a platform worker.
///
/// The runner handles the connection handshake ([connect]), dispatches
/// incoming [WorkerRequest]s to the service operations ([processRequest]),
/// forwards results (single values or streams) to the client channel, and
/// terminates once a termination request has been received and no request is
/// executing anymore.
class WorkerRunner {
  /// Constructs a new worker runner.
  ///
  /// The provided callback is invoked with this runner when the runner exits
  /// (see [_exit]).
  WorkerRunner(this._terminate);

  /// Constructs a new worker runner for a [localWorker].
  ///
  /// The runner is bound to the local worker and its operations right away;
  /// throws if the operations map contains a non-positive command id.
  factory WorkerRunner.use(LocalWorker localWorker) {
    final runner = WorkerRunner((r) {
      r.internalLogger.t('Terminating local Worker');
      r._service = null;
      r._operations = null;
    });
    runner
      .._service = localWorker
      .._operations = localWorker.operations
      .._checkOperations();
    return runner;
  }

  /// Callback invoked by [_exit] to terminate the underlying worker.
  final void Function(WorkerRunner) _terminate;

  /// Logger used for the runner's own diagnostics.
  final internalLogger = InternalLogger();

  /// The service currently served by this runner, if any.
  WorkerService? _service;

  /// Operations of [_service], indexed by command id.
  OperationsMap? _operations;

  /// Cancelation token references currently in use, indexed by token id.
  final _cancelTokens = <String, CancelationTokenReference>{};

  /// Whether a termination request has been received.
  bool _terminationRequested = false;

  /// Number of requests currently being executed.
  int _executing = 0;

  /// Registered stream cancelers, indexed by stream id.
  final _streamCancelers = <int, StreamCanceler>{};

  /// Last stream id handed out by [_registerStreamCanceler].
  int _streamId = 0;

  /// Listener registered with [Logger] to forward log events to the client.
  void Function(OutputEvent)? _logForwarder;

  /// Completes when a pending [ServiceInstaller.install] call has finished.
  Completer<void>? _installCompleter;

  /// Error captured when service installation failed, if any.
  SquadronException? _installError;

  /// Throws if the operations map contains command ids that are not strictly
  /// positive.
  void _checkOperations() {
    final invalidKeys = _operations!.keys.where((k) => k <= 0).toList();
    if (invalidKeys.isEmpty) return;
    throw SquadronErrorImpl.create(
      'Invalid command identifier${(invalidKeys.length > 1) ? 's' : ''} in service operations map: ${invalidKeys.join(', ')}. Command ids must be positive.',
    );
  }

  /// Called by the platform worker upon startup, in response to a start
  /// [WorkerRequest]. [channelInfo] is an opaque object sent back from the
  /// platform worker to the Squadron [Worker] and used to communicate with the
  /// platform worker. Typically, [channelInfo] would be a [SendPort] (native)
  /// or a [MessagePort] (browser). [initializer] is called to build the
  /// [WorkerService] associated to the worker. The runner's [_operations] map
  /// will be set with operations from the service.
  ///
  /// This method does not throw: any failure is logged, reported to the
  /// client channel (when available) and the runner exits.
  Future<void> connect(WorkerRequest? startRequest, PlatformChannel channelInfo,
      WorkerInitializer initializer) async {
    WorkerChannel? channel;
    try {
      startRequest?.unwrapInPlace(internalLogger);
      channel = startRequest?.channel;

      if (startRequest == null) {
        throw SquadronErrorImpl.create('Missing connection request');
      } else if (channel == null) {
        throw SquadronErrorImpl.create('Missing client for connection request');
      }

      // forward log events to the client (registered only once)
      if (_logForwarder == null) {
        final logger = channel.log;
        void forwarder(OutputEvent event) => logger(event.origin);
        _logForwarder = forwarder;
        Logger.addOutputListener(forwarder);
      }

      if (!startRequest.isConnection) {
        throw SquadronErrorImpl.create('Connection request expected');
      } else if (_service != null || _operations != null) {
        throw SquadronErrorImpl.create('Already connected');
      }

      _service = await initializer(startRequest);
      _operations = _service!.operations;
      _checkOperations();

      channel.connect(channelInfo);

      if (_service is ServiceInstaller) {
        // Installation starts immediately but is not awaited here: requests
        // wait for its outcome in [processRequest].
        _installCompleter = Completer<void>()
          ..complete(_installService(_service as ServiceInstaller, channel));
      }
    } catch (ex, st) {
      internalLogger.e(() => 'Connection failed: $ex');
      channel?.error(ex, st);
      _exit();
    }
  }

  /// Installs the service. A failure is logged, reported to [channel] (which
  /// is then closed) and recorded in [_installError]; it is never rethrown.
  Future<void> _installService(
      ServiceInstaller installer, WorkerChannel channel) async {
    try {
      await installer.install();
    } catch (ex, st) {
      internalLogger.e(() => 'Service installation failed: $ex');
      channel.error(ex, st);
      channel.closeStream();
      _installError = SquadronException.from(ex, st);
    }
  }

  /// [WorkerRequest] handler dispatching commands according to the
  /// [_operations] map. Make sure this method doesn't throw.
  ///
  /// Errors are reported to the request's channel when there is one, and
  /// logged otherwise.
  void processRequest(WorkerRequest request) async {
    WorkerChannel? channel;
    try {
      request.unwrapInPlace(internalLogger);
      channel = request.channel;

      if (request.isTermination) {
        // terminate the worker
        return _shutdown();
      }

      // wait for the installation result if necessary
      final pendingInstallation = _installCompleter?.future;
      if (pendingInstallation != null) {
        await pendingInstallation;
        _installCompleter = null;
      }

      final installError = _installError;
      if (installError != null) {
        // service installation failed
        throw installError;
      }

      // ==== these requests do not send a response ====

      if (request.isTokenCancelation) {
        // cancel a token
        final token = request.cancelToken!;
        return _getTokenRef(token).update(token);
      } else if (request.isStreamCancelation) {
        // cancel a stream
        final canceler = _streamCancelers[request.streamId];
        return canceler?.call();
      }

      // make sure the worker is connected

      if (request.isConnection) {
        // connection requests are handled by connect().
        throw SquadronErrorImpl.create(
            'Unexpected connection request: $request');
      } else if (_operations == null) {
        // commands are not available yet (maybe connect() wasn't called or
        // awaited)
        throw SquadronErrorImpl.create('Worker service is not ready');
      }

      // ==== other requests require a client to send the response ====

      if (channel == null) {
        throw SquadronErrorImpl.create('Missing client for request: $request');
      }

      final token = request.cancelToken;
      token?.throwIfCanceled();

      // start monitoring execution
      final tokenRef = _begin(request);
      try {
        // find the operation matching the request command
        final cmd = request.command, op = _operations![cmd];
        if (op == null) {
          throw SquadronErrorImpl.create('Unknown command: $cmd');
        }

        // process
        var result = op(request);
        if (result is Future) {
          result = await result;
        }

        final reply = request.reply!;
        if (result is Stream && channel.canStream(result)) {
          // result is a stream: forward data to the client
          final replyWithError = channel.error;
          void postError(Object exception, [StackTrace? stackTrace]) {
            replyWithError(exception, stackTrace, cmd);
          }

          void post(dynamic data) {
            try {
              reply(data);
            } catch (ex, st) {
              postError(ex, st);
            }
          }

          await _pipe(result, channel, post, postError, token);
        } else {
          // result is a value: send to the client
          reply(result);
        }
      } finally {
        // stop monitoring execution
        _done(tokenRef);
      }
    } catch (ex, st) {
      if (channel != null) {
        channel.error(ex, st, request.command);
      } else {
        internalLogger.e('Unhandled error: $ex');
      }
    }
  }

  /// Returns the reference tracking [token], creating and registering it if
  /// needed. Returns [CancelationTokenReference.noToken] when [token] is
  /// `null`.
  CancelationTokenReference _getTokenRef(SquadronCancelationToken? token) {
    if (token == null) return CancelationTokenReference.noToken;
    return _cancelTokens.putIfAbsent(
        token.id, () => CancelationTokenReference(token.id));
  }

  /// Starts monitoring execution of this [request]. If the request contains a
  /// cancelation token, it is overridden with a [CancelationTokenReference]
  /// and this reference is returned to the sender. Otherwise, returns
  /// [CancelationTokenReference.noToken].
  CancelationTokenReference _begin(WorkerRequest request) {
    _executing++;
    return _getTokenRef(request.cancelToken)..usedBy(request);
  }

  /// Stops monitoring execution and releases the [tokenRef]. Unmounts the
  /// service if termination was requested and this was the last request
  /// executing.
  void _done(CancelationTokenReference tokenRef) {
    tokenRef.release();
    if (tokenRef.refCount == 0) {
      _cancelTokens.remove(tokenRef.id);
    }
    _executing--;
    if (_terminationRequested && _executing == 0) {
      _unmount();
    }
  }

  /// Forwards stream events to client.
  ///
  /// The stream id is sent first via [post], then data and errors are
  /// forwarded with [post] and [postError] for as long as [token] (if any) is
  /// not canceled. The returned future completes once the stream has been
  /// closed on the client side and the subscription (if any) canceled.
  Future<void> _pipe(
    Stream<dynamic> stream,
    WorkerChannel channel,
    void Function(dynamic) post,
    void Function(Object exception, [StackTrace? stackTrace]) postError,
    SquadronCancelationToken? token,
  ) {
    // Nullable on purpose: [onDone] may run before the stream is listened to
    // when the token is already canceled at the initial check below.
    StreamSubscription<dynamic>? subscription;
    final done = Completer<void>();

    late final int streamId;

    // send endOfStream to client
    Future<void> onDone() async {
      _unregisterStreamCanceler(streamId);
      channel.closeStream();
      try {
        await subscription?.cancel();
      } finally {
        // always release the caller, even if canceling failed
        if (!done.isCompleted) done.complete();
      }
    }

    // Returns true if forwarding may continue; otherwise reports the
    // cancelation to the client and closes the stream.
    bool checkToken() {
      final ex = token?.exception;
      if (ex == null) return true;
      postError(ex);
      onDone(); // not awaited: completion is signaled through [done]
      return false;
    }

    // register stream canceler callback and connect stream with client
    streamId = _registerStreamCanceler(onDone);
    post(streamId);
    if (checkToken()) {
      // start forwarding messages to the client
      subscription = stream.listen(
        (data) {
          if (checkToken()) post(data);
        },
        onError: (ex, st) {
          if (checkToken()) postError(ex, st);
        },
        onDone: onDone,
        cancelOnError: false,
      );
    }

    return done.future;
  }

  /// Assigns a stream ID to the stream canceler callback and registers the
  /// callback.
  int _registerStreamCanceler(StreamCanceler canceler) {
    final streamId = ++_streamId;
    _streamCancelers[streamId] = canceler;
    return streamId;
  }

  /// Unregisters the stream canceler callback associated to the [streamId].
  void _unregisterStreamCanceler(int streamId) {
    _streamCancelers.remove(streamId);
  }

  /// Terminates the worker if there is no pending execution. Otherwise, marks
  /// the worker as terminating and termination will be effective when all
  /// pending executions have completed.
  void _shutdown() {
    _terminationRequested = true;
    if (_executing == 0) {
      _unmount();
    }
  }

  /// Uninstalls the service if it is a [ServiceInstaller] that installed
  /// successfully, then exits. Should not throw.
  void _unmount() async {
    try {
      // uninstall the service if necessary
      if (_service is ServiceInstaller) {
        // check installation result
        final pendingInstallation = _installCompleter?.future;
        if (pendingInstallation != null) {
          await pendingInstallation;
          _installCompleter = null;
        }
        if (_installError == null) {
          // uninstall if and only if the service installed successfully
          await (_service as ServiceInstaller).uninstall();
        }
      }
    } catch (ex) {
      internalLogger.e('Service uninstallation failed with error: $ex');
    } finally {
      _exit();
    }
  }

  /// Invokes the termination callback and stops forwarding log events.
  /// Should not throw.
  void _exit() {
    try {
      _terminate(this);
    } catch (ex) {
      internalLogger.e('Worker termination failed with error: $ex');
    }
    final forwarder = _logForwarder;
    if (forwarder != null) {
      // Clear the field so that a later connect() registers a fresh
      // forwarder instead of assuming the removed one is still active.
      _logForwarder = null;
      Logger.removeOutputListener(forwarder);
    }
  }
}
Choose Features