1 | | | import 'dart:async'; |
2 | | |
|
3 | | | import 'package:cancelation_token/cancelation_token.dart'; |
4 | | | import 'package:logger/web.dart'; |
5 | | | import 'package:using/using.dart'; |
6 | | |
|
7 | | | import '../_impl/xplat/_forward_completer.dart'; |
8 | | | import '../_impl/xplat/_forward_stream_controller.dart'; |
9 | | | import '../_impl/xplat/_time_stamp.dart'; |
10 | | | import '../channel.dart'; |
11 | | | import '../exceptions/exception_manager.dart'; |
12 | | | import '../exceptions/squadron_exception.dart'; |
13 | | | import '../exceptions/worker_exception.dart'; |
14 | | | import '../iworker.dart'; |
15 | | | import '../stats/worker_stat.dart'; |
16 | | | import '../tokens/_squadron_cancelation_token.dart'; |
17 | | | import '../typedefs.dart'; |
18 | | | import '../worker/worker_request.dart'; |
19 | | | import '../worker_service.dart'; |
20 | | |
|
21 | | | /// Base worker class. |
22 | | | /// |
23 | | | /// This base class takes care of creating the [Channel] and firing up the |
24 | | | /// worker. Typically, derived classes should add proxy methods sending |
25 | | | /// [WorkerRequest]s to the worker. |
26 | | | abstract class Worker with Releasable implements WorkerService, IWorker { |
27 | | | /// Creates a [Worker] with the specified entrypoint. |
28 | | 10 | Worker(this._entryPoint, |
29 | | 3 | {this.args = const [], |
30 | | | PlatformThreadHook? threadHook, |
31 | | | ExceptionManager? exceptionManager}) |
32 | | 1 | : _threadHook = threadHook, |
33 | | | _exceptionManager = exceptionManager; |
34 | | |
|
35 | | 6 | @override |
36 | | 3 | void release() { |
37 | | 10 | stop(); |
38 | | 12 | super.release(); |
39 | | 3 | } |
40 | | |
|
41 | | | /// The [Worker]'s entry point; typically, a top-level function in native |
42 | | | /// world or the Uri to a JavaScript file in browser world. |
43 | | | final EntryPoint _entryPoint; |
44 | | |
|
45 | | | @override |
46 | | | Logger? channelLogger; |
47 | | |
|
48 | | 10 | @override |
49 | | 1 | ExceptionManager get exceptionManager => |
50 | | 29 | (_exceptionManager ??= ExceptionManager()); |
51 | | | ExceptionManager? _exceptionManager; |
52 | | |
|
53 | | | final PlatformThreadHook? _threadHook; |
54 | | |
|
55 | | | /// The [Worker]'s start arguments. |
56 | | | final List args; |
57 | | |
|
58 | | | /// Start timestamp (in microseconds since Epoch). |
59 | | | int? _started; |
60 | | |
|
61 | | | /// Stopped timestamp (in microseconds since Epoch). |
62 | | | int? _stopped; |
63 | | |
|
64 | | | /// Current workload. |
65 | | 11 | int get workload => _workload; |
66 | | | int _workload = 0; |
67 | | |
|
68 | | | /// Maximum acceptable workload. |
69 | | 12 | int get maxWorkload => _maxWorkload; |
70 | | | int _maxWorkload = 0; |
71 | | |
|
72 | | | /// Total processed workload. |
73 | | 11 | int get totalWorkload => _totalWorkload; |
74 | | | int _totalWorkload = 0; |
75 | | |
|
76 | | | /// Total errors. |
77 | | 12 | int get totalErrors => _totalErrors; |
78 | | | int _totalErrors = 0; |
79 | | |
|
80 | | | /// Up time. |
81 | | 15 | Duration get upTime => (_started == null) |
82 | | 1 | ? Duration.zero |
83 | | 27 | : Duration(microseconds: (_stopped ?? microsecTimeStamp()) - _started!); |
84 | | |
|
85 | | | /// Idle time. |
86 | | 25 | Duration get idleTime => (_workload > 0 || _idle == null) |
87 | | 2 | ? Duration.zero |
88 | | 25 | : Duration(microseconds: microsecTimeStamp() - _idle!); |
89 | | | int? _idle; |
90 | | |
|
91 | | | /// Indicates if the [Worker] has been stopped. |
92 | | 15 | bool get isStopped => _stopped != null; |
93 | | |
|
94 | | | /// [Worker] status. |
95 | | 10 | String get status { |
96 | | 10 | if (isStopped) { |
97 | | 4 | return 'STOPPED'; |
98 | | 6 | } else if (_workload == 0) { |
99 | | 2 | return 'IDLE'; |
100 | | | } else { |
101 | | 0 | return 'WORKING($_workload)'; |
102 | | | } |
103 | | 5 | } |
104 | | |
|
105 | | | /// [Worker] statistics. |
106 | | 15 | WorkerStat get stats => WorkerStatExt.create( |
107 | | 5 | runtimeType, |
108 | | 5 | hashCode, |
109 | | 5 | isStopped, |
110 | | 10 | status, |
111 | | 5 | workload, |
112 | | 5 | maxWorkload, |
113 | | 5 | totalWorkload, |
114 | | 5 | totalErrors, |
115 | | 6 | upTime, |
116 | | 7 | idleTime, |
117 | | 5 | ); |
118 | | |
|
119 | | | /// Returns true if the [Worker] is connected i.e., it has a valid [Channel]. |
120 | | | /// Returns false otherwise. |
121 | | 3 | bool get isConnected => _channel != null; |
122 | | |
|
123 | | | /// Shared [Channel] that can be used to communicate with the worker. |
124 | | 8 | Channel? getSharedChannel() => _channel?.share(); |
125 | | |
|
126 | | | Channel? _channel; |
127 | | 10 | Future<Channel>? _openChannel; |
128 | | |
|
129 | | 13 | void _enter() { |
130 | | 18 | _workload++; |
131 | | 36 | if (_workload > _maxWorkload) { |
132 | | 27 | _maxWorkload = _workload; |
133 | | | } |
134 | | 4 | } |
135 | | |
|
136 | | 14 | void _leave() { |
137 | | 18 | _workload--; |
138 | | 18 | _totalWorkload++; |
139 | | 27 | _idle = microsecTimeStamp(); |
140 | | 5 | } |
141 | | |
|
142 | | | /// Sends a workload to the worker. |
143 | | 16 | Future<dynamic> send( |
144 | | | int command, { |
145 | | | List args = const [], |
146 | | | CancelationToken? token, |
147 | | | bool inspectRequest = false, |
148 | | | bool inspectResponse = false, |
149 | | | }) async { |
150 | | 8 | token?.throwIfCanceled(); |
151 | | |
|
152 | | | // get the channel, start the worker if necessary |
153 | | 19 | final channel = _channel ?? await start(); |
154 | | |
|
155 | | 8 | final completer = ForwardCompleter(); |
156 | | |
|
157 | | 0 | final squadronToken = token?.wrap(); |
158 | | 0 | squadronToken?.onCanceled.then((ex) { |
159 | | 0 | _channel?.cancelToken(squadronToken); |
160 | | 0 | completer.failure(SquadronException.from(ex, null, command)); |
161 | | 0 | }); |
162 | | |
|
163 | | 12 | _enter(); |
164 | | 3 | try { |
165 | | 16 | final res = await channel.sendRequest( |
166 | | | command, |
167 | | | args, |
168 | | | token: squadronToken, |
169 | | | inspectRequest: inspectRequest, |
170 | | | inspectResponse: inspectResponse, |
171 | | | ); |
172 | | 8 | completer.success(res); |
173 | | 3 | } catch (ex, st) { |
174 | | 4 | _totalErrors++; |
175 | | 7 | completer.failure(SquadronException.from(ex, st, command)); |
176 | | | } finally { |
177 | | 12 | _leave(); |
178 | | | } |
179 | | |
|
180 | | 16 | return completer.future; |
181 | | 8 | } |
182 | | |
|
183 | | | /// Sends a streaming workload to the worker. |
184 | | 10 | Stream<dynamic> stream( |
185 | | | int command, { |
186 | | | List args = const [], |
187 | | | CancelationToken? token, |
188 | | | bool inspectRequest = false, |
189 | | | bool inspectResponse = false, |
190 | | | }) { |
191 | | 4 | final squadronToken = token?.wrap(); |
192 | | |
|
193 | | 5 | late final ForwardStreamController controller; |
194 | | |
|
195 | | 8 | squadronToken?.onCanceled.then((ex) { |
196 | | 4 | if (!controller.isClosed) { |
197 | | 6 | controller.subscription?.cancel(); |
198 | | 6 | controller.addError(SquadronException.from(ex, null, command)); |
199 | | 4 | controller.close(); |
200 | | | } |
201 | | 6 | _channel?.cancelToken(squadronToken); |
202 | | 2 | }); |
203 | | |
|
204 | | 15 | controller = ForwardStreamController(onListen: () async { |
205 | | 5 | try { |
206 | | 10 | if (controller.isClosed) return; |
207 | | 7 | squadronToken?.throwIfCanceled(); |
208 | | 11 | final channel = _channel ?? await start(); |
209 | | 5 | if (controller.isClosed) return; |
210 | | 9 | _enter(); |
211 | | 10 | controller.attachSubscription(channel |
212 | | 5 | .sendStreamingRequest( |
213 | | 5 | command, |
214 | | 5 | args, |
215 | | | token: squadronToken, |
216 | | 5 | inspectRequest: inspectRequest, |
217 | | 5 | inspectResponse: inspectResponse, |
218 | | | ) |
219 | | 10 | .listen( |
220 | | 10 | controller.add, |
221 | | 7 | onError: (ex, st) => |
222 | | 10 | controller.addError(SquadronException.from(ex, st, command)), |
223 | | 10 | onDone: controller.close, |
224 | | | cancelOnError: false, |
225 | | | )); |
226 | | 20 | controller.done.whenComplete(_leave); |
227 | | 1 | } catch (ex, st) { |
228 | | 3 | _totalErrors++; |
229 | | 3 | controller.addError(SquadronException.from(ex, st, command)); |
230 | | 2 | controller.close(); |
231 | | | } |
232 | | 5 | }); |
233 | | |
|
234 | | 10 | return controller.stream; |
235 | | 5 | } |
236 | | |
|
237 | | | /// Creates a [Channel] and starts the worker using the [_entryPoint]. |
238 | | 10 | @override |
239 | | 10 | Future<Channel> start() async { |
240 | | 20 | if (_stopped != null) { |
241 | | 2 | throw WorkerException('Invalid state: worker is stopped'); |
242 | | | } |
243 | | 30 | _openChannel ??= Channel.open( |
244 | | 60 | exceptionManager, channelLogger, _entryPoint, args, _threadHook); |
245 | | 30 | final channel = _channel ?? await _openChannel; |
246 | | 18 | if (_channel == null) { |
247 | | 18 | _channel = channel; |
248 | | 27 | _started = microsecTimeStamp(); |
249 | | 23 | _idle = _started; |
250 | | | } |
251 | | 18 | return _channel!; |
252 | | 10 | } |
253 | | |
|
254 | | | /// Stops this worker. |
255 | | 10 | @override |
256 | | 5 | void stop() { |
257 | | 20 | if (_stopped == null) { |
258 | | 30 | _stopped = microsecTimeStamp(); |
259 | | 20 | _openChannel = null; |
260 | | 29 | _channel?.close(); |
261 | | 20 | _channel = null; |
262 | | | } |
263 | | 5 | } |
264 | | |
|
265 | | | /// Workers do not need an [operations] map. |
266 | | 0 | @override |
267 | | 0 | Map<int, CommandHandler> get operations => WorkerService.noOperations; |
268 | | | } |