LCOV - code coverage report

Current view
top level - src/_impl/xplat - _worker_runner.dart
Test
lcov.info
Date
2024-11-13
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
Hit | Total | Coverage
Lines: 137 | 164 | 83.5%
Functions: 0 | 0 | -
Branches: 0 | 0 | -
Each row represents a line of source code
LineBranchHitsSource code
1import 'dart:async';
2
3import 'package:logger/web.dart';
4
5import '../../exceptions/squadron_error.dart';
6import '../../exceptions/squadron_exception.dart';
7import '../../local_worker/local_worker.dart';
8import '../../tokens/_cancelation_token_ref.dart';
9import '../../tokens/_squadron_cancelation_token.dart';
10import '../../typedefs.dart';
11import '../../worker/worker_channel.dart';
12import '../../worker/worker_request.dart';
13import '../../worker_service.dart';
14import '_internal_logger.dart';
15
/// Worker-side request dispatcher.
///
/// A runner owns at most one [WorkerService]. [connect] builds the service
/// from a connection request, [processRequest] dispatches subsequent
/// requests to the service's operations, and a termination request unmounts
/// the service once all pending executions have completed.
class WorkerRunner {
  /// Constructs a new worker runner.
  ///
  /// [_terminate] is invoked from [_exit] with this runner when the worker
  /// must stop.
  WorkerRunner(this._terminate);

  /// Callback invoked by [_exit] to terminate the worker.
  void Function(WorkerRunner) _terminate;

  /// Logger used by the runner for its own diagnostics.
  final internalLogger = InternalLogger();

  /// The service handling commands; `null` until connected.
  WorkerService? _service;

  /// Cancelation token references currently tracked, keyed by token id.
  final _cancelTokens = <String, CancelationTokenReference>{};

  /// Set once a termination request has been received.
  bool _terminationRequested = false;

  /// Number of requests currently being executed (see [_begin] / [_done]).
  int _executing = 0;

  /// Callbacks used to cancel active streams, keyed by stream id.
  final _streamCancelers = <int, SquadronCallback>{};

  /// Last stream id handed out by [_registerStreamCanceler].
  int _streamId = 0;

  /// Listener registered with [Logger] to forward log events to the client.
  void Function(OutputEvent)? _logForwarder;

  /// Completes when the pending [ServiceInstaller.install] call is over.
  Completer<void>? _installCompleter;

  /// Error captured when service installation failed, `null` otherwise.
  SquadronException? _installResult;

  /// Constructs a new worker runner for a [localWorker].
  ///
  /// The runner is bound to [localWorker] right away; terminating it simply
  /// drops the reference to the service.
  factory WorkerRunner.use(LocalWorker localWorker) {
    final runner = WorkerRunner((r) {
      r.internalLogger.t('Terminating local Worker');
      r._service = null;
    });
    runner._service = localWorker;
    return runner;
  }

  /// Called by the platform worker upon startup, in response to a start
  /// [WorkerRequest]. [channelInfo] is an opaque object sent back from the
  /// platform worker to the Squadron [Worker] and used to communicate with the
  /// platform worker. Typically, [channelInfo] would be a [SendPort] (native)
  /// or a [MessagePort] (browser). [initializer] is called to build the
  /// [WorkerService] associated to the worker.
  ///
  /// This method does not throw: any failure is logged, reported to the
  /// client when a channel is available, and followed by [_exit].
  Future<void> connect(WorkerRequest? startRequest, PlatformChannel channelInfo,
      WorkerInitializer initializer) async {
    WorkerChannel? channel;
    try {
      startRequest?.unwrapInPlace(internalLogger);
      channel = startRequest?.channel;

      if (startRequest == null) {
        throw SquadronErrorExt.create('Missing connection request');
      } else if (channel == null) {
        throw SquadronErrorExt.create('Missing client for connection request');
      }

      // forward log events to the client (registered only once)
      if (_logForwarder == null) {
        final logger = channel.log;
        _logForwarder = (event) => logger(event.origin);
        Logger.addOutputListener(_logForwarder!);
      }

      if (!startRequest.isConnection) {
        throw SquadronErrorExt.create('Connection request expected');
      } else if (_service != null) {
        throw SquadronErrorExt.create('Already connected');
      }

      _service = await initializer(startRequest);

      // command ids <= 0 are rejected
      if (_service!.operations.keys.any((k) => k <= 0)) {
        throw SquadronErrorExt.create(
          'Invalid command identifier in service operations map; command ids must be > 0',
        );
      }

      channel.connect(channelInfo);

      if (_service is ServiceInstaller) {
        // Start the installation without awaiting it here: requests wait on
        // [_installCompleter] and check [_installResult] before executing.
        _installCompleter = Completer()
          ..complete((() async {
            try {
              await (_service as ServiceInstaller).install();
            } catch (ex, st) {
              internalLogger.e(() => 'Service installation failed: $ex');
              channel?.error(ex, st);
              channel?.closeStream();
              _installResult = SquadronException.from(ex, st);
            }
          })());
      }
    } catch (ex, st) {
      internalLogger.e(() => 'Connection failed: $ex');
      channel?.error(ex, st);
      _exit();
    }
  }

