1 | | | import 'dart:async'; |
2 | | |
|
3 | | | import 'package:logger/web.dart'; |
4 | | |
|
5 | | | import '../../exceptions/squadron_error.dart'; |
6 | | | import '../../exceptions/squadron_exception.dart'; |
7 | | | import '../../local_worker/local_worker.dart'; |
8 | | | import '../../tokens/_cancelation_token_ref.dart'; |
9 | | | import '../../tokens/_squadron_cancelation_token.dart'; |
10 | | | import '../../typedefs.dart'; |
11 | | | import '../../worker/worker_channel.dart'; |
12 | | | import '../../worker/worker_request.dart'; |
13 | | | import '../../worker_service.dart'; |
14 | | | import '_internal_logger.dart'; |
15 | | |
|
/// Worker-side request dispatcher.
///
/// A runner is bound to a [WorkerService] by [connect] (or directly by
/// [WorkerRunner.use]), routes each incoming [WorkerRequest] to the matching
/// service operation in [processRequest], and invokes the termination
/// callback received at construction once a termination request has been
/// received and no execution is pending.
class WorkerRunner {
  /// Constructs a new worker runner. [_terminate] is called with this runner
  /// whenever the runner exits (failed connection or termination).
  WorkerRunner(this._terminate);

  /// Constructs a new worker runner for a [localWorker].
  factory WorkerRunner.use(LocalWorker localWorker) {
    final runner = WorkerRunner((r) {
      r.internalLogger.t('Terminating local Worker');
      r._service = null;
    });
    runner._service = localWorker;
    return runner;
  }

  /// Callback invoked by [_exit] to terminate the underlying worker.
  final void Function(WorkerRunner) _terminate;

  /// Logger used by the runner itself.
  final internalLogger = InternalLogger();

  /// The service handling commands; `null` until connected.
  WorkerService? _service;

  /// Cancelation token references currently tracked, indexed by token id.
  final _cancelTokens = <String, CancelationTokenReference>{};

  /// Set when a termination request has been received.
  bool _terminationRequested = false;

  /// Number of requests currently being executed.
  int _executing = 0;

  /// Callbacks used to cancel streams being piped, indexed by stream id.
  final _streamCancelers = <int, SquadronCallback>{};

  /// Last stream id handed out by [_registerStreamCanceler].
  int _streamId = 0;

  /// Listener registered with [Logger] to forward log events to the client;
  /// `null` when no listener is registered.
  void Function(OutputEvent)? _logForwarder;

  /// Pending service installation started by [connect], if any.
  Future<void>? _pendingInstallation;

  /// The error raised by the service installation, if it failed.
  SquadronException? _installResult;

  /// Called by the platform worker upon startup, in response to a start
  /// [WorkerRequest]. [channelInfo] is an opaque object sent back from the
  /// platform worker to the Squadron [Worker] and used to communicate with the
  /// platform worker. Typically, [channelInfo] would be a [SendPort] (native)
  /// or a [MessagePort] (browser). [initializer] is called to build the
  /// [WorkerService] associated to the worker, which is then stored in
  /// [_service].
  ///
  /// This method does not throw: any failure is logged, reported to the
  /// client when a channel is available, and causes the runner to exit.
  Future<void> connect(WorkerRequest? startRequest, PlatformChannel channelInfo,
      WorkerInitializer initializer) async {
    WorkerChannel? channel;
    try {
      startRequest?.unwrapInPlace(internalLogger);
      channel = startRequest?.channel;

      if (startRequest == null) {
        throw SquadronErrorExt.create('Missing connection request');
      } else if (channel == null) {
        throw SquadronErrorExt.create('Missing client for connection request');
      }

      // forward log events to the client (registered only once)
      if (_logForwarder == null) {
        final logger = channel.log;
        final forwarder = _logForwarder = (event) => logger(event.origin);
        Logger.addOutputListener(forwarder);
      }

      if (!startRequest.isConnection) {
        throw SquadronErrorExt.create('Connection request expected');
      } else if (_service != null) {
        throw SquadronErrorExt.create('Already connected');
      }

      _service = await initializer(startRequest);
      final service = _service!;

      if (service.operations.keys.any((k) => k <= 0)) {
        throw SquadronErrorExt.create(
          'Invalid command identifier in service operations map; command ids must be > 0',
        );
      }

      channel.connect(channelInfo);

      // Typed as Object so that the type test below always promotes.
      final Object installer = service;
      if (installer is ServiceInstaller) {
        // Installation runs in the background; requests wait for it in
        // processRequest().
        _pendingInstallation = _install(installer, channel);
      }
    } catch (ex, st) {
      internalLogger.e(() => 'Connection failed: $ex');
      channel?.error(ex, st);
      _exit();
    }
  }

