neon/event/channel.rs
1use std::{
2 error, fmt,
3 sync::{
4 atomic::{AtomicUsize, Ordering},
5 Arc,
6 },
7};
8
9use crate::{
10 context::{internal::Env, Context, Cx},
11 result::{NeonResult, ResultExt, Throw},
12 sys::{self, tsfn::ThreadsafeFunction},
13};
14
15#[cfg(feature = "futures")]
16use {
17 std::future::Future,
18 std::pin::Pin,
19 std::task::{self, Poll},
20 tokio::sync::oneshot,
21};
22
#[cfg(not(feature = "futures"))]
// Blocking stand-in that mirrors the subset of the `tokio::sync::oneshot` API
// used in this file, built on top of `std::sync::mpsc`.
mod oneshot {
    use std::sync::mpsc::{self, SyncSender};

    pub(super) mod error {
        pub use super::mpsc::RecvError;
    }

    /// Receiving half of the channel; wraps a standard library receiver.
    pub(super) struct Receiver<T>(mpsc::Receiver<T>);

    impl<T> Receiver<T> {
        /// Blocks the calling thread until a value arrives or the sender is dropped.
        pub(super) fn blocking_recv(self) -> Result<T, error::RecvError> {
            let Self(inner) = self;

            inner.recv()
        }
    }

