1 | | | import 'dart:async'; |
2 | | |
|
3 | | | import 'package:cancelation_token/cancelation_token.dart'; |
4 | | | import 'package:logger/web.dart'; |
5 | | | import 'package:using/using.dart'; |
6 | | |
|
7 | | | import '../_impl/xplat/_forward_completer.dart'; |
8 | | | import '../_impl/xplat/_forward_stream_controller.dart'; |
9 | | | import '../_impl/xplat/_time_stamp.dart'; |
10 | | | import '../channel.dart'; |
11 | | | import '../exceptions/exception_manager.dart'; |
12 | | | import '../exceptions/squadron_exception.dart'; |
13 | | | import '../exceptions/task_terminated_exception.dart'; |
14 | | | import '../exceptions/worker_exception.dart'; |
15 | | | import '../invoker.dart'; |
16 | | | import '../iworker.dart'; |
17 | | | import '../stats/worker_stat.dart'; |
18 | | | import '../tokens/_squadron_cancelation_token.dart'; |
19 | | | import '../typedefs.dart'; |
20 | | | import '../worker/worker_request.dart'; |
21 | | | import '../worker_service.dart'; |
22 | | |
|
23 | | | part 'worker.stats.dart'; |
24 | | |
|
/// Base worker class.
///
/// This base class takes care of creating the [Channel] and firing up the
/// worker. Typically, derived classes should add proxy methods sending
/// [WorkerRequest]s to the worker.
abstract class Worker
    with Releasable
    implements WorkerService, IWorker, Invoker {
  /// Creates a [Worker] with the specified entrypoint.
  ///
  /// The optional [threadHook] and [exceptionManager] are stored and handed
  /// over to [Channel.open] when the worker is started.
  Worker(this._entryPoint,
      {PlatformThreadHook? threadHook, ExceptionManager? exceptionManager})
      : _threadHook = threadHook,
        _exceptionManager = exceptionManager;

  /// Stops the worker before releasing this instance.
  @override
  void release() {
    stop();
    super.release();
  }

  /// The [Worker]'s entry point; typically, a top-level function in native
  /// world or the Uri to a JavaScript file in browser world.
  final EntryPoint _entryPoint;

  /// Logger handed over to [Channel.open] when the worker is started; also
  /// used to trace [stop].
  @override
  Logger? channelLogger;

  /// The [ExceptionManager] of this worker.
  ///
  /// If none was provided to the constructor, a new instance is created on
  /// first access and reused afterwards.
  @override
  ExceptionManager get exceptionManager =>
      (_exceptionManager ??= ExceptionManager());
  ExceptionManager? _exceptionManager;

  /// Optional hook passed to [Channel.open] when the worker is started.
  final PlatformThreadHook? _threadHook;

  /// The [Worker]'s start arguments.
  List? getStartArgs();

  /// [Worker] statistics.
  WorkerStat get stats => _stats.snapshot;

  late final _stats = _Stats(this);

  /// Returns true if the [Worker] is connected i.e., it has a valid [Channel].
  /// Returns false otherwise.
  bool get isConnected => _channel != null;

  /// Shared [Channel] that can be used to communicate with the worker.
  ///
  /// Returns `null` when the worker has no channel.
  Channel? getSharedChannel() => _channel?.share();

  /// The channel to the worker, or `null` if the worker is not started or has
  /// been stopped.
  Channel? _channel;

  /// The pending (or completed) channel-opening operation, shared by all
  /// concurrent calls to [start]. Reset to `null` by [stop].
  Future<Channel>? _openChannel;

  /// Sends a workload to the worker.
  ///
  /// Calls `throwIfCanceled()` on [token] first, then starts the worker if
  /// it has no channel yet and sends [command] with [args] through the
  /// channel. Failures of the request are reported as [SquadronException]s
  /// built with [SquadronException.from].
  @override
  Future<dynamic> send(
    int command, {
    List args = const [],
    CancelationToken? token,
    bool inspectRequest = false,
    bool inspectResponse = false,
  }) async {
    token?.throwIfCanceled();

    // get the channel, start the worker if necessary
    final channel = _channel ?? await start();

    final completer = ForwardCompleter();

    // on cancelation, notify the worker and fail the pending request
    final squadronToken = token?.wrap();
    squadronToken?.onCanceled.then((ex) {
      _channel?.cancelToken(squadronToken);
      completer.failure(SquadronException.from(ex, null, command));
    });

    // keep track of the workload, whatever its outcome
    _stats.beginWork();
    completer.future.whenComplete(_stats.endWork).ignore();

    try {
      final res = await channel.sendRequest(
        command,
        args,
        token: squadronToken,
        inspectRequest: inspectRequest,
        inspectResponse: inspectResponse,
      );
      completer.success(res);
    } catch (ex, st) {
      _stats.failed();
      completer.failure(SquadronException.from(ex, st, command));
    }

    return completer.future;
  }

  /// Sends a streaming workload to the worker.
  ///
  /// The request is only sent when the returned stream is listened to; the
  /// worker is started at that time if it has no channel yet. Errors are
  /// forwarded to the stream as [SquadronException]s built with
  /// [SquadronException.from].
  @override
  Stream<dynamic> stream(
    int command, {
    List args = const [],
    CancelationToken? token,
    bool inspectRequest = false,
    bool inspectResponse = false,
  }) {
    final squadronToken = token?.wrap();

    late final ForwardStreamController controller;

    // on cancelation, terminate the stream with an error (unless it is
    // already closed) and notify the worker
    squadronToken?.onCanceled.then((ex) {
      if (!controller.isClosed) {
        controller.subscription?.cancel();
        controller.addError(SquadronException.from(ex, null, command));
        controller.close();
      }
      _channel?.cancelToken(squadronToken);
    });

    controller = ForwardStreamController(onListen: () async {
      try {
        if (controller.isClosed) return;
        squadronToken?.throwIfCanceled();
        final channel = _channel ?? await start();
        // the controller may have been closed while the worker was starting
        if (controller.isClosed) return;

        // keep track of the workload until the stream is done
        _stats.beginWork();
        controller.done.whenComplete(_stats.endWork).ignore();

        controller.attachSubscription(channel
            .sendStreamingRequest(
              command,
              args,
              token: squadronToken,
              inspectRequest: inspectRequest,
              inspectResponse: inspectResponse,
            )
            .listen(
              controller.add,
              onError: (ex, st) =>
                  controller.addError(SquadronException.from(ex, st, command)),
              onDone: controller.close,
              cancelOnError: false,
            ));
      } catch (ex, st) {
        _stats.failed();
        controller.subscription?.cancel();
        controller.addError(SquadronException.from(ex, st, command));
        controller.close();
      }
    });

    return controller.stream;
  }

  /// Creates a [Channel] and starts the worker using the [_entryPoint].
  ///
  /// Concurrent calls share the same channel-opening operation. Throws a
  /// [WorkerException] if the worker is stopped, including when [stop] is
  /// called while the channel is still being opened.
  @override
  Future<Channel> start() async {
    if (stats.isStopped) {
      throw WorkerException('Invalid state: worker is stopped');
    }

    final args = getStartArgs() ?? const [];
    _openChannel ??= Channel.open(
        exceptionManager, channelLogger, _entryPoint, args, _threadHook);
    final channel = _channel ?? await _openChannel;

    // stop() may have been called while the channel was being opened: do not
    // install the channel on a stopped worker (stop() takes care of closing
    // the channel once it is available).
    if (_stats.isStopped) {
      throw WorkerException('Invalid state: worker is stopped');
    }

    if (_channel == null) {
      _channel = channel;
      _stats.start();
    }
    return _channel!;
  }

  /// Stops this worker.
  ///
  /// Closes the channel if there is one. If the channel is still being
  /// opened, it is closed as soon as the opening operation completes. Does
  /// nothing if the worker is already stopped.
  @override
  void stop() {
    if (!_stats.isStopped) {
      channelLogger?.d('Stop worker');
      _stats.stop();
      final pending = _openChannel;
      _openChannel = null;
      final channel = _channel;
      if (channel != null) {
        channel.close();
      } else if (pending != null) {
        // The channel is not available yet: close it when it eventually
        // opens so it does not outlive the stopped worker. Errors from the
        // opening operation are still reported to the callers of start().
        pending.then((opened) {
          opened.close();
        }).ignore();
      }
      _channel = null;
    }
  }

  /// Terminates this worker.
  ///
  /// Terminates the channel (if any) with [ex], or with a default
  /// [TaskTerminatedException] if [ex] is not provided, then stops the
  /// worker.
  @override
  void terminate([TaskTerminatedException? ex]) {
    // terminate channel and stop worker
    ex ??= TaskTerminatedException('Worker has been terminated');
    _channel?.terminate(ex);
    stop();
  }

  /// Workers do not need an [operations] map.
  @override
  OperationsMap get operations => WorkerService.noOperations;
}