1 | | | import 'dart:async'; |
2 | | | import 'dart:convert'; |
3 | | | import 'dart:isolate' as vm; |
4 | | |
|
5 | | | import 'package:logger/web.dart'; |
6 | | | import 'package:meta/meta.dart'; |
7 | | |
|
8 | | | import '../../channel.dart'; |
9 | | | import '../../exceptions/exception_manager.dart'; |
10 | | | import '../../exceptions/squadron_error.dart'; |
11 | | | import '../../exceptions/squadron_exception.dart'; |
12 | | | import '../../exceptions/task_terminated_exception.dart'; |
13 | | | import '../../exceptions/worker_exception.dart'; |
14 | | | import '../../tokens/_squadron_cancelation_token.dart'; |
15 | | | import '../../worker/worker_request.dart'; |
16 | | | import '../../worker/worker_response.dart'; |
17 | | | import '../xplat/_disconnected_channel.dart'; |
18 | | | import '../xplat/_forward_stream_controller.dart'; |
19 | | | import '../xplat/_result_stream.dart'; |
20 | | | import '_typedefs.dart'; |
21 | | |
|
22 | | | part '_channel_forward.dart'; |
23 | | | part '_channel_impl.dart'; |
24 | | |
|
25 | | | // Native (dart:isolate) implementations. |
26 | | |
|
/// Starts an [Isolate] using the [entryPoint] and sends a start
/// [WorkerRequest] with [startArguments]. The future completes after the
/// worker [Isolate]'s main program has provided the [SendPort] via
/// [WorkerChannel.connect].
///
/// If [hook] is provided, it is awaited with the spawned isolate once the
/// connection is established and before the channel is returned.
///
/// Completes with an error if the isolate cannot be spawned, if it reports an
/// error or exits before the connection is established, or if [hook] throws.
/// When the failure happens after the isolate was spawned, the isolate is
/// killed before the error is rethrown.
Future<Channel> openChannel(
  EntryPoint entryPoint,
  ExceptionManager exceptionManager,
  Logger? logger,
  List startArguments,
  PlatformThreadHook? hook,
) async {
  final completer = Completer<_VmChannel>();

  // Set once the worker has connected; used by the port listeners to close
  // the channel when the isolate disconnects or exits.
  Channel? channel;

  // Fails the pending connection; a no-op once the outcome is known.
  void failure(Object error, [StackTrace? stackTrace]) {
    if (!completer.isCompleted) {
      completer.completeError(SquadronException.from(error, stackTrace));
    }
  }

  // Completes the pending connection; a no-op once the outcome is known.
  void success(_VmChannel connected) {
    if (!completer.isCompleted) {
      completer.complete(connected);
    }
  }

  final receiver = vm.ReceivePort();
  final exitPort = vm.ReceivePort();
  final errorPort = vm.ReceivePort();

  // Releases the ports owned by this function.
  void closePorts() {
    receiver.close();
    errorPort.close();
    exitPort.close();
  }

  // The isolate has exited: fail the connection if it is still pending and
  // release the channel and all ports.
  exitPort.listen((message) {
    failure(SquadronErrorImpl.create('Connection to worker failed'));
    logger?.t('Isolate terminated with message $message.');
    channel?.close();
    closePorts();
  });

  // Uncaught errors from the isolate. The message is read as a two-element
  // list: element 0 is the error text, element 1 the stack trace.
  errorPort.listen((message) {
    SquadronException? error;
    try {
      // The error text may be a JSON-serialized SquadronException.
      final data = jsonDecode(message[0]);
      if (data is List) {
        error = exceptionManager.deserialize(data.cast<String>());
      }
    } catch (_) {
      // not a String representing a SquadronException
    }

    error ??= WorkerException(
      message[0],
      SquadronException.loadStackTrace(message[1]),
    );

    logger?.d(() => 'Unhandled error from Isolate: ${error?.message}.');
    failure(error);
  });

  final disconnected = DisconnectedChannel(exceptionManager, logger);

  // Responses sent by the worker on the port handed over in the start request.
  receiver.listen((message) {
    final response = WorkerResponseImpl.from(message);
    if (!response.unwrapInPlace(disconnected)) {
      return;
    }

    final error = response.error;
    if (error != null) {
      logger?.e(() => 'Connection to Isolate failed: $error');
      failure(error);
    } else if (response.endOfStream) {
      logger?.w('Disconnecting from Isolate');
      channel?.close();
    } else if (!completer.isCompleted) {
      // First successful response: its result is used to build the channel.
      logger?.t('Connected to Isolate');
      final platformChannel =
          _VmChannel._(response.result, logger, exceptionManager);
      channel = platformChannel;
      success(platformChannel);
    } else {
      logger?.e(() => 'Unexpected response: $response');
    }
  });

  final startRequest = WorkerRequest.start(receiver.sendPort, startArguments);
  startRequest.wrapInPlace();

  final vm.Isolate isolate;
  try {
    isolate = await vm.Isolate.spawn(
      entryPoint,
      startRequest,
      // Uncaught errors are forwarded to [errorPort] instead of terminating
      // the isolate.
      errorsAreFatal: false,
      onExit: exitPort.sendPort,
      onError: errorPort.sendPort,
    );
  } catch (ex) {
    // No isolate was created, so no exit notification will ever arrive to
    // trigger the cleanup in the exit listener: release the ports here, as
    // open receive ports would otherwise leak and keep this isolate alive.
    logger?.e(() => 'Failed to spawn Isolate: $ex');
    closePorts();
    rethrow;
  }

  try {
    final connected = await completer.future;
    connected._thread = isolate;
    if (hook != null) {
      await hook.call(isolate);
    }
    logger?.t('Created Isolate');
    return connected;
  } catch (ex) {
    logger?.e(() => 'Connection to Isolate failed: $ex');
    // Killing the isolate triggers the exit listener, which releases the
    // channel and the ports.
    isolate.kill(priority: vm.Isolate.beforeNextEvent);
    rethrow;
  }
}
136 | | |
|
/// Forcefully terminates the isolate attached to [channel] and fails its
/// active connections with [ex].
///
/// Does nothing when [channel] is not a [_VmChannel]. Otherwise the isolate is
/// killed with immediate priority, then each active connection has its
/// subscription canceled, receives [ex] as an error and is closed.
@internal
void terminateChannel(Channel channel, TaskTerminatedException ex) {
  if (channel is! _VmChannel) return;

  channel._thread?.kill(priority: vm.Isolate.immediate);

  for (final connection in channel._activeConnections) {
    connection.subscription?.cancel();
    connection.addError(ex);
    connection.close();
  }
}
149 | | |
|
/// Creates a [_VmChannel] from a [SendPort].
///
/// Returns `null` when [channelInfo] is `null`. A new [ExceptionManager] is
/// created when [exceptionManager] is not provided.
Channel? deserialize(
  PlatformChannel? channelInfo, [
  Logger? logger,
  ExceptionManager? exceptionManager,
]) {
  if (channelInfo == null) {
    return null;
  }

  final manager = exceptionManager ?? ExceptionManager();
  return _VmChannel._(channelInfo, logger, manager);
}