1 | | | import 'dart:async'; |
2 | | | import 'dart:convert'; |
3 | | | import 'dart:isolate' as vm; |
4 | | |
|
5 | | | import 'package:logger/web.dart'; |
6 | | |
|
7 | | | import '../../channel.dart' show Channel; |
8 | | | import '../../channel.dart'; |
9 | | | import '../../exceptions/exception_manager.dart'; |
10 | | | import '../../exceptions/squadron_error.dart'; |
11 | | | import '../../exceptions/squadron_exception.dart'; |
12 | | | import '../../exceptions/worker_exception.dart'; |
13 | | | import '../../tokens/_squadron_cancelation_token.dart'; |
14 | | | import '../../typedefs.dart'; |
15 | | | import '../../worker/worker_request.dart'; |
16 | | | import '../../worker/worker_response.dart'; |
17 | | | import '../xplat/_disconnected_channel.dart'; |
18 | | | import '../xplat/_forward_stream_controller.dart'; |
19 | | | import '../xplat/_result_stream.dart'; |
20 | | |
|
21 | | | part '_channel_impl.dart'; |
22 | | |
|
// VM implementations (backed by dart:isolate).
24 | | |
|
/// Starts an [Isolate] using the [entryPoint] and sends a start
/// [WorkerRequest] with [startArguments].
///
/// The returned future completes with the connected [Channel] after the
/// worker [Isolate]'s main program has provided the [SendPort] via
/// [WorkerChannel.connect] and the optional [hook] has been called with the
/// spawned isolate.
///
/// The future completes with an error when the worker answers with an error,
/// reports an unhandled error, or exits before the connection is established,
/// and also when [hook] throws. In those cases the isolate is killed before
/// the error is rethrown. If the isolate cannot be spawned at all, the ports
/// allocated by this function are closed and the spawn error is rethrown.
Future<Channel> openChannel(
  EntryPoint entryPoint,
  ExceptionManager exceptionManager,
  Logger? logger,
  List startArguments, [
  PlatformThreadHook? hook,
]) async {
  final completer = Completer<Channel>();
  Channel? channel;

  // Completes the connection future with an error, unless already completed.
  void failure(Object error, [StackTrace? stackTrace]) {
    if (!completer.isCompleted) {
      completer.completeError(SquadronException.from(error, stackTrace));
    }
  }

  // Completes the connection future with the channel, unless already
  // completed.
  void success(Channel channel) {
    if (!completer.isCompleted) {
      completer.complete(channel);
    }
  }

  final receiver = vm.ReceivePort();
  final exitPort = vm.ReceivePort();
  final errorPort = vm.ReceivePort();

  // Releases every port opened above; open ports keep the isolate alive.
  void closePorts() {
    receiver.close();
    errorPort.close();
    exitPort.close();
  }

  // The isolate has terminated: fail a pending connection and release
  // everything that was allocated for it.
  exitPort.listen((message) {
    failure(SquadronErrorExt.create('Connection to worker failed'));
    logger?.t('Isolate terminated.');
    channel?.close();
    closePorts();
  });

  // Unhandled errors from the isolate: the handler reads message[0] as the
  // error text and message[1] as the stack trace.
  errorPort.listen((message) {
    SquadronException? error;
    try {
      // The error text may be a JSON-serialized SquadronException.
      final data = jsonDecode(message[0]);
      if (data is List) {
        error = exceptionManager.deserialize(data.cast<String>());
      }
    } catch (_) {
      // not a String representing a SquadronException
    }

    error ??= WorkerException(
      message[0],
      SquadronException.loadStackTrace(message[1]),
    );

    logger?.d(() => 'Unhandled error from Isolate: ${error?.message}.');
    failure(error);
  });

  final disconnected = DisconnectedChannel(exceptionManager, logger);

  receiver.listen((message) {
    final response = WorkerResponseExt.from(message);
    if (!response.unwrapInPlace(disconnected)) {
      return;
    }

    final error = response.error;
    if (error != null) {
      logger?.e(() => 'Connection to Isolate failed: ${response.error}');
      failure(error);
    } else if (response.endOfStream) {
      logger?.w('Disconnecting from Isolate');
      channel?.close();
    } else if (!completer.isCompleted) {
      logger?.t('Connected to Isolate');
      // Keep a non-nullable local so no null assertion is needed on the
      // closure-captured variable.
      final connected =
          _VmChannel._(response.result, logger, exceptionManager);
      channel = connected;
      success(connected);
    } else {
      logger?.e(() => 'Unexpected response: $response');
    }
  });

  final startRequest = WorkerRequest.start(receiver.sendPort, startArguments);
  startRequest.wrapInPlace();

  final vm.Isolate isolate;
  try {
    isolate = await vm.Isolate.spawn(
      entryPoint,
      startRequest,
      errorsAreFatal: false,
      onExit: exitPort.sendPort,
      onError: errorPort.sendPort,
    );
  } catch (ex) {
    // Without a spawned isolate the exit port never fires, so the cleanup
    // that normally happens there has to be done here.
    logger?.e(() => 'Failed to spawn Isolate: $ex');
    closePorts();
    rethrow;
  }

  try {
    final connected = await completer.future;
    await hook?.call(isolate);
    logger?.t('Created Isolate');
    return connected;
  } catch (ex) {
    logger?.e(() => 'Connection to Isolate failed: $ex');
    // Killing the isolate triggers the exit port listener, which closes the
    // channel and the ports.
    isolate.kill(priority: vm.Isolate.beforeNextEvent);
    rethrow;
  }
}
129 | | |
|
/// Creates a [_VmChannel] from a [SendPort].
///
/// Returns `null` when [channelInfo] is `null`. When no [exceptionManager] is
/// supplied, a new [ExceptionManager] is created for the channel.
Channel? deserialize(PlatformChannel? channelInfo,
    [Logger? logger, ExceptionManager? exceptionManager]) {
  if (channelInfo == null) {
    return null;
  }
  final manager = exceptionManager ?? ExceptionManager();
  return _VmChannel._(channelInfo, logger, manager);
}