  /// [WorkerRequest] handler dispatching commands according to the
  /// operations of [_service]. Make sure this method doesn't throw.
  ///
  /// Errors are reported through the request's channel when there is one,
  /// and logged otherwise.
  void processRequest(WorkerRequest request) async {
    WorkerChannel? channel;
    try {
      request.unwrapInPlace(internalLogger);
      channel = request.channel;

      if (request.isTermination) {
        // terminate the worker
        return _shutdown();
      }

      // check installation result if necessary
      final pendingInstallation = _installCompleter?.future;
      if (pendingInstallation != null) {
        await pendingInstallation;
        _installCompleter = null;
      }

      final installError = _installResult;
      if (installError != null) {
        // service installation failed
        throw installError;
      }

      // ==== these requests do not send a response ====

      if (request.isTokenCancelation) {
        // cancel a token
        final token = request.cancelToken!;
        return _getTokenRef(token).update(token);
      } else if (request.isStreamCancelation) {
        // cancel a stream
        final canceler = _streamCancelers[request.streamId];
        return canceler?.call();
      }

      // make sure the worker is connected

      if (request.isConnection) {
        // connection requests are handled by connect().
        throw SquadronErrorExt.create(
            'Unexpected connection request: $request');
      } else if (_service == null) {
        // commands are not available yet (maybe connect() wasn't called or awaited)
        throw SquadronErrorExt.create('Worker service is not ready');
      }

      // ==== other requests require a client to send the response ====

      if (channel == null) {
        throw SquadronErrorExt.create('Missing client for request: $request');
      }

      final token = request.cancelToken;
      token?.throwIfCanceled();

      // start monitoring execution
      final tokenRef = _begin(request);
      try {
        // find the operation matching the request command
        final cmd = request.command, op = _service?.operations[cmd];
        if (op == null) {
          throw SquadronErrorExt.create('Unknown command: $cmd');
        }

        // process
        var result = op(request);
        if (result is Future) {
          result = await result;
        }

        final reply = request.reply!;
        if (result is Stream && channel.canStream(result)) {
          // result is a stream: forward data to the client
          final replyWithError = channel.error;
          void postError(Object exception, [StackTrace? stackTrace]) {
            replyWithError(exception, stackTrace, cmd);
          }

          void post(data) {
            try {
              reply(data);
            } catch (ex, st) {
              postError(ex, st);
            }
          }

          await _pipe(result, channel, post, postError, token);
        } else {
          // result is a value: send to the client
          reply(result);
        }
      } finally {
        // stop monitoring execution
        _done(tokenRef);
      }
    } catch (ex, st) {
      if (channel != null) {
        channel.error(ex, st, request.command);
      } else {
        internalLogger.e('Unhandled error: $ex');
      }
    }
  }

  /// Returns the reference tracking [token], creating and registering one if
  /// the token is not tracked yet. Returns
  /// [CancelationTokenReference.noToken] when [token] is `null`.
  CancelationTokenReference _getTokenRef(SquadronCancelationToken? token) =>
      (token == null)
          ? CancelationTokenReference.noToken
          : _cancelTokens.putIfAbsent(
              token.id, () => CancelationTokenReference(token.id));