  /// Installs the service and records the failure in [_installResult] if the
  /// installation throws. The failure is also reported to the client via
  /// [channel], whose stream is then closed.
  Future<void> _install(ServiceInstaller installer, WorkerChannel channel) async {
    try {
      await installer.install();
    } catch (ex, st) {
      internalLogger.e(() => 'Service installation failed: $ex');
      // Record the failure first so it is not lost if notifying the client
      // throws.
      _installResult = SquadronException.from(ex, st);
      channel.error(ex, st);
      channel.closeStream();
    }
  }

  /// [WorkerRequest] handler dispatching commands according to the
  /// operations of [_service]. Make sure this method doesn't throw: errors are
  /// sent to the client when a channel is available, and logged otherwise.
  void processRequest(WorkerRequest request) async {
    WorkerChannel? channel;
    try {
      request.unwrapInPlace(internalLogger);
      channel = request.channel;

      if (request.isTermination) {
        // terminate the worker
        _shutdown();
        return;
      }

      // wait for the service installation to complete if necessary
      final pendingInstallation = _pendingInstallation;
      if (pendingInstallation != null) {
        await pendingInstallation;
        _pendingInstallation = null;
      }

      final installError = _installResult;
      if (installError != null) {
        // service installation failed
        throw installError;
      }

      // ==== these requests do not send a response ====

      if (request.isTokenCancelation) {
        // cancel a token
        final token = request.cancelToken!;
        _getTokenRef(token).update(token);
        return;
      } else if (request.isStreamCancelation) {
        // cancel a stream
        final canceler = _streamCancelers[request.streamId];
        canceler?.call();
        return;
      }

      // make sure the worker is connected

      if (request.isConnection) {
        // connection requests are handled by connect().
        throw SquadronErrorExt.create(
            'Unexpected connection request: $request');
      } else if (_service == null) {
        // commands are not available yet (maybe connect() wasn't called or awaited)
        throw SquadronErrorExt.create('Worker service is not ready');
      }

      // ==== other requests require a client to send the response ====

      if (channel == null) {
        throw SquadronErrorExt.create('Missing client for request: $request');
      }

      final token = request.cancelToken;
      token?.throwIfCanceled();

      // start monitoring execution
      final tokenRef = _begin(request);
      try {
        // find the operation matching the request command
        final cmd = request.command, op = _service?.operations[cmd];
        if (op == null) {
          throw SquadronErrorExt.create('Unknown command: $cmd');
        }

        // process
        var result = op(request);
        if (result is Future) {
          result = await result;
        }

        final reply = request.reply!;
        if (result is Stream && channel.canStream(result)) {
          // result is a stream: forward data to the client
          final replyWithError = channel.error;
          void postError(Object exception, [StackTrace? stackTrace]) {
            replyWithError(exception, stackTrace, cmd);
          }

          void post(data) {
            try {
              reply(data);
            } catch (ex, st) {
              postError(ex, st);
            }
          }

          await _pipe(result, channel, post, postError, token);
        } else {
          // result is a value: send to the client
          reply(result);
        }
      } finally {
        // stop monitoring execution
        _done(tokenRef);
      }
    } catch (ex, st) {
      if (channel != null) {
        channel.error(ex, st, request.command);
      } else {
        internalLogger.e('Unhandled error: $ex');
      }
    }
  }

  /// Returns the reference tracking [token], creating and registering it if
  /// needed. Returns [CancelationTokenReference.noToken] if [token] is `null`.
  CancelationTokenReference _getTokenRef(SquadronCancelationToken? token) {
    if (token == null) {
      return CancelationTokenReference.noToken;
    }
    return _cancelTokens.putIfAbsent(
        token.id, () => CancelationTokenReference(token.id));
  }

