| 1 | | | import 'dart:async'; |
| 2 | | | |
| 3 | | | import 'package:logger/web.dart'; |
| 4 | | | |
| 5 | | | import '../../exceptions/squadron_error.dart'; |
| 6 | | | import '../../exceptions/squadron_exception.dart'; |
| 7 | | | import '../../local_worker/local_worker.dart'; |
| 8 | | | import '../../service_installer.dart'; |
| 9 | | | import '../../tokens/_cancelation_token_ref.dart'; |
| 10 | | | import '../../tokens/_squadron_cancelation_token.dart'; |
| 11 | | | import '../../typedefs.dart'; |
| 12 | | | import '../../worker/worker_channel.dart'; |
| 13 | | | import '../../worker/worker_request.dart'; |
| 14 | | | import '../../worker_service.dart'; |
| 15 | | | import '_internal_logger.dart'; |
| 16 | | | |
/// Worker-side request processor.
///
/// A runner is bound to a [WorkerService] (either built by [connect] or
/// provided by a [LocalWorker] via [WorkerRunner.use]) and dispatches incoming
/// [WorkerRequest]s to the service's operations in [processRequest]. It also
/// tracks cancelation tokens, active streams and the number of requests being
/// executed so that termination can be deferred until the runner is idle.
class WorkerRunner {
  /// Constructs a new worker runner.
  ///
  /// The callback is invoked with this runner when the runner exits.
  WorkerRunner(this._terminate);

  /// Callback invoked by [_exit] to actually terminate the worker.
  final void Function(WorkerRunner) _terminate;

  /// Logger used for the runner's own diagnostics.
  final internalLogger = InternalLogger();

  /// The service handled by this runner, `null` until connected.
  WorkerService? _service;

  /// The operations map of [_service], `null` until connected.
  OperationsMap? _operations;

  /// Cancelation token references currently in use, indexed by token id.
  final _cancelTokens = <String, CancelationTokenReference>{};

  /// Set when a termination request has been received.
  bool _terminationRequested = false;

  /// Number of requests currently being executed.
  int _executing = 0;

  /// Callbacks used to cancel active streams, indexed by stream id.
  final _streamCancelers = <int, StreamCanceler>{};

  /// Last stream id handed out by [_registerStreamCanceler].
  int _streamId = 0;

  /// Listener registered with [Logger] to forward log events to the client.
  void Function(OutputEvent)? _logForwarder;

  /// Pending service installation started by [connect], if any.
  Future<void>? _installation;

  /// Error raised by the service installation, if it failed.
  SquadronException? _installError;

  /// Constructs a new worker runner for a [localWorker].
  ///
  /// Throws if the operations map of [localWorker] contains invalid command
  /// identifiers.
  factory WorkerRunner.use(LocalWorker localWorker) {
    final runner = WorkerRunner((r) {
      r.internalLogger.t('Terminating local Worker');
      r._service = null;
      r._operations = null;
    });
    runner._service = localWorker;
    runner._operations = localWorker.operations;
    runner._checkOperations();
    return runner;
  }

  /// Verifies that all command identifiers in [_operations] are strictly
  /// positive and throws otherwise.
  void _checkOperations() {
    final invalidKeys = _operations!.keys.where((k) => k <= 0).toList();
    if (invalidKeys.isNotEmpty) {
      throw SquadronErrorImpl.create(
        'Invalid command identifier${(invalidKeys.length > 1) ? 's' : ''} in service operations map: ${invalidKeys.join(', ')}. Command ids must be positive.',
      );
    }
  }