    /// Creates a sender/receiver pair backed by a synchronous channel with a
    /// buffer of one message.
    pub(super) fn channel<T>() -> (SyncSender<T>, Receiver<T>) {
        let (sender, receiver) = mpsc::sync_channel(1);

        (sender, Receiver(receiver))
    }
}
46
/// Boxed, sendable closure that receives the raw `sys::Env`; this is the payload
/// type carried by the `ThreadsafeFunction` stored in `ChannelState`.
type Callback = Box<dyn FnOnce(sys::Env) + Send + 'static>;
48
/// Channel for scheduling Rust closures to execute on the JavaScript main thread.
///
/// Cloning a `Channel` will create a new channel that shares a backing queue for
/// events.
///
/// A freshly created `Channel` is referenced, i.e. it prevents the Node event loop
/// from exiting; see [`Channel::unref`] and [`Channel::reference`].
///
/// # Example
///
/// The following example spawns a standard Rust thread to complete a computation
/// and calls back to a JavaScript function asynchronously with the result.
///
/// ```
/// # use neon::prelude::*;
/// # fn fibonacci(_: f64) -> f64 { todo!() }
/// fn async_fibonacci(mut cx: FunctionContext) -> JsResult<JsUndefined> {
///     // These types (`f64`, `Root<JsFunction>`, `Channel`) may all be sent
///     // across threads.
///     let n = cx.argument::<JsNumber>(0)?.value(&mut cx);
///     let callback = cx.argument::<JsFunction>(1)?.root(&mut cx);
///     let channel = cx.channel();
///
///     // Spawn a thread to complete the execution. This will _not_ block the
///     // JavaScript event loop.
///     std::thread::spawn(move || {
///         let result = fibonacci(n);
///
///         // Send a closure as a task to be executed by the JavaScript event
///         // loop. This _will_ block the event loop while executing.
///         channel.send(move |mut cx| {
///             let callback = callback.into_inner(&mut cx);
///
///             callback
///                 .bind(&mut cx)
///                 .args(((), result))?
///                 .exec()?;
///
///             Ok(())
///         });
///     });
///
///     Ok(cx.undefined())
/// }
/// ```
#[cfg_attr(docsrs, doc(cfg(feature = "napi-4")))]
pub struct Channel {
    // State shared by every clone of this channel (the threadsafe function and
    // the count of referenced handles).
    state: Arc<ChannelState>,
    // Whether this particular handle currently counts towards keeping the event
    // loop alive.
    has_ref: bool,
}
96
97impl fmt::Debug for Channel {
98 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
99 f.write_str("Channel")
100 }
101}
102
103impl Channel {
104 /// Creates an unbounded channel for scheduling closures on the JavaScript
105 /// main thread
106 pub fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
107 Self {
108 state: Arc::new(ChannelState::new(cx)),
109 has_ref: true,
110 }
111 }
112
113 /// Allow the Node event loop to exit while this `Channel` exists.
114 /// _Idempotent_
115 pub fn unref<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
116 // Already unreferenced
117 if !self.has_ref {
118 return self;
119 }
120
121 self.has_ref = false;
122 self.state.unref(cx);
123 self
124 }
125
126 /// Prevent the Node event loop from exiting while this `Channel` exists. (Default)
127 /// _Idempotent_
128 pub fn reference<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
129 // Already referenced
130 if self.has_ref {
131 return self;
132 }
133
134 self.has_ref = true;
135 self.state.reference(cx);
136 self
137 }
138
139 /// Schedules a closure to execute on the JavaScript thread that created this Channel
140 /// Panics if there is a libuv error
141 pub fn send<T, F>(&self, f: F) -> JoinHandle<T>
142 where
143 T: Send + 'static,
144 F: FnOnce(Cx) -> NeonResult<T> + Send + 'static,
145 {
146 self.try_send(f).unwrap()
147 }
148
149 /// Schedules a closure to execute on the JavaScript thread that created this Channel
150 /// Returns an `Error` if the task could not be scheduled.
151 ///
152 /// See [`SendError`] for additional details on failure causes.
153 pub fn try_send<T, F>(&self, f: F) -> Result<JoinHandle<T>, SendError>
154 where
155 T: Send + 'static,
156 F: FnOnce(Cx) -> NeonResult<T> + Send + 'static,
157 {
158 let (tx, rx) = oneshot::channel();
159 let callback = Box::new(move |env| {
160 let env = Env::from(env);
161
162 // Note: It is sufficient to use `Cx` because
163 // N-API creates a `HandleScope` before calling the callback.
164 Cx::with_context(env, move |cx| {
165 // Error can be ignored; it only means the user didn't join
166 let _ = tx.send(f(cx).map_err(Into::into));
167 });
168 });
169
170 self.state
171 .tsfn
172 .call(callback, None)
173 .map_err(|_| SendError)?;
174
175 Ok(JoinHandle { rx })
176 }
177
178 /// Returns a boolean indicating if this `Channel` will prevent the Node event
179 /// loop from exiting.
180 pub fn has_ref(&self) -> bool {
181 self.has_ref
182 }
183}
184
185impl Clone for Channel {
186 /// Returns a clone of the Channel instance that shares the internal
187 /// unbounded queue with the original channel. Scheduling callbacks on the
188 /// same queue is faster than using separate channels, but might lead to
189 /// starvation if one of the threads posts significantly more callbacks on
190 /// the channel than the other one.
191 ///
192 /// Cloned and referenced Channel instances might trigger additional
193 /// event-loop tick when dropped. Channel can be wrapped into an Arc and
194 /// shared between different threads/callers to avoid this.
195 fn clone(&self) -> Self {
196 // Not referenced, we can simply clone the fields
197 if !self.has_ref {
198 return Self {
199 state: self.state.clone(),
200 has_ref: false,
201 };
202 }
203
204 let state = Arc::clone(&self.state);
205
206 // Only need to increase the ref count since the tsfn is already referenced
207 state.ref_count.fetch_add(1, Ordering::Relaxed);
208
209 Self {
210 state,
211 has_ref: true,
212 }
213 }
214}
215
216impl Drop for Channel {
217 fn drop(&mut self) {
218 // Not a referenced event queue
219 if !self.has_ref {
220 return;
221 }
222
223 // It was only us who kept the `ChannelState` alive. No need to unref
224 // the `tsfn`, because it is going to be dropped once this function
225 // returns.
226 if Arc::strong_count(&self.state) == 1 {
227 return;
228 }
229
230 // The ChannelState is dropped on a worker thread. We have to `unref`
231 // the tsfn on the UV thread after all pending closures. Note that in
232 // the most of scenarios the optimization in N-API layer would coalesce
233 // `send()` with a user-supplied closure and the unref send here into a
234 // single UV tick.
235 //
236 // If this ever has to be optimized a second `Arc` could be used to wrap
237 // the `state` and it could be cloned in `try_send` and unref'ed on the
238 // UV thread if strong reference count goes to 0.
239 let state = Arc::clone(&self.state);
240
241 // `Channel::try_send` will only fail if the environment has shutdown.
242 // In that case, the teardown will perform clean-up.
243 let _ = self.try_send(move |mut cx| {
244 state.unref(&mut cx);
245 Ok(())
246 });
247 }
248}
249
250/// An owned permission to join on the result of a closure sent to the JavaScript main
251/// thread with [`Channel::send`].
252pub struct JoinHandle<T> {
253 // `Err` is always `Throw`, but `Throw` cannot be sent across threads
254 rx: oneshot::Receiver<Result<T, SendThrow>>,
255}
256
257impl<T> JoinHandle<T> {
258 /// Waits for the associated closure to finish executing
259 ///
260 /// If the closure panics or throws an exception, `Err` is returned
261 ///
262 /// # Panics
263 ///
264 /// This function panics if called within an asynchronous execution context.
265 pub fn join(self) -> Result<T, JoinError> {
266 Ok(self.rx.blocking_recv()??)
267 }
268}
269
#[cfg(feature = "futures")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
impl<T> Future for JoinHandle<T> {
    type Output = Result<T, JoinError>;