  /// Starts monitoring execution of this [request] and returns the
  /// [CancelationTokenReference] associated to the request's cancelation
  /// token, after registering the request with it. If the request has no
  /// cancelation token, the reference is
  /// [CancelationTokenReference.noToken].
  CancelationTokenReference _begin(WorkerRequest request) {
    _executing++;
    final token = _getTokenRef(request.cancelToken);
    token.usedBy(request);
    return token;
  }

  /// Stops monitoring execution and releases the [tokenRef]. Unmounts the
  /// worker if termination was requested and this was the last pending
  /// execution.
  void _done(CancelationTokenReference tokenRef) {
    tokenRef.release();
    if (tokenRef.refCount == 0) {
      _cancelTokens.remove(tokenRef.id);
    }
    _executing--;
    if (_terminationRequested && _executing == 0) {
      _unmount();
    }
  }

  /// Forwards stream events to client.
  ///
  /// The stream id is posted first, then data and errors are forwarded with
  /// [post] and [postError]. The returned future completes once the stream
  /// has been closed, i.e. when the source [stream] is done, when the client
  /// cancels the stream, or when [token] is found to be canceled.
  Future<void> _pipe(
    Stream<dynamic> stream,
    WorkerChannel channel,
    void Function(dynamic) post,
    void Function(Object exception, [StackTrace? stackTrace]) postError,
    SquadronCancelationToken? token,
  ) {
    // Stays null if the token is already canceled before listening starts.
    StreamSubscription? subscription;
    final done = Completer<void>();
    var closing = false;

    late final int streamId;

    // send endOfStream to client
    Future<void> onDone() async {
      // May be reached from several paths (stream completion, client
      // cancelation, token cancelation): only close once.
      if (closing) return;
      closing = true;
      _unregisterStreamCanceler(streamId);
      try {
        channel.closeStream();
        await subscription?.cancel();
      } finally {
        // Always release the caller, even if closing or canceling failed.
        done.complete();
      }
    }

    // Returns true if processing may continue. If the token has been
    // canceled, sends the exception to the client and closes the stream.
    bool checkToken() {
      final ex = token?.exception;
      if (ex == null) return true;
      postError(ex);
      unawaited(onDone());
      return false;
    }

    // register stream canceler callback and connect stream with client
    streamId = _registerStreamCanceler(onDone);
    post(streamId);
    if (checkToken()) {
      // start forwarding messages to the client
      subscription = stream.listen(
        (data) {
          if (checkToken()) post(data);
        },
        onError: (Object ex, StackTrace st) {
          if (checkToken()) postError(ex, st);
        },
        onDone: onDone,
        cancelOnError: false,
      );
    }

    return done.future;
  }

  /// Assigns a stream ID to the stream canceler callback and registers the
  /// callback.
  int _registerStreamCanceler(SquadronCallback canceler) {
    final streamId = ++_streamId;
    _streamCancelers[streamId] = canceler;
    return streamId;
  }

  /// Unregisters the stream canceler callback associated to the [streamId].
  void _unregisterStreamCanceler(int streamId) {
    _streamCancelers.remove(streamId);
  }

  /// Terminates the worker if there is no pending execution. Otherwise, marks
  /// the worker as terminating and termination will be effective when all
  /// pending executions have completed.
  void _shutdown() {
    _terminationRequested = true;
    if (_executing == 0) {
      _unmount();
    }
  }

  /// Uninstalls the service if necessary, then exits. Should not throw.
  void _unmount() async {
    try {
      // Typed as Object? so that the type test below always promotes.
      final Object? service = _service;
      if (service is ServiceInstaller) {
        // wait for the installation to complete before checking its result
        final pendingInstallation = _pendingInstallation;
        if (pendingInstallation != null) {
          await pendingInstallation;
          _pendingInstallation = null;
        }
        if (_installResult == null) {
          // uninstall only if the service installed successfully
          await service.uninstall();
        }
      }
    } catch (ex) {
      internalLogger.e('Service uninstallation failed with error: $ex');
    } finally {
      _exit();
    }
  }

  /// Invokes the termination callback and unregisters the log forwarder.
  /// Should not throw.
  void _exit() {
    try {
      _terminate(this);
    } catch (ex) {
      internalLogger.e('Worker termination failed with error: $ex');
    }
    final forwarder = _logForwarder;
    if (forwarder != null) {
      // Clear the field so the listener is removed only once and can be
      // registered again by a later connect().
      _logForwarder = null;
      Logger.removeOutputListener(forwarder);
    }
  }
}