LCOV - code coverage report

Current view
top level - src/_impl/xplat - _worker_runner.dart
Test
lcov.info
Date
2026-02-21
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
HitTotalCoverage
Lines12814290.1%
Functions00-
Branches00-
Each row represents a line of source code
LineBranchHitsSource code
1import 'dart:async';
2
3import 'package:logger/web.dart';
4
5import '../../exceptions/squadron_error.dart';
6import '../../exceptions/squadron_exception.dart';
7import '../../local_worker/local_worker.dart';
8import '../../service_installer.dart';
9import '../../tokens/_cancelation_token_ref.dart';
10import '../../tokens/_squadron_cancelation_token.dart';
11import '../../typedefs.dart';
12import '../../worker/worker_channel.dart';
13import '../../worker/worker_request.dart';
14import '../../worker_service.dart';
15import '_internal_logger.dart';
16
17class WorkerRunner {
18 /// Constructs a new worker runner.
1910 WorkerRunner(this._terminate);
20
21 void Function(WorkerRunner) _terminate;
22
23 final internalLogger = InternalLogger();
24
25 WorkerService? _service;
26 OperationsMap? _operations;
27
28 final _cancelTokens = <String, CancelationTokenReference>{};
29
30 bool _terminationRequested = false;
31 int _executing = 0;
32
33 final _streamCancelers = <int, StreamCanceler>{};
34 int _streamId = 0;
35
36 void Function(OutputEvent)? _logForwarder;
37
38 /// Constructs a new worker runner for a [localWorker].
391 factory WorkerRunner.use(LocalWorker localWorker) {
401 final runner = WorkerRunner((r) {
410 r.internalLogger.t('Terminating local Worker');
420 r._service = null;
430 r._operations = null;
44 });
452 _checkOperations(localWorker.operations);
461 runner._service = localWorker;
472 runner._operations = localWorker.operations;
48 return runner;
49 }
50
5110 static void _checkOperations(OperationsMap ops) {
5250 final invalidKeys = ops.keys.where((k) => k <= 0).toList();
5310 if (invalidKeys.isNotEmpty) {
542 throw SquadronErrorImpl.create(
558 'Invalid command identifier${(invalidKeys.length > 1) ? 's' : ''} in service operations map: ${invalidKeys.join(', ')}. Command ids must be positive.',
56 );
57 }
58 }
59
60 /// Called by the platform worker upon startup, in response to a start
61 /// [WorkerRequest]. [channelInfo] is an opaque object sent back from the
62 /// platform worker to the Squadron [Worker] and used to communicate with the
63 /// platform worker. Typically, [channelInfo] would be a [SendPort] (native)
64 /// or a [MessagePort] (browser). [initializer] is called to build the
65 /// [WorkerService] associated to the worker. The runner's [_service] map
66 /// will be set with operations from the service.
6710 void connect(WorkerRequest? startRequest, PlatformChannel channelInfo,
68 WorkerInitializer initializer) async {
69 late final WorkerChannel? channel;
70 try {
7120 startRequest?.unwrapInPlace(internalLogger);
7210 channel = startRequest?.channel;
73
74 if (startRequest == null) {
752 throw SquadronErrorImpl.create('Missing connection request');
76 } else if (channel == null) {
770 throw SquadronErrorImpl.create('Missing client for connection request');
78 }
79
8010 if (_logForwarder == null) {
8110 final logger = channel.log;
8240 _logForwarder = (event) => logger(event.origin);
8320 Logger.addOutputListener(_logForwarder!);
84 }
85
8610 if (!startRequest.isConnection) {
870 throw SquadronErrorImpl.create('Connection request expected');
8820 } else if (_service != null || _operations != null) {
890 throw SquadronErrorImpl.create('Already connected');
90 }
91
9210 var service = initializer(startRequest);
9310 if (service is Future<WorkerService>) service = await service;
94 service as WorkerService;
9520 _checkOperations(service.operations);
9610 _service = service;
9720 _operations = service.operations;
98
9910 channel.connect(channelInfo);
100
10110 if (service is ServiceInstaller) {
1024 _installCompleter = Completer<void>();
103 try {
1042 final result = (service as ServiceInstaller).install();
1052 if (result is Future) await result;
106 } catch (ex, st) {
1074 internalLogger.e(() => 'Service installation failed: $ex');
1081 channel.error(ex, st);
1091 channel.closeStream();
1102 _installError = SquadronException.from(ex, st);
111 } finally {
1124 _installCompleter?.complete();
113 }
114 }
115 } catch (ex, st) {
1168 internalLogger.e(() => 'Connection failed: $ex');
1172 channel?.error(ex, st);
1182 _exit();
119 }
120 }
121
122 Completer<void>? _installCompleter;
123 SquadronException? _installError;
124
125 /// [WorkerRequest] handler dispatching commands according to the
126 /// [_service] map. Make sure this method doesn't throw.
12710 void processRequest(WorkerRequest request) async {
128 WorkerChannel? channel;
129 try {
13020 request.unwrapInPlace(internalLogger);
13110 channel = request.channel;
132
13310 if (request.isTermination) {
134 // terminate the worker
13510 return _shutdown();
136 }
137
138 // check installation result if necessary
13912 final pendingInstallation = _installCompleter?.future;
140 if (pendingInstallation != null) {
141 await pendingInstallation;
1422 _installCompleter = null;
143 }
144
14510 if (_installError != null) {
146 // service installation failed
1471 throw _installError!;
148 }
149
150 // ==== these requests do not send a response ====
151
15210 if (request.isTokenCancelation) {
153 // cancel a token
1542 final token = request.cancelToken!;
1554 return _getTokenRef(token).update(token);
15610 } else if (request.isStreamCancelation) {
157 // cancel a stream
15812 final canceler = _streamCancelers[request.streamId];
1593 return canceler?.call();
160 }
161
162 // make sure the worker is connected
163
16410 if (request.isConnection) {
165 // connection requests are handled by connect().
1660 throw SquadronErrorImpl.create(
1670 'Unexpected connection request: $request');
168 }
169
170 // find the operation matching the request command
17130 final cmd = request.command, op = _operations![cmd];
172 if (op == null) {
173 // unknown command, or commands are not available yet (maybe connect() wasn't called or awaited)
1742 throw SquadronErrorImpl.create((_operations == null)
175 ? 'Worker service is not ready'
1761 : 'Unknown command: $cmd');
177 }
178
179 // ==== other requests require a client to send the response ====
180
181 if (channel == null) {
1820 throw SquadronErrorImpl.create('Missing client for request: $request');
183 }
184
18510 final token = request.cancelToken;
1862 token?.throwIfCanceled();
187
188 // start monitoring execution
18910 final tokenRef = _begin(request);
190 try {
191 // process
19210 var result = op(request);
19310 if (result is Future) {
194 result = await result;
195 }
196
19710 final reply = request.reply!;
19816 if (result is Stream && channel.canStream(result)) {
199 // result is a stream: forward data to the client
2005 final replyWithError = channel.error;
2013 void $postError(Object exception, [StackTrace? stackTrace]) {
2023 replyWithError(exception, stackTrace, cmd);
203 }
204
2055 void post(data) {
206 try {
2075 reply(data);
208 } catch (ex, st) {
2090 $postError(ex, st);
210 }
211 }
212
2135 await _pipe(result, channel, post, $postError, token);
214 } else {
215 // result is a value: send to the client
21610 reply(result);
217 }
218 } finally {
219 // stop monitoring execution
22010 _done(tokenRef);
221 }
222 } catch (ex, st) {
223 if (channel != null) {
2246 channel.error(ex, st, request.command);
225 } else {
2263 internalLogger.e('Unhandled error: $ex');
227 }
228 }
229 }
230
23110 CancelationTokenReference _getTokenRef(SquadronCancelationToken? token) =>
232 (token == null)
23310 ? CancelationTokenReference.noToken
2344 : _cancelTokens.putIfAbsent(
2358 token.id, () => CancelationTokenReference(token.id));
236
237 /// Starts monitoring execution of this [request]. If the request contains a
238 /// cancelation token, it is overridden with a [CancelationTokenReference]
239 /// and this reference is returned to the sender. Otherwise, returns
240 /// [CancelationTokenReference.noToken].
24110 CancelationTokenReference _begin(WorkerRequest request) {
24220 _executing++;
24320 final token = _getTokenRef(request.cancelToken);
24410 token.usedBy(request);
245 return token;
246 }
247
248 /// Stops monitoring execution and releases the [tokenRef].
24910 void _done(CancelationTokenReference tokenRef) {
25010 tokenRef.release();
25120 if (tokenRef.refCount == 0) {
2526 _cancelTokens.remove(tokenRef.id);
253 }
25420 _executing--;
25514 if (_terminationRequested && _executing == 0) {
2562 _unmount();
257 }
258 }
259
260 /// Forwards stream events to client.
2615 Future<void> _pipe(
262 Stream<dynamic> stream,
263 WorkerChannel channel,
264 void Function(dynamic) post,
265 void Function(Object exception, [StackTrace? stackTrace]) postError,
266 SquadronCancelationToken? token,
267 ) {
268 late final StreamSubscription subscription;
2695 final done = Completer();
270
271 late final int streamId;
272
273 // send endOfStream to client
2745 Future<void> onDone() {
2755 _unregisterStreamCanceler(streamId);
2765 channel.closeStream();
27715 return subscription.cancel().whenComplete(done.complete);
278 }
279
280 final bool Function() checkToken;
281 if (token == null) {
2825 checkToken = () => true;
283 } else {
2842 checkToken = () {
2852 final ex = token.exception;
286 if (ex != null) {
2870 postError(ex);
2880 onDone();
289 }
290 return (ex == null);
291 };
292 }
293
294 // register stream canceler callback and connect stream with client
2955 streamId = _registerStreamCanceler(onDone);
2965 post(streamId);
2975 if (checkToken()) {
298 // start forwarding messages to the client
2995 subscription = stream.listen(
3005 (data) {
30110 if (checkToken()) post(data);
302 },
3033 onError: (ex, st) {
3046 if (checkToken()) postError(ex, st);
305 },
306 onDone: onDone,
307 cancelOnError: false,
308 );
309 }
310
3115 return done.future;
312 }
313
314 /// Assigns a stream ID to the stream canceler callback and registers the
315 /// callback.
3165 int _registerStreamCanceler(StreamCanceler canceler) {
31710 final streamId = ++_streamId;
31810 _streamCancelers[streamId] = canceler;
319 return streamId;
320 }
321
322 /// Unregisters the stream canceled callback associated to the [streamId].
3235 void _unregisterStreamCanceler(int streamId) {
32410 _streamCancelers.remove(streamId);
325 }
326
327 /// Terminates the worker if there is no pending execution. Otherwise, marks
328 /// the worker as terminating and termination will be effective when all
329 /// pending executions have completed.
33010 void _shutdown() {
33110 _terminationRequested = true;
33220 if (_executing == 0) {
33310 _unmount();
334 }
335 }
336
337 // should not throw
33810 void _unmount() async {
339 try {
340 // uninstall the service if necessary
34120 if (_service is ServiceInstaller) {
342 // check installation result
3432 final pendingInstallation = _installCompleter?.future;
344 if (pendingInstallation != null) {
345 await pendingInstallation;
3460 _installCompleter = null;
347 }
3482 if (_installError == null) {
349 // uninstall iif the service installed succesfuly
3504 await (_service as ServiceInstaller).uninstall();
351 }
352 }
353 } catch (ex) {
3543 internalLogger.e('Service uninstallation failed with error: $ex');
355 } finally {
35610 _exit();
357 }
358 }
359
360 // should not throw
36110 void _exit() {
362 try {
36320 _terminate(this);
364 } catch (ex) {
3650 internalLogger.e('Worker termination failed with error: $ex');
366 }
36710 if (_logForwarder != null) {
36820 Logger.removeOutputListener(_logForwarder!);
369 }
370 }
371}
Choose Features