  /// Called by the platform worker upon startup, in response to a start
  /// [WorkerRequest]. [channelInfo] is an opaque object sent back from the
  /// platform worker to the Squadron [Worker] and used to communicate with the
  /// platform worker. Typically, [channelInfo] would be a [SendPort] (native)
  /// or a [MessagePort] (browser). [initializer] is called to build the
  /// [WorkerService] associated to the worker. The runner's [_operations] map
  /// will be set with operations from the service.
  ///
  /// This method does not throw: any failure is logged, reported to the
  /// request's channel (if available) and causes the runner to exit.
  Future<void> connect(WorkerRequest? startRequest, PlatformChannel channelInfo,
      WorkerInitializer initializer) async {
    WorkerChannel? channel;
    try {
      startRequest?.unwrapInPlace(internalLogger);
      channel = startRequest?.channel;

      if (startRequest == null) {
        throw SquadronErrorImpl.create('Missing connection request');
      } else if (channel == null) {
        throw SquadronErrorImpl.create('Missing client for connection request');
      }

      if (_logForwarder == null) {
        // forward log events to the client through the channel
        final logger = channel.log;
        void forwarder(OutputEvent event) => logger(event.origin);
        _logForwarder = forwarder;
        Logger.addOutputListener(forwarder);
      }

      if (!startRequest.isConnection) {
        throw SquadronErrorImpl.create('Connection request expected');
      } else if (_service != null || _operations != null) {
        throw SquadronErrorImpl.create('Already connected');
      }

      _service = await initializer(startRequest);
      _operations = _service!.operations;
      _checkOperations();

      channel.connect(channelInfo);

      // start installing the service if it requires it; the outcome is
      // checked when processing requests and when unmounting
      final Object? service = _service;
      if (service is ServiceInstaller) {
        _installation = _install(service, channel);
      }
    } catch (ex, st) {
      internalLogger.e(() => 'Connection failed: $ex');
      channel?.error(ex, st);
      _exit();
    }
  }

  /// Installs the service. If installation fails, the error is logged,
  /// reported to the [channel], the channel's stream is closed and the error
  /// is kept in [_installError].
  Future<void> _install(
    ServiceInstaller installer,
    WorkerChannel channel,
  ) async {
    try {
      await installer.install();
    } catch (ex, st) {
      internalLogger.e(() => 'Service installation failed: $ex');
      channel.error(ex, st);
      channel.closeStream();
      _installError = SquadronException.from(ex, st);
    }
  }

  /// [WorkerRequest] handler dispatching commands according to the
  /// [_operations] map. Make sure this method doesn't throw.
  void processRequest(WorkerRequest request) async {
    WorkerChannel? channel;
    try {
      request.unwrapInPlace(internalLogger);
      channel = request.channel;

      if (request.isTermination) {
        // terminate the worker
        return _shutdown();
      }

      // check installation result if necessary
      final pendingInstallation = _installation;
      if (pendingInstallation != null) {
        await pendingInstallation;
        _installation = null;
      }

      if (_installError != null) {
        // service installation failed
        throw _installError!;
      }

      // ==== these requests do not send a response ====

      if (request.isTokenCancelation) {
        // cancel a token
        final token = request.cancelToken!;
        return _getTokenRef(token).update(token);
      } else if (request.isStreamCancelation) {
        // cancel a stream
        final canceler = _streamCancelers[request.streamId];
        return canceler?.call();
      }

      // make sure the worker is connected

      if (request.isConnection) {
        // connection requests are handled by connect().
        throw SquadronErrorImpl.create(
            'Unexpected connection request: $request');
      } else if (_operations == null) {
        // commands are not available yet (maybe connect() wasn't called or awaited)
        throw SquadronErrorImpl.create('Worker service is not ready');
      }

      // ==== other requests require a client to send the response ====

      if (channel == null) {
        throw SquadronErrorImpl.create('Missing client for request: $request');
      }

      final token = request.cancelToken;
      token?.throwIfCanceled();

      // start monitoring execution
      final tokenRef = _begin(request);
      try {
        // find the operation matching the request command
        final cmd = request.command, op = _operations![cmd];
        if (op == null) {
          throw SquadronErrorImpl.create('Unknown command: $cmd');
        }

        // process
        var result = op(request);
        if (result is Future) {
          result = await result;
        }

        final reply = request.reply!;
        if (result is Stream && channel.canStream(result)) {
          // result is a stream: forward data to the client
          final replyWithError = channel.error;
          void postError(Object exception, [StackTrace? stackTrace]) {
            replyWithError(exception, stackTrace, cmd);
          }

          void post(data) {
            try {
              reply(data);
            } catch (ex, st) {
              postError(ex, st);
            }
          }

          await _pipe(result, channel, post, postError, token);
        } else {
          // result is a value: send to the client
          reply(result);
        }
      } finally {
        // stop monitoring execution
        _done(tokenRef);
      }
    } catch (ex, st) {
      if (channel != null) {
        channel.error(ex, st, request.command);
      } else {
        internalLogger.e('Unhandled error: $ex');
      }
    }
  }

  /// Returns the [CancelationTokenReference] registered for [token], creating
  /// and registering it if necessary. Returns
  /// [CancelationTokenReference.noToken] if [token] is `null`.
  CancelationTokenReference _getTokenRef(SquadronCancelationToken? token) =>
      (token == null)
          ? CancelationTokenReference.noToken
          : _cancelTokens.putIfAbsent(
              token.id, () => CancelationTokenReference(token.id));