    /// Polls the underlying receiver and, once it is ready, flattens
    /// `Result<Result<T, SendThrow>, RecvError>` into `Result<T, JoinError>`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
        let received = match Pin::new(&mut self.rx).poll(cx) {
            Poll::Ready(received) => received,
            Poll::Pending => return Poll::Pending,
        };

        let joined = received
            .map_err(JoinError::from)
            .and_then(|sent| sent.map_err(JoinError::from));

        Poll::Ready(joined)
    }
}
290
/// Error returned by [`JoinHandle::join`] indicating the associated closure panicked
/// or threw an exception.
#[derive(Debug)]
pub struct JoinError(JoinErrorType);

// The two ways a joined closure can fail to produce a value.
#[derive(Debug)]
enum JoinErrorType {
    Panic,
    Throw,
}

impl JoinError {
    // Human-readable description of the failure; also used as the `Display` text.
    fn as_str(&self) -> &str {
        let Self(kind) = self;

        match kind {
            JoinErrorType::Throw => "Closure threw an exception",
            JoinErrorType::Panic => "Closure panicked before returning",
        }
    }
}

impl fmt::Display for JoinError {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = self.as_str();

        formatter.write_str(message)
    }
}

impl error::Error for JoinError {}
318
319impl From<oneshot::error::RecvError> for JoinError {
320 fn from(_: oneshot::error::RecvError) -> Self {
321 JoinError(JoinErrorType::Panic)
322 }
323}
324
325// Marker that a `Throw` occurred that can be sent across threads for use in `JoinError`
326pub(crate) struct SendThrow(());
327
328impl From<SendThrow> for JoinError {
329 fn from(_: SendThrow) -> Self {
330 JoinError(JoinErrorType::Throw)
331 }
332}
333
334impl From<Throw> for SendThrow {
335 fn from(_: Throw) -> SendThrow {
336 SendThrow(())
337 }
338}
339
340impl<T> ResultExt<T> for Result<T, JoinError> {
341 fn or_throw<'a, C: Context<'a>>(self, cx: &mut C) -> NeonResult<T> {
342 self.or_else(|err| cx.throw_error(err.as_str()))
343 }
344}
345
/// Error indicating that a closure was unable to be scheduled to execute on the event loop.
///
/// The most likely cause of a failure is that Node is shutting down. This may occur if the
/// process is forcefully exiting even if the channel is referenced. For example, by calling
/// `process.exit()`.
//
// NOTE: These docs will need to be updated to include `QueueFull` if bounded queues are
// implemented.
#[cfg_attr(docsrs, doc(cfg(feature = "napi-4")))]
pub struct SendError;

impl fmt::Display for SendError {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str("SendError")
    }
}

// `Debug` intentionally produces the same text as `Display`.
impl fmt::Debug for SendError {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as fmt::Display>::fmt(self, formatter)
    }
}

impl error::Error for SendError {}
370
371struct ChannelState {
372 tsfn: ThreadsafeFunction<Callback>,
373 ref_count: AtomicUsize,
374}
375
376impl ChannelState {
377 fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
378 let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };
379 Self {
380 tsfn,
381 ref_count: AtomicUsize::new(1),
382 }
383 }
384
385 fn reference<'a, C: Context<'a>>(&self, cx: &mut C) {
386 // We can use relaxed ordering because `reference()` can only be called
387 // on the Event-Loop thread.
388 if self.ref_count.fetch_add(1, Ordering::Relaxed) != 0 {
389 return;
390 }
391
392 unsafe {
393 self.tsfn.reference(cx.env().to_raw());
394 }
395 }
396
397 fn unref<'a, C: Context<'a>>(&self, cx: &mut C) {
398 // We can use relaxed ordering because `unref()` can only be called
399 // on the Event-Loop thread.
400 if self.ref_count.fetch_sub(1, Ordering::Relaxed) != 1 {
401 return;
402 }
403
404 unsafe {
405 self.tsfn.unref(cx.env().to_raw());
406 }
407 }
408
409 // Monomorphized trampoline funciton for calling the user provided closure
410 fn callback(env: Option<sys::Env>, callback: Callback) {
411 if let Some(env) = env {
412 callback(env);
413 } else {
414 crate::context::internal::IS_RUNNING.with(|v| {
415 *v.borrow_mut() = false;
416 });
417 }
418 }
419}