| 1 | | | import 'dart:async'; |
| 2 | | | |
| 3 | | | import 'package:cancelation_token/cancelation_token.dart'; |
| 4 | | | import 'package:logger/web.dart'; |
| 5 | | | import 'package:using/using.dart'; |
| 6 | | | |
| 7 | | | import '../_impl/xplat/_forward_completer.dart'; |
| 8 | | | import '../_impl/xplat/_forward_stream_controller.dart'; |
| 9 | | | import '../_impl/xplat/_time_stamp.dart'; |
| 10 | | | import '../channel.dart'; |
| 11 | | | import '../exceptions/exception_manager.dart'; |
| 12 | | | import '../exceptions/squadron_exception.dart'; |
| 13 | | | import '../exceptions/task_terminated_exception.dart'; |
| 14 | | | import '../exceptions/worker_exception.dart'; |
| 15 | | | import '../invoker.dart'; |
| 16 | | | import '../iworker.dart'; |
| 17 | | | import '../stats/worker_stat.dart'; |
| 18 | | | import '../tokens/_squadron_cancelation_token.dart'; |
| 19 | | | import '../typedefs.dart'; |
| 20 | | | import '../worker/worker_request.dart'; |
| 21 | | | import '../worker_service.dart'; |
| 22 | | | |
| 23 | | | part 'worker.stats.dart'; |
| 24 | | | |
/// Base worker class.
///
/// This base class takes care of creating the [Channel] and firing up the
/// worker. Typically, derived classes should add proxy methods sending
/// [WorkerRequest]s to the worker.
abstract class Worker
    with Releasable
    implements WorkerService, IWorker, Invoker {
  /// Creates a [Worker] with the specified entrypoint.
  ///
  /// The optional [threadHook] is handed over to [Channel.open] when the
  /// worker is started. When no [exceptionManager] is provided, one is created
  /// lazily the first time the [exceptionManager] getter is read.
  Worker(this._entryPoint,
      {PlatformThreadHook? threadHook, ExceptionManager? exceptionManager})
      : _threadHook = threadHook,
        _exceptionManager = exceptionManager;

  /// Stops the worker (see [stop]) before running the inherited release logic.
  @override
  void release() {
    stop();
    super.release();
  }

  /// The [Worker]'s entry point; typically, a top-level function in native
  /// world or the Uri to a JavaScript file in browser world.
  final EntryPoint _entryPoint;

  /// Logger passed to [Channel.open] when the worker starts; also used to
  /// trace [stop]. May be left `null`.
  @override
  Logger? channelLogger;

  /// The [ExceptionManager] passed to [Channel.open]; created on first access
  /// if none was supplied to the constructor.
  @override
  ExceptionManager get exceptionManager =>
      (_exceptionManager ??= ExceptionManager());
  ExceptionManager? _exceptionManager;

  /// Optional hook passed to [Channel.open] when the worker starts.
  final PlatformThreadHook? _threadHook;

  /// The [Worker]'s start arguments.
  ///
  /// A `null` result is treated as an empty list by [start].
  List? getStartArgs();

  late final _stats = _Stats(this);

  /// Whether this worker has been stopped (see [stop]).
  bool get isStopped => _stats.isStopped;

  /// [Worker] statistics.
  @Deprecated('Use getStats()')
  WorkerStat get stats => _stats.snapshot;

  /// [Worker] statistics.
  WorkerStat getStats() => _stats.snapshot;

  /// Returns true if the [Worker] is connected i.e., it has a valid [Channel].
  /// Returns false otherwise.
  bool get isConnected => _channel != null;

  /// Shared [Channel] that can be used to communicate with the worker, or
  /// `null` if the worker is not connected.
  Channel? getSharedChannel() => _channel?.share();

  /// The channel to the worker once [start] has completed; reset to `null` by
  /// [stop].
  Channel? _channel;

  /// The pending (or completed) channel opening, shared by concurrent calls to
  /// [start]; reset to `null` by [stop].
  Future<Channel>? _openChannel;

  /// Sends a workload to the worker.
  ///
  /// The worker is started first if it has no channel yet. The request is
  /// identified by [command] and carries [args]. When a [token] is provided,
  /// it is checked before anything else happens, and its cancelation is
  /// forwarded to the channel and fails the returned future.
  ///
  /// Errors raised while sending the request are converted with
  /// [SquadronException.from] and reported through the returned future.
  /// Errors raised by [start] are not converted and surface as is.
  @override
  Future<dynamic> send(
    int command, {
    List args = const [],
    CancelationToken? token,
    bool inspectRequest = false,
    bool inspectResponse = false,
  }) async {
    token?.throwIfCanceled();

    // get the channel, start the worker if necessary
    final channel = _channel ?? await start();

    final completer = ForwardCompleter();

    final squadronToken = token?.wrap();
    squadronToken?.onCanceled.then((ex) {
      // notify the worker side and fail the pending request
      _channel?.cancelToken(squadronToken);
      completer.failure(SquadronException.from(ex, null, command));
    });

    // the workload is considered in progress until the completer is settled
    _stats.beginWork();
    completer.future.whenComplete(_stats.endWork).ignore();

    try {
      final res = await channel.sendRequest(
        command,
        args,
        token: squadronToken,
        inspectRequest: inspectRequest,
        inspectResponse: inspectResponse,
      );
      completer.success(res);
    } catch (ex, st) {
      _stats.failed();
      completer.failure(SquadronException.from(ex, st, command));
    }

    return completer.future;
  }

  /// Sends a streaming workload to the worker.
  ///
  /// Nothing is sent until the returned stream is listened to; at that point
  /// the worker is started if it has no channel yet and the streaming request
  /// identified by [command] is sent with [args]. When a [token] is provided,
  /// its cancelation cancels the underlying subscription, emits an error on
  /// the stream, closes it, and is forwarded to the channel.
  ///
  /// Errors are converted with [SquadronException.from] before being emitted
  /// on the returned stream.
  @override
  Stream<dynamic> stream(
    int command, {
    List args = const [],
    CancelationToken? token,
    bool inspectRequest = false,
    bool inspectResponse = false,
  }) {
    final squadronToken = token?.wrap();

    // assigned below; the cancelation callback runs asynchronously, i.e. after
    // the controller has been created
    late final ForwardStreamController controller;

    squadronToken?.onCanceled.then((ex) {
      if (!controller.isClosed) {
        controller.subscription?.cancel();
        controller.addError(SquadronException.from(ex, null, command));
        controller.close();
      }
      _channel?.cancelToken(squadronToken);
    });

    controller = ForwardStreamController(onListen: () async {
      try {
        if (controller.isClosed) return;
        squadronToken?.throwIfCanceled();
        final channel = _channel ?? await start();
        // the stream may have been closed while the worker was starting
        if (controller.isClosed) return;

        // the workload is considered in progress until the stream is done
        _stats.beginWork();
        controller.done.whenComplete(_stats.endWork).ignore();

        controller.attachSubscription(channel
            .sendStreamingRequest(
              command,
              args,
              token: squadronToken,
              inspectRequest: inspectRequest,
              inspectResponse: inspectResponse,
            )
            .listen(
              controller.add,
              onError: (ex, st) =>
                  controller.addError(SquadronException.from(ex, st, command)),
              onDone: controller.close,
              cancelOnError: false,
            ));
      } catch (ex, st) {
        _stats.failed();
        controller.subscription?.cancel();
        controller.addError(SquadronException.from(ex, st, command));
        controller.close();
      }
    });

    return controller.stream;
  }

  /// Creates a [Channel] and starts the worker using the [_entryPoint].
  ///
  /// Concurrent calls share the same pending channel opening. Returns the
  /// worker's channel once it is available.
  ///
  /// Throws a [WorkerException] if the worker is stopped, including when
  /// [stop] is called while the channel is still opening.
  @override
  Future<Channel> start() async {
    if (isStopped) {
      throw WorkerException('Invalid state: worker is stopped');
    }

    final args = getStartArgs() ?? const [];
    _openChannel ??= Channel.open(
        exceptionManager, channelLogger, _entryPoint, args, _threadHook);
    final channel = _channel ?? await _openChannel;

    if (isStopped) {
      // stop() ran while the channel was opening: it has arranged for the
      // channel to be closed, so it must not be installed on a stopped worker
      throw WorkerException('Invalid state: worker is stopped');
    }

    if (_channel == null) {
      _channel = channel;
      _stats.start();
    }
    return _channel!;
  }

  /// Stops this worker.
  ///
  /// Closes the channel, if any, and clears the connection state. If the
  /// channel is still opening, it is closed as soon as the opening completes.
  /// Calling [stop] on a worker that is already stopped does nothing.
  @override
  void stop() {
    if (!isStopped) {
      channelLogger?.d('Stop worker');
      _stats.stop();

      // only set when the channel is still opening, i.e. start() has not
      // installed it yet and there is nothing to close right now
      final pending = _channel == null ? _openChannel : null;

      _openChannel = null;
      _channel?.close();
      _channel = null;

      // close the channel once it opens so that it does not outlive the
      // stopped worker; opening errors are still reported by start()
      pending?.then((opened) => opened.close()).ignore();
    }
  }

  /// Terminates this worker.
  ///
  /// Terminates the channel, if any, with [ex] (a default
  /// [TaskTerminatedException] is used when omitted), then stops the worker.
  @override
  void terminate([TaskTerminatedException? ex]) {
    // terminate channel and stop worker
    ex ??= TaskTerminatedException('Worker has been terminated');
    _channel?.terminate(ex);
    stop();
  }

  /// Workers do not need an [operations] map.
  @override
  OperationsMap get operations => WorkerService.noOperations;
}