  /// Starts monitoring execution of this [request]. If the request contains a
  /// cancelation token, it is overridden with a [CancelationTokenReference]
  /// and this reference is returned to the sender. Otherwise, returns
  /// [CancelationTokenReference.noToken].
  CancelationTokenReference _begin(WorkerRequest request) {
    _executing++;
    final token = _getTokenRef(request.cancelToken);
    token.usedBy(request);
    return token;
  }

  /// Stops monitoring execution and releases the [tokenRef]. If termination
  /// was requested and no other request is executing, unmounts the runner.
  void _done(CancelationTokenReference tokenRef) {
    tokenRef.release();
    if (tokenRef.refCount == 0) {
      _cancelTokens.remove(tokenRef.id);
    }
    _executing--;
    if (_terminationRequested && _executing == 0) {
      _unmount();
    }
  }

  /// Forwards stream events to client.
  ///
  /// The stream id is sent first via [post], then data and errors are
  /// forwarded with [post] and [postError]. If [token] is canceled, its
  /// exception is sent to the client and forwarding stops. The returned
  /// future completes once the stream has been closed on the channel.
  Future<void> _pipe(
    Stream<dynamic> stream,
    WorkerChannel channel,
    void Function(dynamic) post,
    void Function(Object exception, [StackTrace? stackTrace]) postError,
    SquadronCancelationToken? token,
  ) {
    // stays null if the token is canceled before the stream is listened to
    StreamSubscription<dynamic>? subscription;
    final done = Completer<void>();
    var closed = false;

    late final int streamId;

    // send endOfStream to client; safe to call more than once
    Future<void> onDone() async {
      if (closed) return;
      closed = true;
      try {
        _unregisterStreamCanceler(streamId);
        channel.closeStream();
        await subscription?.cancel();
      } finally {
        // always complete, otherwise the request would never finish and
        // the runner could never terminate
        done.complete();
      }
    }

    // returns true if processing may continue; otherwise, reports the
    // cancelation to the client and closes the stream
    bool checkToken() {
      final ex = token?.exception;
      if (ex == null) return true;
      postError(ex);
      unawaited(onDone());
      return false;
    }

    // register stream canceler callback and connect stream with client
    streamId = _registerStreamCanceler(onDone);
    post(streamId);
    if (checkToken()) {
      // start forwarding messages to the client
      subscription = stream.listen(
        (data) {
          if (checkToken()) post(data);
        },
        onError: (ex, st) {
          if (checkToken()) postError(ex, st);
        },
        onDone: onDone,
        cancelOnError: false,
      );
    }

    return done.future;
  }

  /// Assigns a stream ID to the stream canceler callback and registers the
  /// callback.
  int _registerStreamCanceler(StreamCanceler canceler) {
    final streamId = ++_streamId;
    _streamCancelers[streamId] = canceler;
    return streamId;
  }

  /// Unregisters the stream canceler callback associated to the [streamId].
  void _unregisterStreamCanceler(int streamId) {
    _streamCancelers.remove(streamId);
  }

  /// Terminates the worker if there is no pending execution. Otherwise, marks
  /// the worker as terminating and termination will be effective when all
  /// pending executions have completed.
  void _shutdown() {
    _terminationRequested = true;
    if (_executing == 0) {
      _unmount();
    }
  }

  /// Uninstalls the service if necessary, then exits. Should not throw.
  void _unmount() async {
    try {
      // uninstall the service if necessary
      final Object? service = _service;
      if (service is ServiceInstaller) {
        // check installation result
        final pendingInstallation = _installation;
        if (pendingInstallation != null) {
          await pendingInstallation;
          _installation = null;
        }
        if (_installError == null) {
          // uninstall if and only if the service installed successfully
          await service.uninstall();
        }
      }
    } catch (ex) {
      internalLogger.e('Service uninstallation failed with error: $ex');
    } finally {
      _exit();
    }
  }

  /// Invokes the termination callback and stops forwarding logs. Should not
  /// throw.
  void _exit() {
    try {
      _terminate(this);
    } catch (ex) {
      internalLogger.e('Worker termination failed with error: $ex');
    }
    final forwarder = _logForwarder;
    if (forwarder != null) {
      // clear the field so that the forwarder can be registered again
      _logForwarder = null;
      Logger.removeOutputListener(forwarder);
    }
  }
}