LCOV - code coverage report

Current view
top level - src/worker - worker.dart
Test
lcov.info
Date
2024-11-13
Legend
Lines
hit
not hit
Branches
taken
not taken
# not executed
HitTotalCoverage
Lines12413293.9%
Functions00-
Branches00-
Each row represents a line of source code
LineBranchHitsSource code
1import 'dart:async';
2
3import 'package:cancelation_token/cancelation_token.dart';
4import 'package:logger/web.dart';
5import 'package:using/using.dart';
6
7import '../_impl/xplat/_forward_completer.dart';
8import '../_impl/xplat/_forward_stream_controller.dart';
9import '../_impl/xplat/_time_stamp.dart';
10import '../channel.dart';
11import '../exceptions/exception_manager.dart';
12import '../exceptions/squadron_exception.dart';
13import '../exceptions/worker_exception.dart';
14import '../iworker.dart';
15import '../stats/worker_stat.dart';
16import '../tokens/_squadron_cancelation_token.dart';
17import '../typedefs.dart';
18import '../worker/worker_request.dart';
19import '../worker_service.dart';
20
21/// Base worker class.
22///
23/// This base class takes care of creating the [Channel] and firing up the
24/// worker. Typically, derived classes should add proxy methods sending
25/// [WorkerRequest]s to the worker.
26abstract class Worker with Releasable implements WorkerService, IWorker {
27 /// Creates a [Worker] with the specified entrypoint.
2810 Worker(this._entryPoint,
293 {this.args = const [],
30 PlatformThreadHook? threadHook,
31 ExceptionManager? exceptionManager})
321 : _threadHook = threadHook,
33 _exceptionManager = exceptionManager;
34
356 @override
363 void release() {
3710 stop();
3812 super.release();
393 }
40
41 /// The [Worker]'s entry point; typically, a top-level function in native
42 /// world or the Uri to a JavaScript file in browser world.
43 final EntryPoint _entryPoint;
44
45 @override
46 Logger? channelLogger;
47
4810 @override
491 ExceptionManager get exceptionManager =>
5029 (_exceptionManager ??= ExceptionManager());
51 ExceptionManager? _exceptionManager;
52
53 final PlatformThreadHook? _threadHook;
54
55 /// The [Worker]'s start arguments.
56 final List args;
57
58 /// Start timestamp (in microseconds since Epoch).
59 int? _started;
60
61 /// Stopped timestamp (in microseconds since Epoch).
62 int? _stopped;
63
64 /// Current workload.
6511 int get workload => _workload;
66 int _workload = 0;
67
68 /// Maximum acceptable workload.
6912 int get maxWorkload => _maxWorkload;
70 int _maxWorkload = 0;
71
72 /// Total processed workload.
7311 int get totalWorkload => _totalWorkload;
74 int _totalWorkload = 0;
75
76 /// Total errors.
7712 int get totalErrors => _totalErrors;
78 int _totalErrors = 0;
79
80 /// Up time.
8115 Duration get upTime => (_started == null)
821 ? Duration.zero
8327 : Duration(microseconds: (_stopped ?? microsecTimeStamp()) - _started!);
84
85 /// Idle time.
8625 Duration get idleTime => (_workload > 0 || _idle == null)
872 ? Duration.zero
8825 : Duration(microseconds: microsecTimeStamp() - _idle!);
89 int? _idle;
90
91 /// Indicates if the [Worker] has been stopped.
9215 bool get isStopped => _stopped != null;
93
94 /// [Worker] status.
9510 String get status {
9610 if (isStopped) {
974 return 'STOPPED';
986 } else if (_workload == 0) {
992 return 'IDLE';
100 } else {
1010 return 'WORKING($_workload)';
102 }
1035 }
104
105 /// [Worker] statistics.
10615 WorkerStat get stats => WorkerStatExt.create(
1075 runtimeType,
1085 hashCode,
1095 isStopped,
11010 status,
1115 workload,
1125 maxWorkload,
1135 totalWorkload,
1145 totalErrors,
1156 upTime,
1167 idleTime,
1175 );
118
119 /// Returns true if the [Worker] is connected i.e., it has a valid [Channel].
120 /// Returns false otherwise.
1213 bool get isConnected => _channel != null;
122
123 /// Shared [Channel] that can be used to communicate with the worker.
1248 Channel? getSharedChannel() => _channel?.share();
125
126 Channel? _channel;
12710 Future<Channel>? _openChannel;
128
12913 void _enter() {
13018 _workload++;
13136 if (_workload > _maxWorkload) {
13227 _maxWorkload = _workload;
133 }
1344 }
135
13614 void _leave() {
13718 _workload--;
13818 _totalWorkload++;
13927 _idle = microsecTimeStamp();
1405 }
141
142 /// Sends a workload to the worker.
14316 Future<dynamic> send(
144 int command, {
145 List args = const [],
146 CancelationToken? token,
147 bool inspectRequest = false,
148 bool inspectResponse = false,
149 }) async {
1508 token?.throwIfCanceled();
151
152 // get the channel, start the worker if necessary
15319 final channel = _channel ?? await start();
154
1558 final completer = ForwardCompleter();
156
1570 final squadronToken = token?.wrap();
1580 squadronToken?.onCanceled.then((ex) {
1590 _channel?.cancelToken(squadronToken);
1600 completer.failure(SquadronException.from(ex, null, command));
1610 });
162
16312 _enter();
1643 try {
16516 final res = await channel.sendRequest(
166 command,
167 args,
168 token: squadronToken,
169 inspectRequest: inspectRequest,
170 inspectResponse: inspectResponse,
171 );
1728 completer.success(res);
1733 } catch (ex, st) {
1744 _totalErrors++;
1757 completer.failure(SquadronException.from(ex, st, command));
176 } finally {
17712 _leave();
178 }
179
18016 return completer.future;
1818 }
182
183 /// Sends a streaming workload to the worker.
18410 Stream<dynamic> stream(
185 int command, {
186 List args = const [],
187 CancelationToken? token,
188 bool inspectRequest = false,
189 bool inspectResponse = false,
190 }) {
1914 final squadronToken = token?.wrap();
192
1935 late final ForwardStreamController controller;
194
1958 squadronToken?.onCanceled.then((ex) {
1964 if (!controller.isClosed) {
1976 controller.subscription?.cancel();
1986 controller.addError(SquadronException.from(ex, null, command));
1994 controller.close();
200 }
2016 _channel?.cancelToken(squadronToken);
2022 });
203
20415 controller = ForwardStreamController(onListen: () async {
2055 try {
20610 if (controller.isClosed) return;
2077 squadronToken?.throwIfCanceled();
20811 final channel = _channel ?? await start();
2095 if (controller.isClosed) return;
2109 _enter();
21110 controller.attachSubscription(channel
2125 .sendStreamingRequest(
2135 command,
2145 args,
215 token: squadronToken,
2165 inspectRequest: inspectRequest,
2175 inspectResponse: inspectResponse,
218 )
21910 .listen(
22010 controller.add,
2217 onError: (ex, st) =>
22210 controller.addError(SquadronException.from(ex, st, command)),
22310 onDone: controller.close,
224 cancelOnError: false,
225 ));
22620 controller.done.whenComplete(_leave);
2271 } catch (ex, st) {
2283 _totalErrors++;
2293 controller.addError(SquadronException.from(ex, st, command));
2302 controller.close();
231 }
2325 });
233
23410 return controller.stream;
2355 }
236
237 /// Creates a [Channel] and starts the worker using the [_entryPoint].
23810 @override
23910 Future<Channel> start() async {
24020 if (_stopped != null) {
2412 throw WorkerException('Invalid state: worker is stopped');
242 }
24330 _openChannel ??= Channel.open(
24460 exceptionManager, channelLogger, _entryPoint, args, _threadHook);
24530 final channel = _channel ?? await _openChannel;
24618 if (_channel == null) {
24718 _channel = channel;
24827 _started = microsecTimeStamp();
24923 _idle = _started;
250 }
25118 return _channel!;
25210 }
253
254 /// Stops this worker.
25510 @override
2565 void stop() {
25720 if (_stopped == null) {
25830 _stopped = microsecTimeStamp();
25920 _openChannel = null;
26029 _channel?.close();
26120 _channel = null;
262 }
2635 }
264
265 /// Workers do not need an [operations] map.
2660 @override
2670 Map<int, CommandHandler> get operations => WorkerService.noOperations;
268}
Choose Features