  /// Starts monitoring execution of this [request]. If the request contains a
  /// cancelation token, it is overridden with a [CancelationTokenReference]
  /// and this reference is returned to the sender. Otherwise, returns
  /// [CancelationTokenReference.noToken].
  CancelationTokenReference _begin(WorkerRequest request) {
    _executing++;
    final token = _getTokenRef(request.cancelToken);
    token.usedBy(request);
    return token;
  }

  /// Stops monitoring execution and releases the [tokenRef].
  ///
  /// Unmounts the worker when termination was requested and this was the
  /// last pending execution.
  void _done(CancelationTokenReference tokenRef) {
    tokenRef.release();
    if (tokenRef.refCount == 0) {
      _cancelTokens.remove(tokenRef.id);
    }
    _executing--;
    if (_terminationRequested && _executing == 0) {
      _unmount();
    }
  }

  /// Forwards stream events to client.
  ///
  /// The stream id is posted first, then data and errors from [stream] are
  /// forwarded through [post] and [postError] for as long as [token] (if any)
  /// has no exception. The returned future completes once the stream has been
  /// closed, whether because it ended, was canceled through its registered
  /// canceler, or because [token] was canceled.
  Future<void> _pipe(
    Stream<dynamic> stream,
    WorkerChannel channel,
    void Function(dynamic) post,
    void Function(Object exception, [StackTrace? stackTrace]) postError,
    SquadronCancelationToken? token,
  ) {
    // Remains null when the token is already canceled before listening
    // starts; closing the stream must still work in that case.
    StreamSubscription? subscription;
    final done = Completer<void>();

    late final int streamId;

    // send endOfStream to client
    Future<void> onDone() async {
      _unregisterStreamCanceler(streamId);
      channel.closeStream();
      try {
        await subscription?.cancel();
      } finally {
        // always release the caller, and never complete twice
        if (!done.isCompleted) done.complete();
      }
    }

    // Returns true if forwarding may proceed. Otherwise reports the token's
    // exception to the client and closes the stream.
    bool checkToken() {
      final ex = token?.exception;
      if (ex == null) return true;
      postError(ex);
      unawaited(onDone());
      return false;
    }

    // register stream canceler callback and connect stream with client
    streamId = _registerStreamCanceler(onDone);
    post(streamId);
    if (checkToken()) {
      // start forwarding messages to the client
      subscription = stream.listen(
        (data) {
          if (checkToken()) post(data);
        },
        onError: (ex, st) {
          if (checkToken()) postError(ex, st);
        },
        onDone: onDone,
        cancelOnError: false,
      );
    }

    return done.future;
  }

  /// Assigns a stream ID to the stream canceler callback and registers the
  /// callback.
  int _registerStreamCanceler(SquadronCallback canceler) {
    final streamId = ++_streamId;
    _streamCancelers[streamId] = canceler;
    return streamId;
  }

  /// Unregisters the stream canceler callback associated to the [streamId].
  void _unregisterStreamCanceler(int streamId) {
    _streamCancelers.remove(streamId);
  }

  /// Terminates the worker if there is no pending execution. Otherwise, marks
  /// the worker as terminating and termination will be effective when all
  /// pending executions have completed.
  void _shutdown() {
    _terminationRequested = true;
    if (_executing == 0) {
      _unmount();
    }
  }

  /// Uninstalls the service when applicable, then exits. Should not throw.
  void _unmount() async {
    try {
      // uninstall the service if necessary
      if (_service is ServiceInstaller) {
        // check installation result
        final pendingInstallation = _installCompleter?.future;
        if (pendingInstallation != null) {
          await pendingInstallation;
          _installCompleter = null;
        }
        if (_installResult == null) {
          // uninstall only if the service installed successfully
          await (_service as ServiceInstaller).uninstall();
        }
      }
    } catch (ex) {
      internalLogger.e('Service uninstallation failed with error: $ex');
    } finally {
      _exit();
    }
  }

  /// Invokes the termination callback and stops forwarding logs. Should not
  /// throw.
  void _exit() {
    try {
      _terminate(this);
    } catch (ex) {
      internalLogger.e('Worker termination failed with error: $ex');
    }
    final forwarder = _logForwarder;
    if (forwarder != null) {
      Logger.removeOutputListener(forwarder);
      // clear the field so that a later connect() registers a fresh listener
      _logForwarder = null;
    }
  }
}
Choose Features