├── src ├── task.rs ├── sync │ ├── mod.rs │ ├── flag.rs │ ├── value_creator.rs │ └── task_queue.rs ├── tokio │ ├── mod.rs │ ├── joinset.rs │ ├── task.rs │ └── split.rs ├── lib.rs ├── flag.rs ├── waker.rs ├── mpsc │ ├── chunked_queue.rs │ └── mod.rs ├── future.rs └── task_queue.rs ├── .gitignore ├── rust-toolchain.toml ├── .rustfmt.toml ├── README.md ├── Cargo.toml ├── .github └── workflows │ ├── release.yml │ └── rust.yml └── LICENSE /src/task.rs: -------------------------------------------------------------------------------- 1 | 2 | -------------------------------------------------------------------------------- /.gitignore: -------------------------------------------------------------------------------- 1 | /.vs 2 | /target 3 | /Cargo.lock 4 | -------------------------------------------------------------------------------- /rust-toolchain.toml: -------------------------------------------------------------------------------- 1 | [toolchain] 2 | channel = "1.87.0" 3 | components = [ "clippy", "rustfmt" ] 4 | -------------------------------------------------------------------------------- /.rustfmt.toml: -------------------------------------------------------------------------------- 1 | max_width = 80 2 | tab_spaces = 2 3 | edition = "2021" 4 | imports_granularity = "Item" 5 | -------------------------------------------------------------------------------- /README.md: -------------------------------------------------------------------------------- 1 | # deno_unsync 2 | 3 | This is a collection of adapters to make working with Tokio single-threaded runtimes as easy as working with 4 | multi-threaded runtimes. 5 | -------------------------------------------------------------------------------- /src/sync/mod.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2018-2024 the Deno authors. MIT license. 2 | 3 | mod flag; 4 | mod task_queue; 5 | #[cfg(feature = "tokio")] 6 | mod value_creator; 7 | 8 | pub use flag::AtomicFlag; 9 | pub use task_queue::TaskQueue; 10 | pub use task_queue::TaskQueuePermit; 11 | #[cfg(feature = "tokio")] 12 | pub use value_creator::MultiRuntimeAsyncValueCreator; 13 | -------------------------------------------------------------------------------- /src/tokio/mod.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2018-2024 the Deno authors. MIT license. 2 | 3 | pub use joinset::JoinSet; 4 | pub use split::split_io; 5 | pub use split::IOReadHalf; 6 | pub use split::IOWriteHalf; 7 | 8 | pub use task::spawn; 9 | pub use task::spawn_blocking; 10 | pub use task::JoinHandle; 11 | pub use task::MaskFutureAsSend; 12 | 13 | mod joinset; 14 | mod split; 15 | mod task; 16 | -------------------------------------------------------------------------------- /src/lib.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2018-2024 the Deno authors. MIT license. 2 | 3 | mod flag; 4 | pub mod future; 5 | pub mod mpsc; 6 | pub mod sync; 7 | mod task; 8 | mod task_queue; 9 | #[cfg(feature = "tokio")] 10 | mod tokio; 11 | mod waker; 12 | 13 | pub use flag::Flag; 14 | pub use task_queue::TaskQueue; 15 | pub use task_queue::TaskQueuePermit; 16 | pub use task_queue::TaskQueuePermitAcquireFuture; 17 | pub use waker::UnsyncWaker; 18 | 19 | #[cfg(feature = "tokio")] 20 | pub use self::tokio::*; 21 | 22 | /// Marker for items that are ![`Send`]. 
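///
/// Illustrative sketch (not part of the original docs): embed the marker as a
/// field to make an otherwise auto-`Send` type `!Send`, the same way the
/// split I/O halves in `src/tokio/split.rs` use it.
///
/// ```ignore
/// struct LocalOnly {
///   data: u32,
///   _marker: deno_unsync::UnsendMarker,
/// }
/// ```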
23 | #[derive(Copy, Clone, Default, Eq, PartialEq, PartialOrd, Ord, Debug, Hash)] 24 | pub struct UnsendMarker( 25 | std::marker::PhantomData>, 26 | ); 27 | -------------------------------------------------------------------------------- /Cargo.toml: -------------------------------------------------------------------------------- 1 | [package] 2 | name = "deno_unsync" 3 | version = "0.4.4" 4 | edition = "2021" 5 | authors = ["the Deno authors"] 6 | license = "MIT" 7 | repository = "https://github.com/denoland/deno_unsync" 8 | description = "A collection of adapters to make working with Tokio single-threaded runtimes easier" 9 | readme = "README.md" 10 | 11 | [features] 12 | default = ["tokio"] 13 | tokio = ["dep:tokio"] 14 | 15 | [dependencies] 16 | futures-util = { version = "0.3.21", default-features = false, features = ["std"] } 17 | parking_lot = "0.12.3" 18 | tokio = { version = "1", features = ["rt"], optional = true } 19 | 20 | [dev-dependencies] 21 | tokio = { version = "1", features = ["io-util", "macros", "rt", "sync", "time"] } 22 | 23 | [lib] 24 | name = "deno_unsync" 25 | -------------------------------------------------------------------------------- /.github/workflows/release.yml: -------------------------------------------------------------------------------- 1 | name: release 2 | 3 | on: 4 | workflow_dispatch: 5 | inputs: 6 | releaseKind: 7 | description: "Kind of release" 8 | default: "minor" 9 | type: choice 10 | options: 11 | - patch 12 | - minor 13 | required: true 14 | 15 | jobs: 16 | rust: 17 | name: release 18 | runs-on: ubuntu-latest 19 | timeout-minutes: 30 20 | 21 | steps: 22 | - name: Clone repository 23 | uses: actions/checkout@v4 24 | with: 25 | token: ${{ secrets.DENOBOT_PAT }} 26 | 27 | - uses: denoland/setup-deno@v1 28 | - uses: dsherret/rust-toolchain-file@v1 29 | 30 | - name: Tag and release 31 | env: 32 | GITHUB_TOKEN: ${{ secrets.DENOBOT_PAT }} 33 | GH_WORKFLOW_ACTOR: ${{ github.actor }} 34 | run: | 35 | git config user.email "denobot@users.noreply.github.com" 36 | git config user.name "denobot" 37 | deno run -A https://raw.githubusercontent.com/denoland/automation/0.15.0/tasks/publish_release.ts --${{github.event.inputs.releaseKind}} 38 | -------------------------------------------------------------------------------- /LICENSE: -------------------------------------------------------------------------------- 1 | MIT License 2 | 3 | Copyright (c) 2018-2024 the Deno authors 4 | 5 | Permission is hereby granted, free of charge, to any person obtaining a copy 6 | of this software and associated documentation files (the "Software"), to deal 7 | in the Software without restriction, including without limitation the rights 8 | to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 | copies of the Software, and to permit persons to whom the Software is 10 | furnished to do so, subject to the following conditions: 11 | 12 | The above copyright notice and this permission notice shall be included in all 13 | copies or substantial portions of the Software. 14 | 15 | THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 | IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 | FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE 18 | AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 | LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20 | OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 21 | SOFTWARE. 22 | -------------------------------------------------------------------------------- /.github/workflows/rust.yml: -------------------------------------------------------------------------------- 1 | name: Rust 2 | 3 | on: 4 | push: 5 | branches: [ "main" ] 6 | tags: 7 | - '*' 8 | pull_request: 9 | branches: [ "main" ] 10 | 11 | env: 12 | CARGO_TERM_COLOR: always 13 | 14 | jobs: 15 | build: 16 | runs-on: ubuntu-latest 17 | 18 | steps: 19 | - uses: actions/checkout@v4 20 | - uses: dsherret/rust-toolchain-file@v1 21 | - name: Build 22 | run: cargo build 23 | - name: Build (no default features) 24 | run: cargo build --no-default-features 25 | - name: Format 26 | run: cargo fmt -- --check 27 | - name: Lint 28 | run: cargo clippy --all-targets --all-features -- -D warnings 29 | - name: Test 30 | run: cargo test 31 | - name: Cargo publish 32 | if: | 33 | github.repository == 'denoland/deno_unsync' && 34 | startsWith(github.ref, 'refs/tags/') 35 | env: 36 | CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }} 37 | run: cargo publish 38 | 39 | miri: 40 | runs-on: ubuntu-latest 41 | 42 | steps: 43 | - uses: actions/checkout@v4 44 | - run: rustup install nightly 45 | - run: rustup component add --toolchain nightly-x86_64-unknown-linux-gnu miri 46 | - name: Tests (miri) 47 | # opt into only specific tests because this takes a while to run 48 | run: cargo +nightly miri test future::test 49 | -------------------------------------------------------------------------------- /src/flag.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2018-2024 the Deno authors. MIT license. 2 | 3 | use std::cell::Cell; 4 | 5 | /// A flag with interior mutability that can be raised or lowered. 6 | /// Useful for indicating if an event has occurred. 7 | #[derive(Debug, Default)] 8 | pub struct Flag(Cell); 9 | 10 | impl Flag { 11 | /// Creates a new flag that's lowered. 12 | pub const fn lowered() -> Self { 13 | Self(Cell::new(false)) 14 | } 15 | 16 | /// Creates a new flag that's raised. 17 | pub const fn raised() -> Self { 18 | Self(Cell::new(true)) 19 | } 20 | 21 | /// Raises the flag returning if raised. 22 | pub fn raise(&self) -> bool { 23 | !self.0.replace(true) 24 | } 25 | 26 | /// Lowers the flag returning if lowered. 27 | pub fn lower(&self) -> bool { 28 | self.0.replace(false) 29 | } 30 | 31 | /// Gets if the flag is raised. 32 | pub fn is_raised(&self) -> bool { 33 | self.0.get() 34 | } 35 | } 36 | 37 | #[cfg(test)] 38 | mod test { 39 | use super::*; 40 | 41 | #[test] 42 | fn test_raise_lower() { 43 | let flag = Flag::default(); 44 | assert!(!flag.is_raised()); 45 | assert!(flag.raise()); 46 | assert!(flag.is_raised()); 47 | assert!(!flag.raise()); 48 | assert!(flag.is_raised()); 49 | assert!(flag.lower()); 50 | assert!(!flag.is_raised()); 51 | assert!(!flag.lower()); 52 | assert!(!flag.is_raised()); 53 | } 54 | } 55 | -------------------------------------------------------------------------------- /src/sync/flag.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2018-2024 the Deno authors. MIT license. 
2 | 
3 | use std::sync::atomic::AtomicBool;
4 | use std::sync::atomic::Ordering;
5 | 
6 | /// Simplifies the use of an atomic boolean as a flag.
7 | #[derive(Debug, Default)]
8 | pub struct AtomicFlag(AtomicBool);
9 | 
10 | impl AtomicFlag {
11 |   /// Creates a new flag that's lowered.
12 |   pub const fn lowered() -> AtomicFlag {
13 |     Self(AtomicBool::new(false))
14 |   }
15 | 
16 |   /// Creates a new flag that's raised.
17 |   pub const fn raised() -> AtomicFlag {
18 |     Self(AtomicBool::new(true))
19 |   }
20 | 
21 |   /// Raises the flag returning if the raise was successful.
22 |   pub fn raise(&self) -> bool {
23 |     !self.0.swap(true, Ordering::SeqCst)
24 |   }
25 | 
26 |   /// Lowers the flag returning if the lower was successful.
27 |   pub fn lower(&self) -> bool {
28 |     self.0.swap(false, Ordering::SeqCst)
29 |   }
30 | 
31 |   /// Gets if the flag is raised.
32 |   pub fn is_raised(&self) -> bool {
33 |     self.0.load(Ordering::SeqCst)
34 |   }
35 | }
36 | 
37 | #[cfg(test)]
38 | mod test {
39 |   use super::*;
40 | 
41 |   #[test]
42 |   fn atomic_flag_raises_lowers() {
43 |     let flag = AtomicFlag::default();
44 |     assert!(!flag.is_raised()); // false by default
45 |     assert!(flag.raise());
46 |     assert!(flag.is_raised());
47 |     assert!(!flag.raise());
48 |     assert!(flag.is_raised());
49 |     assert!(flag.lower());
50 |     assert!(flag.raise());
51 |     assert!(flag.lower());
52 |     assert!(!flag.lower());
53 |     let flag = AtomicFlag::raised();
54 |     assert!(flag.is_raised());
55 |     assert!(flag.lower());
56 |   }
57 | }
58 | 
--------------------------------------------------------------------------------
/src/waker.rs:
--------------------------------------------------------------------------------
1 | // Copyright 2018-2024 the Deno authors. MIT license.
2 | 
3 | use std::cell::UnsafeCell;
4 | use std::task::Waker;
5 | 
6 | use crate::UnsendMarker;
7 | 
8 | /// A ![`Send`] and ![`Sync`] equivalent to `AtomicWaker`.
9 | #[derive(Default)]
10 | pub struct UnsyncWaker {
11 |   waker: UnsafeCell<Option<Waker>>,
12 |   _unsend_marker: UnsendMarker,
13 | }
14 | 
15 | impl UnsyncWaker {
16 |   /// Register a waker if the waker represents a different waker than is already stored.
17 |   pub fn register(&self, waker: &Waker) {
18 |     // SAFETY: We can guarantee no other threads are accessing this field as
19 |     // we are !Send and !Sync.
20 |     unsafe {
21 |       if let Some(old_waker) = &mut *self.waker.get() {
22 |         if old_waker.will_wake(waker) {
23 |           return;
24 |         }
25 |       }
26 |       *self.waker.get() = Some(waker.clone())
27 |     }
28 |   }
29 | 
30 |   /// If a waker has been registered, wake the contained [`Waker`], unregistering it at the same time.
31 |   pub fn wake(&self) {
32 |     // SAFETY: We can guarantee no other threads are accessing this field as
33 |     // we are !Send and !Sync.
34 |     unsafe {
35 |       if let Some(waker) = (*self.waker.get()).take() {
36 |         waker.wake();
37 |       }
38 |     }
39 |   }
40 | 
41 |   /// If a waker has been registered, wake the contained [`Waker`], maintaining it for later use.
42 |   pub fn wake_by_ref(&self) {
43 |     // SAFETY: We can guarantee no other threads are accessing this field as
44 |     // we are !Send and !Sync.
45 |     unsafe {
46 |       if let Some(waker) = &mut *self.waker.get() {
47 |         waker.wake_by_ref();
48 |       }
49 |     }
50 |   }
51 | }
52 | 
--------------------------------------------------------------------------------
/src/mpsc/chunked_queue.rs:
--------------------------------------------------------------------------------
1 | // Copyright 2018-2024 the Deno authors. MIT license.
2 | 3 | use std::collections::LinkedList; 4 | use std::collections::VecDeque; 5 | 6 | const CHUNK_SIZE: usize = 1024; 7 | 8 | /// A queue that stores elements in chunks in a linked list 9 | /// to reduce allocations. 10 | pub(crate) struct ChunkedQueue { 11 | chunks: LinkedList>, 12 | } 13 | 14 | impl Default for ChunkedQueue { 15 | fn default() -> Self { 16 | Self { 17 | chunks: Default::default(), 18 | } 19 | } 20 | } 21 | 22 | impl ChunkedQueue { 23 | pub fn len(&self) -> usize { 24 | match self.chunks.len() { 25 | 0 => 0, 26 | 1 => self.chunks.front().unwrap().len(), 27 | 2 => { 28 | self.chunks.front().unwrap().len() + self.chunks.back().unwrap().len() 29 | } 30 | _ => { 31 | self.chunks.front().unwrap().len() 32 | + CHUNK_SIZE * (self.chunks.len() - 2) 33 | + self.chunks.back().unwrap().len() 34 | } 35 | } 36 | } 37 | 38 | pub fn push_back(&mut self, value: T) { 39 | if let Some(tail) = self.chunks.back_mut() { 40 | if tail.len() < CHUNK_SIZE { 41 | tail.push_back(value); 42 | return; 43 | } 44 | } 45 | let mut new_buffer = VecDeque::with_capacity(CHUNK_SIZE); 46 | new_buffer.push_back(value); 47 | self.chunks.push_back(new_buffer); 48 | } 49 | 50 | pub fn pop_front(&mut self) -> Option { 51 | if let Some(head) = self.chunks.front_mut() { 52 | let value = head.pop_front(); 53 | if value.is_some() && head.is_empty() && self.chunks.len() > 1 { 54 | self.chunks.pop_front().unwrap(); 55 | } 56 | value 57 | } else { 58 | None 59 | } 60 | } 61 | } 62 | 63 | #[cfg(test)] 64 | mod test { 65 | use super::CHUNK_SIZE; 66 | 67 | #[test] 68 | fn ensure_len_correct() { 69 | let mut queue = super::ChunkedQueue::default(); 70 | for _ in 0..2 { 71 | for i in 0..CHUNK_SIZE * 20 { 72 | queue.push_back(i); 73 | assert_eq!(queue.len(), i + 1); 74 | } 75 | for i in (0..CHUNK_SIZE * 20).rev() { 76 | queue.pop_front(); 77 | assert_eq!(queue.len(), i); 78 | } 79 | assert_eq!(queue.len(), 0); 80 | } 81 | } 82 | } 83 | -------------------------------------------------------------------------------- /src/tokio/joinset.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2018-2024 the Deno authors. MIT license. 2 | // Some code and comments under MIT license where adapted from Tokio code 3 | // Copyright (c) 2023 Tokio Contributors 4 | 5 | use std::future::Future; 6 | use std::task::Context; 7 | use std::task::Poll; 8 | use std::task::Waker; 9 | use tokio::task::AbortHandle; 10 | use tokio::task::JoinError; 11 | 12 | use super::task::MaskFutureAsSend; 13 | use super::task::MaskResultAsSend; 14 | 15 | /// Wraps the tokio [`JoinSet`] to make it !Send-friendly and to make it easier and safer for us to 16 | /// poll while empty. 17 | pub struct JoinSet { 18 | joinset: tokio::task::JoinSet>, 19 | /// If join_next returns Ready(None), we stash the waker 20 | waker: Option, 21 | } 22 | 23 | impl Default for JoinSet { 24 | fn default() -> Self { 25 | Self { 26 | joinset: Default::default(), 27 | waker: None, 28 | } 29 | } 30 | } 31 | 32 | impl JoinSet { 33 | /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`] 34 | /// that can be used to remotely cancel the task. 35 | /// 36 | /// The provided future will start running in the background immediately 37 | /// when this method is called, even if you don't await anything on this 38 | /// `JoinSet`. 39 | /// 40 | /// # Panics 41 | /// 42 | /// This method panics if called outside of a Tokio runtime. 
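///
/// # Example
///
/// Illustrative sketch (not from the original docs); assumes a
/// current-thread Tokio runtime since the spawned futures may be `!Send`:
///
/// ```ignore
/// let mut set = deno_unsync::JoinSet::default();
/// set.spawn(async { 1 + 1 });
/// assert_eq!(set.join_next().await.unwrap().unwrap(), 2);
/// ```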
43 | /// 44 | /// [`AbortHandle`]: tokio::task::AbortHandle 45 | #[track_caller] 46 | pub fn spawn(&mut self, task: F) -> AbortHandle 47 | where 48 | F: Future, 49 | F: 'static, 50 | T: 'static, 51 | { 52 | // SAFETY: We only use this with the single-thread executor 53 | let handle = self.joinset.spawn(unsafe { MaskFutureAsSend::new(task) }); 54 | 55 | // If someone had called poll_join_next while we were empty, ask them to poll again 56 | // so we can properly register the waker with the underlying JoinSet. 57 | if let Some(waker) = self.waker.take() { 58 | waker.wake(); 59 | } 60 | handle 61 | } 62 | 63 | /// Returns the number of tasks currently in the `JoinSet`. 64 | pub fn len(&self) -> usize { 65 | self.joinset.len() 66 | } 67 | 68 | /// Returns whether the `JoinSet` is empty. 69 | pub fn is_empty(&self) -> bool { 70 | self.joinset.is_empty() 71 | } 72 | 73 | /// Waits until one of the tasks in the set completes and returns its output. 74 | /// 75 | /// # Cancel Safety 76 | /// 77 | /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!` 78 | /// statement and some other branch completes first, it is guaranteed that no tasks were 79 | /// removed from this `JoinSet`. 80 | pub fn poll_join_next( 81 | &mut self, 82 | cx: &mut Context, 83 | ) -> Poll> { 84 | match self.joinset.poll_join_next(cx) { 85 | Poll::Ready(Some(res)) => Poll::Ready(res.map(|res| res.into_inner())), 86 | Poll::Ready(None) => { 87 | // Stash waker 88 | self.waker = Some(cx.waker().clone()); 89 | Poll::Pending 90 | } 91 | Poll::Pending => Poll::Pending, 92 | } 93 | } 94 | 95 | /// Waits until one of the tasks in the set completes and returns its output. 96 | /// 97 | /// Returns `None` if the set is empty. 98 | /// 99 | /// # Cancel Safety 100 | /// 101 | /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!` 102 | /// statement and some other branch completes first, it is guaranteed that no tasks were 103 | /// removed from this `JoinSet`. 104 | pub async fn join_next(&mut self) -> Option> { 105 | self 106 | .joinset 107 | .join_next() 108 | .await 109 | .map(|result| result.map(|res| res.into_inner())) 110 | } 111 | 112 | /// Aborts all tasks on this `JoinSet`. 113 | /// 114 | /// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete 115 | /// cancellation, you should call `join_next` in a loop until the `JoinSet` is empty. 116 | pub fn abort_all(&mut self) { 117 | self.joinset.abort_all(); 118 | } 119 | 120 | /// Removes all tasks from this `JoinSet` without aborting them. 121 | /// 122 | /// The tasks removed by this call will continue to run in the background even if the `JoinSet` 123 | /// is dropped. 124 | pub fn detach_all(&mut self) { 125 | self.joinset.detach_all(); 126 | } 127 | } 128 | -------------------------------------------------------------------------------- /src/tokio/task.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2018-2024 the Deno authors. MIT license. 2 | 3 | use core::pin::Pin; 4 | use core::task::Context; 5 | use core::task::Poll; 6 | use std::future::Future; 7 | use std::marker::PhantomData; 8 | use tokio::runtime::Handle; 9 | use tokio::runtime::RuntimeFlavor; 10 | 11 | /// Equivalent to [`tokio::task::JoinHandle`]. 12 | #[repr(transparent)] 13 | pub struct JoinHandle { 14 | handle: tokio::task::JoinHandle>, 15 | _r: PhantomData, 16 | } 17 | 18 | impl JoinHandle { 19 | /// Equivalent to [`tokio::task::JoinHandle::abort`]. 
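///
/// Illustrative sketch (not from the original docs); `spawn` here is the
/// `deno_unsync::spawn` defined later in this file, and a current-thread
/// runtime is assumed:
///
/// ```ignore
/// let handle = deno_unsync::spawn(async { 42 });
/// handle.abort();
/// ```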
20 | pub fn abort(&self) { 21 | self.handle.abort() 22 | } 23 | 24 | pub fn abort_handle(&self) -> tokio::task::AbortHandle { 25 | self.handle.abort_handle() 26 | } 27 | } 28 | 29 | impl Future for JoinHandle { 30 | type Output = Result; 31 | 32 | fn poll( 33 | self: Pin<&mut Self>, 34 | cx: &mut std::task::Context<'_>, 35 | ) -> std::task::Poll { 36 | // SAFETY: We are sure that handle is valid here 37 | unsafe { 38 | let me: &mut Self = Pin::into_inner_unchecked(self); 39 | let handle = Pin::new_unchecked(&mut me.handle); 40 | match handle.poll(cx) { 41 | Poll::Pending => Poll::Pending, 42 | Poll::Ready(Ok(r)) => Poll::Ready(Ok(r.into_inner())), 43 | Poll::Ready(Err(e)) => Poll::Ready(Err(e)), 44 | } 45 | } 46 | } 47 | } 48 | 49 | /// Equivalent to [`tokio::task::spawn`], but does not require the future to be [`Send`]. Must only be 50 | /// used on a [`RuntimeFlavor::CurrentThread`] executor, though this is only checked when running with 51 | /// debug assertions. 52 | #[inline(always)] 53 | pub fn spawn + 'static, R: 'static>( 54 | f: F, 55 | ) -> JoinHandle { 56 | debug_assert!( 57 | Handle::current().runtime_flavor() == RuntimeFlavor::CurrentThread 58 | ); 59 | // SAFETY: we know this is a current-thread executor 60 | let future = unsafe { MaskFutureAsSend::new(f) }; 61 | JoinHandle { 62 | handle: tokio::task::spawn(future), 63 | _r: Default::default(), 64 | } 65 | } 66 | 67 | /// Equivalent to [`tokio::task::spawn_blocking`]. Currently a thin wrapper around the tokio API, but this 68 | /// may change in the future. 69 | #[inline(always)] 70 | pub fn spawn_blocking< 71 | F: (FnOnce() -> R) + Send + 'static, 72 | R: Send + 'static, 73 | >( 74 | f: F, 75 | ) -> JoinHandle { 76 | let handle = tokio::task::spawn_blocking(|| MaskResultAsSend { result: f() }); 77 | JoinHandle { 78 | handle, 79 | _r: Default::default(), 80 | } 81 | } 82 | 83 | #[repr(transparent)] 84 | #[doc(hidden)] 85 | pub struct MaskResultAsSend { 86 | result: R, 87 | } 88 | 89 | /// SAFETY: We ensure that Send bounds are only faked when tokio is running on a current-thread executor 90 | unsafe impl Send for MaskResultAsSend {} 91 | 92 | impl MaskResultAsSend { 93 | #[inline(always)] 94 | pub fn into_inner(self) -> R { 95 | self.result 96 | } 97 | } 98 | 99 | #[repr(transparent)] 100 | pub struct MaskFutureAsSend { 101 | future: F, 102 | } 103 | 104 | impl MaskFutureAsSend { 105 | /// Mark a non-`Send` future as `Send`. This is a trick to be able to use 106 | /// `tokio::spawn()` (which requires `Send` futures) in a current thread 107 | /// runtime. 108 | /// 109 | /// # Safety 110 | /// 111 | /// You must ensure that the future is actually used on the same 112 | /// thread, ie. always use current thread runtime flavor from Tokio. 113 | #[inline(always)] 114 | pub unsafe fn new(future: F) -> Self { 115 | Self { future } 116 | } 117 | } 118 | 119 | // SAFETY: we are cheating here - this struct is NOT really Send, 120 | // but we need to mark it Send so that we can use `spawn()` in Tokio. 
121 | unsafe impl Send for MaskFutureAsSend {} 122 | 123 | impl Future for MaskFutureAsSend { 124 | type Output = MaskResultAsSend; 125 | 126 | fn poll( 127 | self: Pin<&mut Self>, 128 | cx: &mut Context<'_>, 129 | ) -> Poll> { 130 | // SAFETY: We are sure that future is valid here 131 | unsafe { 132 | let me: &mut MaskFutureAsSend = Pin::into_inner_unchecked(self); 133 | let future = Pin::new_unchecked(&mut me.future); 134 | match future.poll(cx) { 135 | Poll::Pending => Poll::Pending, 136 | Poll::Ready(result) => Poll::Ready(MaskResultAsSend { result }), 137 | } 138 | } 139 | } 140 | } 141 | -------------------------------------------------------------------------------- /src/tokio/split.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2018-2024 the Deno authors. MIT license. 2 | 3 | use crate::UnsendMarker; 4 | use std::cell::Cell; 5 | use std::cell::UnsafeCell; 6 | use std::pin::Pin; 7 | use std::rc::Rc; 8 | use std::task::Poll; 9 | 10 | use tokio::io::AsyncRead; 11 | use tokio::io::AsyncWrite; 12 | 13 | /// Create a ![`Send`] I/O split on top of a stream. The split reader and writer halves are safe to use 14 | /// only in a single-threaded context, and are not legal to send to another thread. 15 | pub fn split_io(stream: S) -> (IOReadHalf, IOWriteHalf) 16 | where 17 | S: AsyncRead + AsyncWrite + Unpin, 18 | { 19 | let is_write_vectored = stream.is_write_vectored(); 20 | let stream = Rc::new(Split { 21 | stream: UnsafeCell::new(stream), 22 | lock: Cell::new(false), 23 | }); 24 | ( 25 | IOReadHalf { 26 | split: stream.clone(), 27 | _marker: UnsendMarker::default(), 28 | }, 29 | IOWriteHalf { 30 | split: stream, 31 | is_write_vectored, 32 | _marker: UnsendMarker::default(), 33 | }, 34 | ) 35 | } 36 | 37 | struct Split { 38 | stream: UnsafeCell, 39 | lock: Cell, 40 | } 41 | 42 | pub struct IOReadHalf { 43 | split: Rc>, 44 | _marker: UnsendMarker, 45 | } 46 | 47 | pub struct IOWriteHalf { 48 | split: Rc>, 49 | is_write_vectored: bool, 50 | _marker: UnsendMarker, 51 | } 52 | 53 | impl AsyncRead for IOReadHalf 54 | where 55 | S: AsyncRead + Unpin, 56 | { 57 | fn poll_read( 58 | self: std::pin::Pin<&mut Self>, 59 | cx: &mut std::task::Context<'_>, 60 | buf: &mut tokio::io::ReadBuf<'_>, 61 | ) -> std::task::Poll> { 62 | let lock = &self.split.lock; 63 | if lock.clone().into_inner() { 64 | return Poll::Ready(Err(std::io::Error::new( 65 | std::io::ErrorKind::PermissionDenied, 66 | "Re-entrant read while writing", 67 | ))); 68 | } 69 | lock.set(true); 70 | // SAFETY: This is !Send and the lock is set, so we can guarantee we won't get another &mut to the stream now 71 | let s = unsafe { self.split.stream.get().as_mut().unwrap() }; 72 | let res = Pin::new(s).poll_read(cx, buf); 73 | lock.set(false); 74 | res 75 | } 76 | } 77 | 78 | impl AsyncWrite for IOWriteHalf 79 | where 80 | S: AsyncWrite + Unpin, 81 | { 82 | fn poll_flush( 83 | self: std::pin::Pin<&mut Self>, 84 | cx: &mut std::task::Context<'_>, 85 | ) -> std::task::Poll> { 86 | let lock = &self.split.lock; 87 | if lock.clone().into_inner() { 88 | return Poll::Ready(Err(std::io::Error::new( 89 | std::io::ErrorKind::PermissionDenied, 90 | "Re-entrant write while reading", 91 | ))); 92 | } 93 | lock.set(true); 94 | // SAFETY: This is !Send and the lock is set, so we can guarantee we won't get another &mut to the stream now 95 | let s = unsafe { self.split.stream.get().as_mut().unwrap() }; 96 | let res = Pin::new(s).poll_flush(cx); 97 | lock.set(false); 98 | res 99 | } 100 | 101 | fn 
poll_shutdown( 102 | self: std::pin::Pin<&mut Self>, 103 | cx: &mut std::task::Context<'_>, 104 | ) -> std::task::Poll> { 105 | let lock = &self.split.lock; 106 | if lock.clone().into_inner() { 107 | return Poll::Ready(Err(std::io::Error::new( 108 | std::io::ErrorKind::PermissionDenied, 109 | "Re-entrant write while reading", 110 | ))); 111 | } 112 | lock.set(true); 113 | // SAFETY: This is !Send and the lock is set, so we can guarantee we won't get another &mut to the stream now 114 | let s = unsafe { self.split.stream.get().as_mut().unwrap() }; 115 | let res = Pin::new(s).poll_shutdown(cx); 116 | lock.set(false); 117 | res 118 | } 119 | 120 | fn poll_write( 121 | self: std::pin::Pin<&mut Self>, 122 | cx: &mut std::task::Context<'_>, 123 | buf: &[u8], 124 | ) -> std::task::Poll> { 125 | let lock = &self.split.lock; 126 | if lock.clone().into_inner() { 127 | return Poll::Ready(Err(std::io::Error::new( 128 | std::io::ErrorKind::PermissionDenied, 129 | "Re-entrant write while reading", 130 | ))); 131 | } 132 | lock.set(true); 133 | // SAFETY: This is !Send and the lock is set, so we can guarantee we won't get another &mut to the stream now 134 | let s = unsafe { self.split.stream.get().as_mut().unwrap() }; 135 | let res = Pin::new(s).poll_write(cx, buf); 136 | lock.set(false); 137 | res 138 | } 139 | 140 | fn poll_write_vectored( 141 | self: std::pin::Pin<&mut Self>, 142 | cx: &mut std::task::Context<'_>, 143 | bufs: &[std::io::IoSlice<'_>], 144 | ) -> std::task::Poll> { 145 | let lock = &self.split.lock; 146 | if lock.clone().into_inner() { 147 | return Poll::Ready(Err(std::io::Error::new( 148 | std::io::ErrorKind::PermissionDenied, 149 | "Re-entrant write while reading", 150 | ))); 151 | } 152 | lock.set(true); 153 | // SAFETY: This is !Send and the lock is set, so we can guarantee we won't get another &mut to the stream now 154 | let s = unsafe { self.split.stream.get().as_mut().unwrap() }; 155 | let res = Pin::new(s).poll_write_vectored(cx, bufs); 156 | lock.set(false); 157 | res 158 | } 159 | 160 | fn is_write_vectored(&self) -> bool { 161 | self.is_write_vectored 162 | } 163 | } 164 | 165 | #[cfg(test)] 166 | mod tests { 167 | use super::*; 168 | use tokio::io::AsyncReadExt; 169 | use tokio::io::AsyncWriteExt; 170 | 171 | #[tokio::test(flavor = "current_thread")] 172 | async fn split_duplex() { 173 | let (a, b) = tokio::io::duplex(1024); 174 | let (mut ar, mut aw) = split_io(a); 175 | let (mut br, mut bw) = split_io(b); 176 | 177 | bw.write_i8(123).await.unwrap(); 178 | assert_eq!(ar.read_i8().await.unwrap(), 123); 179 | 180 | aw.write_i8(123).await.unwrap(); 181 | assert_eq!(br.read_i8().await.unwrap(), 123); 182 | } 183 | } 184 | -------------------------------------------------------------------------------- /src/sync/value_creator.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2018-2024 the Deno authors. MIT license. 2 | 3 | use std::sync::Arc; 4 | 5 | use futures_util::future::BoxFuture; 6 | use futures_util::future::LocalBoxFuture; 7 | use futures_util::future::Shared; 8 | use futures_util::FutureExt; 9 | use parking_lot::Mutex; 10 | use tokio::task::JoinError; 11 | 12 | type JoinResult = Result>; 13 | type CreateFutureFn = 14 | Box LocalBoxFuture<'static, TResult> + Send + Sync>; 15 | 16 | #[derive(Debug)] 17 | struct State { 18 | retry_index: usize, 19 | future: Option>>>, 20 | } 21 | 22 | /// Attempts to create a shared value asynchronously on one tokio runtime while 23 | /// many runtimes are requesting the value. 
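///
/// Illustrative sketch (mirrors the `single_runtime` test below; assumes the
/// `tokio` feature and a Tokio runtime):
///
/// ```ignore
/// use futures_util::FutureExt;
///
/// let creator = deno_unsync::sync::MultiRuntimeAsyncValueCreator::new(
///   Box::new(|| async { 1 }.boxed_local()),
/// );
/// assert_eq!(creator.get().await, 1);
/// ```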
24 | /// 25 | /// This is only useful when the value needs to get created once across 26 | /// many runtimes. 27 | /// 28 | /// This handles the case where the tokio runtime creating the value goes down 29 | /// while another one is waiting on the value. 30 | pub struct MultiRuntimeAsyncValueCreator { 31 | create_future: CreateFutureFn, 32 | state: Mutex>, 33 | } 34 | 35 | impl std::fmt::Debug 36 | for MultiRuntimeAsyncValueCreator 37 | { 38 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 39 | f.debug_struct("MultiRuntimeAsyncValueCreator").finish() 40 | } 41 | } 42 | 43 | impl MultiRuntimeAsyncValueCreator { 44 | pub fn new(create_future: CreateFutureFn) -> Self { 45 | Self { 46 | state: Mutex::new(State { 47 | retry_index: 0, 48 | future: None, 49 | }), 50 | create_future, 51 | } 52 | } 53 | 54 | pub async fn get(&self) -> TResult { 55 | let (mut future, mut retry_index) = { 56 | let mut state = self.state.lock(); 57 | let future = match &state.future { 58 | Some(future) => future.clone(), 59 | None => { 60 | let future = self.create_shared_future(); 61 | state.future = Some(future.clone()); 62 | future 63 | } 64 | }; 65 | (future, state.retry_index) 66 | }; 67 | 68 | loop { 69 | let result = future.await; 70 | 71 | match result { 72 | Ok(result) => return result, 73 | Err(join_error) => { 74 | if join_error.is_cancelled() { 75 | let mut state = self.state.lock(); 76 | 77 | if state.retry_index == retry_index { 78 | // we were the first one to retry, so create a new future 79 | // that we'll run from the current runtime 80 | state.retry_index += 1; 81 | state.future = Some(self.create_shared_future()); 82 | } 83 | 84 | retry_index = state.retry_index; 85 | future = state.future.as_ref().unwrap().clone(); 86 | 87 | // just in case we're stuck in a loop 88 | if retry_index > 1000 { 89 | panic!("Something went wrong.") // should never happen 90 | } 91 | } else { 92 | panic!("{}", join_error); 93 | } 94 | } 95 | } 96 | } 97 | } 98 | 99 | fn create_shared_future( 100 | &self, 101 | ) -> Shared>> { 102 | let future = (self.create_future)(); 103 | crate::spawn(future) 104 | .map(|result| result.map_err(Arc::new)) 105 | .boxed() 106 | .shared() 107 | } 108 | } 109 | 110 | #[cfg(test)] 111 | mod test { 112 | use crate::spawn; 113 | 114 | use super::*; 115 | 116 | #[tokio::test] 117 | async fn single_runtime() { 118 | let value_creator = MultiRuntimeAsyncValueCreator::new(Box::new(|| { 119 | async { 1 }.boxed_local() 120 | })); 121 | let value = value_creator.get().await; 122 | assert_eq!(value, 1); 123 | } 124 | 125 | #[test] 126 | fn multi_runtimes() { 127 | let value_creator = 128 | Arc::new(MultiRuntimeAsyncValueCreator::new(Box::new(|| { 129 | async { 130 | tokio::task::yield_now().await; 131 | 1 132 | } 133 | .boxed_local() 134 | }))); 135 | let handles = (0..3) 136 | .map(|_| { 137 | let value_creator = value_creator.clone(); 138 | std::thread::spawn(|| { 139 | create_runtime().block_on(async move { value_creator.get().await }) 140 | }) 141 | }) 142 | .collect::>(); 143 | for handle in handles { 144 | assert_eq!(handle.join().unwrap(), 1); 145 | } 146 | } 147 | 148 | #[test] 149 | fn multi_runtimes_first_never_finishes() { 150 | let is_first_run = Arc::new(Mutex::new(true)); 151 | let (tx, rx) = std::sync::mpsc::channel::<()>(); 152 | let value_creator = Arc::new(MultiRuntimeAsyncValueCreator::new({ 153 | let is_first_run = is_first_run.clone(); 154 | Box::new(move || { 155 | let is_first_run = is_first_run.clone(); 156 | let tx = tx.clone(); 157 | async move { 158 | 
let is_first_run = { 159 | let mut is_first_run = is_first_run.lock(); 160 | let initial_value = *is_first_run; 161 | *is_first_run = false; 162 | tx.send(()).unwrap(); 163 | initial_value 164 | }; 165 | if is_first_run { 166 | tokio::time::sleep(std::time::Duration::from_millis(30_000)).await; 167 | panic!("TIMED OUT"); // should not happen 168 | } else { 169 | tokio::task::yield_now().await; 170 | } 171 | 1 172 | } 173 | .boxed_local() 174 | }) 175 | })); 176 | std::thread::spawn({ 177 | let value_creator = value_creator.clone(); 178 | let is_first_run = is_first_run.clone(); 179 | move || { 180 | create_runtime().block_on(async { 181 | let value_creator = value_creator.clone(); 182 | // spawn a task that will never complete 183 | spawn(async move { value_creator.get().await }); 184 | // wait for the task to set is_first_run to false 185 | while *is_first_run.lock() { 186 | tokio::time::sleep(std::time::Duration::from_millis(20)).await; 187 | } 188 | // now exit the runtime while the value_creator is still pending 189 | }) 190 | } 191 | }); 192 | let handle = { 193 | let value_creator = value_creator.clone(); 194 | std::thread::spawn(|| { 195 | create_runtime().block_on(async move { 196 | let value_creator = value_creator.clone(); 197 | rx.recv().unwrap(); 198 | // even though the other runtime shutdown, this get() should 199 | // recover and still get the value 200 | value_creator.get().await 201 | }) 202 | }) 203 | }; 204 | assert_eq!(handle.join().unwrap(), 1); 205 | } 206 | 207 | fn create_runtime() -> tokio::runtime::Runtime { 208 | tokio::runtime::Builder::new_current_thread() 209 | .enable_all() 210 | .build() 211 | .unwrap() 212 | } 213 | } 214 | -------------------------------------------------------------------------------- /src/future.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2018-2024 the Deno authors. MIT license. 
2 | 3 | use parking_lot::Mutex; 4 | use std::cell::RefCell; 5 | use std::future::Future; 6 | use std::pin::Pin; 7 | use std::rc::Rc; 8 | use std::sync::Arc; 9 | use std::task::Context; 10 | use std::task::Wake; 11 | use std::task::Waker; 12 | 13 | use crate::sync::AtomicFlag; 14 | 15 | impl LocalFutureExt for T where T: Future {} 16 | 17 | pub trait LocalFutureExt: std::future::Future { 18 | fn shared_local(self) -> SharedLocal 19 | where 20 | Self: Sized, 21 | Self::Output: Clone, 22 | { 23 | SharedLocal::new(self) 24 | } 25 | } 26 | 27 | enum FutureOrOutput { 28 | Future(TFuture), 29 | Output(TFuture::Output), 30 | } 31 | 32 | impl std::fmt::Debug for FutureOrOutput 33 | where 34 | TFuture::Output: std::fmt::Debug, 35 | { 36 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 37 | match self { 38 | Self::Future(_) => f.debug_tuple("Future").field(&"").finish(), 39 | Self::Output(arg0) => f.debug_tuple("Result").field(arg0).finish(), 40 | } 41 | } 42 | } 43 | 44 | struct SharedLocalData { 45 | future_or_output: FutureOrOutput, 46 | } 47 | 48 | impl std::fmt::Debug for SharedLocalData 49 | where 50 | TFuture::Output: std::fmt::Debug, 51 | { 52 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 53 | f.debug_struct("SharedLocalData") 54 | .field("future_or_output", &self.future_or_output) 55 | .finish() 56 | } 57 | } 58 | 59 | struct SharedLocalInner { 60 | data: RefCell>, 61 | child_waker_state: Arc, 62 | } 63 | 64 | impl std::fmt::Debug for SharedLocalInner 65 | where 66 | TFuture::Output: std::fmt::Debug, 67 | { 68 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 69 | f.debug_struct("SharedLocalInner") 70 | .field("data", &self.data) 71 | .field("child_waker_state", &self.child_waker_state) 72 | .finish() 73 | } 74 | } 75 | 76 | /// A !Send-friendly future whose result can be awaited multiple times. 
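///
/// Illustrative sketch (mirrors the tests at the bottom of this file):
///
/// ```ignore
/// use deno_unsync::future::LocalFutureExt;
///
/// let shared = async { 42 }.shared_local();
/// assert_eq!(shared.clone().await, 42);
/// assert_eq!(shared.await, 42);
/// ```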
77 | #[must_use = "futures do nothing unless you `.await` or poll them"] 78 | pub struct SharedLocal(Rc>); 79 | 80 | impl Clone for SharedLocal 81 | where 82 | TFuture::Output: Clone, 83 | { 84 | fn clone(&self) -> Self { 85 | Self(self.0.clone()) 86 | } 87 | } 88 | 89 | impl std::fmt::Debug for SharedLocal 90 | where 91 | TFuture::Output: std::fmt::Debug, 92 | { 93 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 94 | f.debug_tuple("SharedLocal").field(&self.0).finish() 95 | } 96 | } 97 | 98 | impl SharedLocal 99 | where 100 | TFuture::Output: Clone, 101 | { 102 | pub fn new(future: TFuture) -> Self { 103 | SharedLocal(Rc::new(SharedLocalInner { 104 | data: RefCell::new(SharedLocalData { 105 | future_or_output: FutureOrOutput::Future(future), 106 | }), 107 | child_waker_state: Arc::new(ChildWakerState { 108 | can_poll: AtomicFlag::raised(), 109 | wakers: Default::default(), 110 | }), 111 | })) 112 | } 113 | } 114 | 115 | impl std::future::Future for SharedLocal 116 | where 117 | TFuture::Output: Clone, 118 | { 119 | type Output = TFuture::Output; 120 | 121 | fn poll( 122 | self: std::pin::Pin<&mut Self>, 123 | cx: &mut std::task::Context<'_>, 124 | ) -> std::task::Poll { 125 | use std::task::Poll; 126 | 127 | let mut inner = self.0.data.borrow_mut(); 128 | match &mut inner.future_or_output { 129 | FutureOrOutput::Future(fut) => { 130 | self.0.child_waker_state.wakers.push(cx.waker().clone()); 131 | if self.0.child_waker_state.can_poll.lower() { 132 | let child_waker = Waker::from(self.0.child_waker_state.clone()); 133 | let mut child_cx = Context::from_waker(&child_waker); 134 | let fut = unsafe { Pin::new_unchecked(fut) }; 135 | match fut.poll(&mut child_cx) { 136 | Poll::Ready(result) => { 137 | inner.future_or_output = FutureOrOutput::Output(result.clone()); 138 | drop(inner); // stop borrow_mut 139 | let wakers = self.0.child_waker_state.wakers.take_all(); 140 | for waker in wakers { 141 | waker.wake(); 142 | } 143 | Poll::Ready(result) 144 | } 145 | Poll::Pending => Poll::Pending, 146 | } 147 | } else { 148 | Poll::Pending 149 | } 150 | } 151 | FutureOrOutput::Output(result) => Poll::Ready(result.clone()), 152 | } 153 | } 154 | } 155 | 156 | #[derive(Debug, Default)] 157 | struct WakerStore(Mutex>); 158 | 159 | impl WakerStore { 160 | pub fn take_all(&self) -> Vec { 161 | let mut wakers = self.0.lock(); 162 | std::mem::take(&mut *wakers) 163 | } 164 | 165 | pub fn clone_all(&self) -> Vec { 166 | self.0.lock().clone() 167 | } 168 | 169 | pub fn push(&self, waker: Waker) { 170 | self.0.lock().push(waker); 171 | } 172 | } 173 | 174 | #[derive(Debug)] 175 | struct ChildWakerState { 176 | can_poll: AtomicFlag, 177 | wakers: WakerStore, 178 | } 179 | 180 | impl Wake for ChildWakerState { 181 | fn wake(self: Arc) { 182 | self.can_poll.raise(); 183 | let wakers = self.wakers.take_all(); 184 | 185 | for waker in wakers { 186 | waker.wake(); 187 | } 188 | } 189 | 190 | fn wake_by_ref(self: &Arc) { 191 | self.can_poll.raise(); 192 | let wakers = self.wakers.clone_all(); 193 | 194 | for waker in wakers { 195 | waker.wake_by_ref(); 196 | } 197 | } 198 | } 199 | 200 | #[cfg(test)] 201 | mod test { 202 | use std::sync::Arc; 203 | 204 | use tokio::sync::Notify; 205 | 206 | use super::LocalFutureExt; 207 | 208 | #[tokio::test(flavor = "current_thread")] 209 | async fn test_shared_local_future() { 210 | let shared = super::SharedLocal::new(Box::pin(async { 42 })); 211 | assert_eq!(shared.clone().await, 42); 212 | assert_eq!(shared.await, 42); 213 | } 214 | 215 | 
#[tokio::test(flavor = "current_thread")] 216 | async fn test_shared_local() { 217 | let shared = async { 42 }.shared_local(); 218 | assert_eq!(shared.clone().await, 42); 219 | assert_eq!(shared.await, 42); 220 | } 221 | 222 | #[tokio::test(flavor = "current_thread")] 223 | async fn multiple_tasks_waiting() { 224 | let notify = Arc::new(Notify::new()); 225 | 226 | let shared = { 227 | let notify = notify.clone(); 228 | async move { 229 | tokio::task::yield_now().await; 230 | notify.notified().await; 231 | tokio::task::yield_now().await; 232 | tokio::task::yield_now().await; 233 | } 234 | .shared_local() 235 | }; 236 | let mut tasks = Vec::new(); 237 | for _ in 0..10 { 238 | tasks.push(crate::spawn(shared.clone())); 239 | } 240 | 241 | crate::spawn(async move { 242 | notify.notify_one(); 243 | for task in tasks { 244 | task.await.unwrap(); 245 | } 246 | }) 247 | .await 248 | .unwrap() 249 | } 250 | } 251 | -------------------------------------------------------------------------------- /src/mpsc/mod.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2018-2024 the Deno authors. MIT license. 2 | 3 | use std::cell::RefCell; 4 | use std::fmt::Formatter; 5 | use std::future::Future; 6 | use std::pin::Pin; 7 | use std::rc::Rc; 8 | use std::task::Context; 9 | use std::task::Poll; 10 | 11 | mod chunked_queue; 12 | 13 | use crate::UnsyncWaker; 14 | use chunked_queue::ChunkedQueue; 15 | 16 | pub struct SendError(pub T); 17 | 18 | impl std::fmt::Debug for SendError 19 | where 20 | T: std::fmt::Debug, 21 | { 22 | fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { 23 | f.debug_tuple("SendError").field(&self.0).finish() 24 | } 25 | } 26 | 27 | pub struct Sender { 28 | shared: Rc>>, 29 | } 30 | 31 | impl Sender { 32 | pub fn send(&self, value: T) -> Result<(), SendError> { 33 | let mut shared = self.shared.borrow_mut(); 34 | if shared.closed { 35 | return Err(SendError(value)); 36 | } 37 | shared.queue.push_back(value); 38 | shared.waker.wake(); 39 | Ok(()) 40 | } 41 | } 42 | 43 | impl Drop for Sender { 44 | fn drop(&mut self) { 45 | let mut shared = self.shared.borrow_mut(); 46 | shared.closed = true; 47 | shared.waker.wake(); 48 | } 49 | } 50 | 51 | pub struct Receiver { 52 | shared: Rc>>, 53 | } 54 | 55 | impl Drop for Receiver { 56 | fn drop(&mut self) { 57 | let mut shared = self.shared.borrow_mut(); 58 | shared.closed = true; 59 | } 60 | } 61 | 62 | impl Receiver { 63 | /// Receives a value from the channel, returning `None` if there 64 | /// are no more items and the channel is closed. 65 | pub async fn recv(&mut self) -> Option { 66 | // note: this is `&mut self` so that it can't be polled 67 | // concurrently. DO NOT change this to `&self` because 68 | // then futures will lose their wakers. 69 | RecvFuture { 70 | shared: &self.shared, 71 | } 72 | .await 73 | } 74 | 75 | /// Number of pending unread items. 76 | pub fn len(&self) -> usize { 77 | self.shared.borrow().queue.len() 78 | } 79 | 80 | /// If the receiver has no pending items. 
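///
/// Illustrative sketch (mirrors the `sends_multiple_then_drop` test below):
///
/// ```ignore
/// let (sender, receiver) = deno_unsync::mpsc::unbounded_channel::<usize>();
/// assert!(receiver.is_empty());
/// sender.send(1).unwrap();
/// assert_eq!(receiver.len(), 1);
/// assert!(!receiver.is_empty());
/// ```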
81 | pub fn is_empty(&self) -> bool { 82 | self.len() == 0 83 | } 84 | } 85 | 86 | struct RecvFuture<'a, T> { 87 | shared: &'a RefCell>, 88 | } 89 | 90 | impl Future for RecvFuture<'_, T> { 91 | type Output = Option; 92 | 93 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { 94 | let mut shared = self.shared.borrow_mut(); 95 | if let Some(value) = shared.queue.pop_front() { 96 | Poll::Ready(Some(value)) 97 | } else if shared.closed { 98 | Poll::Ready(None) 99 | } else { 100 | shared.waker.register(cx.waker()); 101 | Poll::Pending 102 | } 103 | } 104 | } 105 | 106 | struct Shared { 107 | queue: ChunkedQueue, 108 | waker: UnsyncWaker, 109 | closed: bool, 110 | } 111 | 112 | /// A ![`Sync`] and ![`Sync`] equivalent to `tokio::sync::unbounded_channel`. 113 | pub fn unbounded_channel() -> (Sender, Receiver) { 114 | let shared = Rc::new(RefCell::new(Shared { 115 | queue: ChunkedQueue::default(), 116 | waker: UnsyncWaker::default(), 117 | closed: false, 118 | })); 119 | ( 120 | Sender { 121 | shared: shared.clone(), 122 | }, 123 | Receiver { shared }, 124 | ) 125 | } 126 | 127 | #[cfg(test)] 128 | mod test { 129 | use tokio::join; 130 | 131 | use super::*; 132 | 133 | #[tokio::test(flavor = "current_thread")] 134 | async fn sends_receives_exits() { 135 | let (sender, mut receiver) = unbounded_channel::(); 136 | sender.send(1).unwrap(); 137 | assert_eq!(receiver.recv().await, Some(1)); 138 | sender.send(2).unwrap(); 139 | assert_eq!(receiver.recv().await, Some(2)); 140 | drop(sender); 141 | assert_eq!(receiver.recv().await, None); 142 | } 143 | 144 | #[tokio::test(flavor = "current_thread")] 145 | async fn sends_multiple_then_drop() { 146 | let (sender, mut receiver) = unbounded_channel::(); 147 | sender.send(1).unwrap(); 148 | sender.send(2).unwrap(); 149 | drop(sender); 150 | assert_eq!(receiver.len(), 2); 151 | assert!(!receiver.is_empty()); 152 | assert_eq!(receiver.recv().await, Some(1)); 153 | assert_eq!(receiver.recv().await, Some(2)); 154 | assert_eq!(receiver.recv().await, None); 155 | assert_eq!(receiver.len(), 0); 156 | assert!(receiver.is_empty()); 157 | } 158 | 159 | #[tokio::test(flavor = "current_thread")] 160 | async fn receiver_dropped_sending() { 161 | let (sender, receiver) = unbounded_channel::(); 162 | drop(receiver); 163 | let err = sender.send(1).unwrap_err(); 164 | assert_eq!(err.0, 1); 165 | } 166 | 167 | #[tokio::test(flavor = "current_thread")] 168 | async fn receiver_recv_then_drop_sender() { 169 | let (sender, mut receiver) = unbounded_channel::(); 170 | let future = crate::spawn(async move { 171 | let value = receiver.recv().await; 172 | value.is_none() 173 | }); 174 | let future2 = crate::spawn(async move { 175 | drop(sender); 176 | true 177 | }); 178 | let (first, second) = join!(future, future2); 179 | assert!(first.unwrap()); 180 | assert!(second.unwrap()); 181 | } 182 | 183 | #[tokio::test(flavor = "current_thread")] 184 | async fn multiple_senders_divided_work() { 185 | for receiver_ticks in [None, Some(1), Some(10)] { 186 | for sender_ticks in [None, Some(1), Some(10)] { 187 | for sender_count in [1000, 100, 10, 2, 1] { 188 | let (sender, mut receiver) = unbounded_channel::(); 189 | let future = crate::spawn(async move { 190 | let mut values = Vec::with_capacity(1000); 191 | for _ in 0..1000 { 192 | if let Some(ticks) = receiver_ticks { 193 | for _ in 0..ticks { 194 | tokio::task::yield_now().await; 195 | } 196 | } 197 | let value = receiver.recv().await; 198 | values.push(value.unwrap()); 199 | } 200 | // both senders should be dropped at this 
point 201 | let value = receiver.recv().await; 202 | assert!(value.is_none()); 203 | 204 | values.sort(); 205 | // ensure we received these values 206 | #[allow(clippy::needless_range_loop)] 207 | for i in 0..1000 { 208 | assert_eq!(values[i], i); 209 | } 210 | }); 211 | 212 | let mut futures = Vec::with_capacity(1 + sender_count); 213 | futures.push(future); 214 | let sender = Rc::new(sender); 215 | for sender_index in 0..sender_count { 216 | let sender = sender.clone(); 217 | let batch_count = 1000 / sender_count; 218 | futures.push(crate::spawn(async move { 219 | for i in 0..batch_count { 220 | if let Some(ticks) = sender_ticks { 221 | for _ in 0..ticks { 222 | tokio::task::yield_now().await; 223 | } 224 | } 225 | sender.send(batch_count * sender_index + i).unwrap(); 226 | } 227 | })); 228 | } 229 | drop(sender); 230 | 231 | // wait all futures 232 | for future in futures { 233 | future.await.unwrap(); 234 | } 235 | } 236 | } 237 | } 238 | } 239 | } 240 | -------------------------------------------------------------------------------- /src/sync/task_queue.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2018-2025 the Deno authors. MIT license. 2 | 3 | use std::collections::LinkedList; 4 | use std::future::Future; 5 | use std::sync::Arc; 6 | 7 | use futures_util::task::AtomicWaker; 8 | use parking_lot::Mutex; 9 | 10 | use super::AtomicFlag; 11 | 12 | #[derive(Debug, Default)] 13 | struct TaskQueueTaskItem { 14 | is_ready: AtomicFlag, 15 | is_future_dropped: AtomicFlag, 16 | waker: AtomicWaker, 17 | } 18 | 19 | #[derive(Debug, Default)] 20 | struct TaskQueueTasks { 21 | is_running: bool, 22 | items: LinkedList>, 23 | } 24 | 25 | /// A queue that executes tasks sequentially one after the other 26 | /// ensuring order and that no task runs at the same time as another. 27 | /// 28 | /// Note that this differs from tokio's semaphore in that the order 29 | /// is **acquired** synchronously. 30 | #[derive(Debug, Default)] 31 | pub struct TaskQueue { 32 | tasks: Mutex, 33 | } 34 | 35 | impl TaskQueue { 36 | /// Acquires a permit where the tasks are executed one at a time 37 | /// and in the order that they were acquired. 38 | pub fn acquire(&self) -> TaskQueuePermitAcquireFuture { 39 | TaskQueuePermitAcquireFuture::new(self) 40 | } 41 | 42 | /// Alternate API that acquires a permit internally 43 | /// for the duration of the future. 44 | #[allow(unused)] 45 | pub fn run<'a, R>( 46 | &'a self, 47 | future: impl Future + 'a, 48 | ) -> impl Future + 'a { 49 | let acquire_future = self.acquire(); 50 | async move { 51 | let permit = acquire_future.await; 52 | let result = future.await; 53 | drop(permit); // explicit for clarity 54 | result 55 | } 56 | } 57 | 58 | fn raise_next(&self) { 59 | let front_item = { 60 | let mut tasks = self.tasks.lock(); 61 | 62 | // clear out any wakers for futures that were dropped 63 | while let Some(front_waker) = tasks.items.front() { 64 | if front_waker.is_future_dropped.is_raised() { 65 | tasks.items.pop_front(); 66 | } else { 67 | break; 68 | } 69 | } 70 | let front_item = tasks.items.pop_front(); 71 | tasks.is_running = front_item.is_some(); 72 | front_item 73 | }; 74 | 75 | // wake up the next waker 76 | if let Some(front_item) = front_item { 77 | front_item.is_ready.raise(); 78 | front_item.waker.wake(); 79 | } 80 | } 81 | } 82 | 83 | /// A permit that when dropped will allow another task to proceed. 
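///
/// Illustrative sketch (not from the original docs): hold the permit for the
/// critical section and drop it to let the next queued acquirer run.
///
/// ```ignore
/// let queue = deno_unsync::sync::TaskQueue::default();
/// let permit = queue.acquire().await;
/// // ... exclusive work while the permit is held ...
/// drop(permit); // the next queued `acquire()` future is now woken
/// ```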
84 | pub struct TaskQueuePermit<'a>(&'a TaskQueue); 85 | 86 | impl Drop for TaskQueuePermit<'_> { 87 | fn drop(&mut self) { 88 | self.0.raise_next(); 89 | } 90 | } 91 | 92 | pub struct TaskQueuePermitAcquireFuture<'a> { 93 | task_queue: Option<&'a TaskQueue>, 94 | item: Arc, 95 | } 96 | 97 | impl<'a> TaskQueuePermitAcquireFuture<'a> { 98 | pub fn new(task_queue: &'a TaskQueue) -> Self { 99 | // acquire the waker position synchronously 100 | let mut tasks = task_queue.tasks.lock(); 101 | let item = if !tasks.is_running { 102 | tasks.is_running = true; 103 | let item = Arc::new(TaskQueueTaskItem::default()); 104 | item.is_ready.raise(); 105 | item 106 | } else { 107 | let item = Arc::new(TaskQueueTaskItem::default()); 108 | tasks.items.push_back(item.clone()); 109 | item 110 | }; 111 | drop(tasks); 112 | Self { 113 | task_queue: Some(task_queue), 114 | item, 115 | } 116 | } 117 | } 118 | 119 | impl Drop for TaskQueuePermitAcquireFuture<'_> { 120 | fn drop(&mut self) { 121 | if let Some(task_queue) = self.task_queue.take() { 122 | if self.item.is_ready.is_raised() { 123 | task_queue.raise_next(); 124 | } else { 125 | self.item.is_future_dropped.raise(); 126 | } 127 | } 128 | } 129 | } 130 | 131 | impl<'a> Future for TaskQueuePermitAcquireFuture<'a> { 132 | type Output = TaskQueuePermit<'a>; 133 | 134 | fn poll( 135 | mut self: std::pin::Pin<&mut Self>, 136 | cx: &mut std::task::Context<'_>, 137 | ) -> std::task::Poll { 138 | if self.item.is_ready.is_raised() { 139 | std::task::Poll::Ready(TaskQueuePermit(self.task_queue.take().unwrap())) 140 | } else { 141 | self.item.waker.register(cx.waker()); 142 | std::task::Poll::Pending 143 | } 144 | } 145 | } 146 | 147 | #[cfg(test)] 148 | mod test { 149 | use std::sync::Arc; 150 | 151 | use parking_lot::Mutex; 152 | 153 | use super::*; 154 | 155 | #[tokio::test] 156 | async fn task_queue_runs_one_after_other() { 157 | let task_queue = TaskQueue::default(); 158 | let mut tasks = Vec::new(); 159 | let data = Arc::new(Mutex::new(0)); 160 | for i in 0..100 { 161 | let data = data.clone(); 162 | tasks.push(task_queue.run(async move { 163 | crate::spawn_blocking(move || { 164 | let mut data = data.lock(); 165 | assert_eq!(*data, i); 166 | *data = i + 1; 167 | }) 168 | .await 169 | .unwrap(); 170 | })); 171 | } 172 | futures_util::future::join_all(tasks).await; 173 | } 174 | 175 | #[tokio::test] 176 | async fn task_queue_run_in_sequence() { 177 | let task_queue = TaskQueue::default(); 178 | let data = Arc::new(Mutex::new(0)); 179 | 180 | let first = task_queue.run(async { 181 | *data.lock() = 1; 182 | }); 183 | let second = task_queue.run(async { 184 | assert_eq!(*data.lock(), 1); 185 | *data.lock() = 2; 186 | }); 187 | let _ = tokio::join!(first, second); 188 | 189 | assert_eq!(*data.lock(), 2); 190 | } 191 | 192 | #[tokio::test] 193 | async fn task_queue_future_dropped_before_poll() { 194 | let task_queue = Arc::new(TaskQueue::default()); 195 | 196 | // acquire a future, but do not await it 197 | let future = task_queue.acquire(); 198 | 199 | // this task tries to acquire another permit, but will be blocked by the first permit. 
200 | let enter_flag = Arc::new(AtomicFlag::default()); 201 | let delayed_task = crate::spawn({ 202 | let enter_flag = enter_flag.clone(); 203 | let task_queue = task_queue.clone(); 204 | async move { 205 | enter_flag.raise(); 206 | task_queue.acquire().await; 207 | true 208 | } 209 | }); 210 | 211 | // ensure the task gets a chance to be scheduled and blocked 212 | tokio::task::yield_now().await; 213 | assert!(enter_flag.is_raised()); 214 | 215 | // now, drop the first future 216 | drop(future); 217 | 218 | assert!(delayed_task.await.unwrap()); 219 | } 220 | 221 | #[tokio::test] 222 | async fn task_queue_many_future_dropped_before_poll() { 223 | let task_queue = Arc::new(TaskQueue::default()); 224 | 225 | // acquire a future, but do not await it 226 | let mut futures = Vec::new(); 227 | for _ in 0..=10_000 { 228 | futures.push(task_queue.acquire()); 229 | } 230 | 231 | // this task tries to acquire another permit, but will be blocked by the first permit. 232 | let enter_flag = Arc::new(AtomicFlag::default()); 233 | let delayed_task = crate::spawn({ 234 | let task_queue = task_queue.clone(); 235 | let enter_flag = enter_flag.clone(); 236 | async move { 237 | enter_flag.raise(); 238 | task_queue.acquire().await; 239 | true 240 | } 241 | }); 242 | 243 | // ensure the task gets a chance to be scheduled and blocked 244 | tokio::task::yield_now().await; 245 | assert!(enter_flag.is_raised()); 246 | 247 | // now, drop the futures 248 | drop(futures); 249 | 250 | assert!(delayed_task.await.unwrap()); 251 | } 252 | 253 | #[tokio::test] 254 | async fn task_queue_middle_future_dropped_while_permit_acquired() { 255 | let task_queue = TaskQueue::default(); 256 | 257 | let fut1 = task_queue.acquire(); 258 | let fut2 = task_queue.acquire(); 259 | let fut3 = task_queue.acquire(); 260 | 261 | // should not hang 262 | drop(fut2); 263 | drop(fut1.await); 264 | drop(fut3.await); 265 | } 266 | } 267 | -------------------------------------------------------------------------------- /src/task_queue.rs: -------------------------------------------------------------------------------- 1 | // Copyright 2018-2024 the Deno authors. MIT license. 2 | 3 | use std::cell::RefCell; 4 | use std::collections::LinkedList; 5 | use std::future::Future; 6 | use std::rc::Rc; 7 | use std::task::Waker; 8 | 9 | use crate::Flag; 10 | 11 | #[derive(Debug, Default)] 12 | struct TaskQueueTaskItem { 13 | is_ready: Flag, 14 | is_future_dropped: Flag, 15 | waker: RefCell>, 16 | } 17 | 18 | #[derive(Debug, Default)] 19 | struct TaskQueueTasks { 20 | is_running: bool, 21 | items: LinkedList>, 22 | } 23 | 24 | /// A queue that executes tasks sequentially one after the other 25 | /// ensuring order and that no task runs at the same time as another. 26 | #[derive(Debug, Default)] 27 | pub struct TaskQueue { 28 | tasks: RefCell, 29 | } 30 | 31 | impl TaskQueue { 32 | /// Acquires a permit where the tasks are executed one at a time 33 | /// and in the order that they were acquired. 34 | pub fn acquire(self: &Rc) -> TaskQueuePermitAcquireFuture { 35 | TaskQueuePermitAcquireFuture::new(self.clone()) 36 | } 37 | 38 | /// Alternate API that acquires a permit internally 39 | /// for the duration of the future. 
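///
/// Illustrative sketch (not from the original docs); this `TaskQueue` is the
/// `!Send` one re-exported from the crate root and must live in an `Rc`:
///
/// ```ignore
/// use std::rc::Rc;
///
/// let queue = Rc::new(deno_unsync::TaskQueue::default());
/// let result = queue.run(async { 1 + 1 }).await;
/// assert_eq!(result, 2);
/// ```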
40 |   pub fn run<R>(
41 |     self: &Rc<Self>,
42 |     future: impl Future<Output = R>,
43 |   ) -> impl Future<Output = R> {
44 |     let acquire_future = self.acquire();
45 |     async move {
46 |       let permit = acquire_future.await;
47 |       let result = future.await;
48 |       drop(permit); // explicit for clarity
49 |       result
50 |     }
51 |   }
52 |
53 |   fn raise_next(&self) {
54 |     let front_item = {
55 |       let mut tasks = self.tasks.borrow_mut();
56 |
57 |       // clear out any wakers for futures that were dropped
58 |       while let Some(front_waker) = tasks.items.front() {
59 |         if front_waker.is_future_dropped.is_raised() {
60 |           tasks.items.pop_front();
61 |         } else {
62 |           break;
63 |         }
64 |       }
65 |       let front_item = tasks.items.pop_front();
66 |       tasks.is_running = front_item.is_some();
67 |       front_item
68 |     };
69 |
70 |     // wake up the next waker
71 |     if let Some(front_item) = front_item {
72 |       front_item.is_ready.raise();
73 |       let maybe_waker = front_item.waker.borrow_mut().take();
74 |       if let Some(waker) = maybe_waker {
75 |         waker.wake();
76 |       }
77 |     }
78 |   }
79 | }
80 |
81 | /// A permit that when dropped will allow another task to proceed.
82 | pub struct TaskQueuePermit(Rc<TaskQueue>);
83 |
84 | impl Drop for TaskQueuePermit {
85 |   fn drop(&mut self) {
86 |     self.0.raise_next();
87 |   }
88 | }
89 |
90 | pub struct TaskQueuePermitAcquireFuture {
91 |   task_queue: Option<Rc<TaskQueue>>,
92 |   item: Option<Rc<TaskQueueTaskItem>>,
93 | }
94 |
95 | impl Drop for TaskQueuePermitAcquireFuture {
96 |   fn drop(&mut self) {
97 |     if let Some(task_queue) = self.task_queue.take() {
98 |       if let Some(item) = self.item.take() {
99 |         if item.is_ready.is_raised() {
100 |           task_queue.raise_next();
101 |         } else {
102 |           item.is_future_dropped.raise();
103 |         }
104 |       } else {
105 |         // this was the first item, so raise the next one
106 |         task_queue.raise_next();
107 |       }
108 |     }
109 |   }
110 | }
111 |
112 | impl TaskQueuePermitAcquireFuture {
113 |   pub fn new(task_queue: Rc<TaskQueue>) -> Self {
114 |     // acquire the position synchronously
115 |     let mut tasks = task_queue.tasks.borrow_mut();
116 |     if !tasks.is_running {
117 |       tasks.is_running = true;
118 |       drop(tasks);
119 |       Self {
120 |         task_queue: Some(task_queue),
121 |         item: None, // avoid boxing for the fast path
122 |       }
123 |     } else {
124 |       let item = Rc::new(TaskQueueTaskItem::default());
125 |       tasks.items.push_back(item.clone());
126 |       drop(tasks);
127 |       Self {
128 |         task_queue: Some(task_queue),
129 |         item: Some(item),
130 |       }
131 |     }
132 |   }
133 | }
134 |
135 | impl Future for TaskQueuePermitAcquireFuture {
136 |   type Output = TaskQueuePermit;
137 |
138 |   fn poll(
139 |     mut self: std::pin::Pin<&mut Self>,
140 |     cx: &mut std::task::Context<'_>,
141 |   ) -> std::task::Poll<Self::Output> {
142 |     // check if we're ready to run
143 |     let Some(item) = &self.item else {
144 |       // no item means this was the first queued future, so we're ready to run
145 |       return std::task::Poll::Ready(TaskQueuePermit(
146 |         self.task_queue.take().unwrap(),
147 |       ));
148 |     };
149 |     if item.is_ready.is_raised() {
150 |       // we're done, move the task queue out
151 |       std::task::Poll::Ready(TaskQueuePermit(self.task_queue.take().unwrap()))
152 |     } else {
153 |       // store the waker for next time
154 |       let mut stored_waker = item.waker.borrow_mut();
155 |       // update with the latest waker if it's different or not set
156 |       if stored_waker
157 |         .as_ref()
158 |         .map(|w| !w.will_wake(cx.waker()))
159 |         .unwrap_or(true)
160 |       {
161 |         *stored_waker = Some(cx.waker().clone());
162 |       }
163 |
164 |       std::task::Poll::Pending
165 |     }
166 |   }
167 | }
168 |
169 | #[cfg(test)]
170 | mod tests {
171 |   use std::sync::Arc;
172 |   use std::sync::Mutex;
173 |
174 |   use crate::tokio::JoinSet;
175 |
176 |   use super::*;
177 |
178 |   #[tokio::test]
179 |   async fn task_queue_runs_one_after_other() {
180 |     let task_queue = Rc::new(TaskQueue::default());
181 |     let mut set = JoinSet::default();
182 |     let data = Arc::new(Mutex::new(0));
183 |     for i in 0..100 {
184 |       let data = data.clone();
185 |       let task_queue = task_queue.clone();
186 |       let acquire = task_queue.acquire();
187 |       set.spawn(async move {
188 |         let permit = acquire.await;
189 |         crate::spawn_blocking(move || {
190 |           let mut data = data.lock().unwrap();
191 |           assert_eq!(i, *data);
192 |           *data = i + 1;
193 |         })
194 |         .await
195 |         .unwrap();
196 |         drop(permit);
197 |         drop(task_queue);
198 |       });
199 |     }
200 |     while let Some(res) = set.join_next().await {
201 |       assert!(res.is_ok());
202 |     }
203 |   }
204 |
205 |   #[tokio::test]
206 |   async fn tasks_run_in_sequence() {
207 |     let task_queue = Rc::new(TaskQueue::default());
208 |     let data = RefCell::new(0);
209 |
210 |     let first = task_queue.run(async {
211 |       *data.borrow_mut() = 1;
212 |     });
213 |     let second = task_queue.run(async {
214 |       assert_eq!(*data.borrow(), 1);
215 |       *data.borrow_mut() = 2;
216 |     });
217 |     let _ = tokio::join!(first, second);
218 |
219 |     assert_eq!(*data.borrow(), 2);
220 |   }
221 |
222 |   #[tokio::test]
223 |   async fn future_dropped_before_poll() {
224 |     let task_queue = Rc::new(TaskQueue::default());
225 |
226 |     // acquire a future, but do not await it
227 |     let future = task_queue.acquire();
228 |
229 |     // this task tries to acquire another permit, but will be blocked by the first permit.
230 |     let enter_flag = Rc::new(Flag::default());
231 |     let delayed_task = crate::spawn({
232 |       let task_queue = task_queue.clone();
233 |       let enter_flag = enter_flag.clone();
234 |       async move {
235 |         enter_flag.raise();
236 |         task_queue.acquire().await;
237 |         true
238 |       }
239 |     });
240 |
241 |     // ensure the task gets a chance to be scheduled and blocked
242 |     tokio::task::yield_now().await;
243 |     assert!(enter_flag.is_raised());
244 |
245 |     // now, drop the first future
246 |     drop(future);
247 |
248 |     assert!(delayed_task.await.unwrap());
249 |   }
250 |
251 |   #[tokio::test]
252 |   async fn many_future_dropped_before_poll() {
253 |     let task_queue = Rc::new(TaskQueue::default());
254 |
255 |     // acquire many futures, but do not await them
256 |     let mut futures = Vec::new();
257 |     for _ in 0..=10_000 {
258 |       futures.push(task_queue.acquire());
259 |     }
260 |
261 |     // this task tries to acquire another permit, but will be blocked behind the queued futures.
262 |     let enter_flag = Rc::new(Flag::default());
263 |     let delayed_task = crate::spawn({
264 |       let task_queue = task_queue.clone();
265 |       let enter_flag = enter_flag.clone();
266 |       async move {
267 |         enter_flag.raise();
268 |         task_queue.acquire().await;
269 |         true
270 |       }
271 |     });
272 |
273 |     // ensure the task gets a chance to be scheduled and blocked
274 |     tokio::task::yield_now().await;
275 |     assert!(enter_flag.is_raised());
276 |
277 |     // now, drop the futures
278 |     drop(futures);
279 |
280 |     assert!(delayed_task.await.unwrap());
281 |   }
282 |
283 |   #[tokio::test]
284 |   async fn acquires_position_synchronously() {
285 |     let task_queue = Rc::new(TaskQueue::default());
286 |
287 |     let fut1 = task_queue.acquire();
288 |     let fut2 = task_queue.acquire();
289 |     let fut3 = task_queue.acquire();
290 |     let fut4 = task_queue.acquire();
291 |     let value = Rc::new(RefCell::new(0));
292 |
293 |     let task1 = crate::spawn({
294 |       let value = value.clone();
295 |       async move {
296 |         let permit = fut2.await;
297 |         assert_eq!(*value.borrow(), 1);
298 |         *value.borrow_mut() += 1;
299 |         drop(permit);
300 |         // dropping this future without awaiting it
301 |         // should cause the next future to be polled
302 |         drop(fut3);
303 |       }
304 |     });
305 |     let task2 = crate::spawn({
306 |       let value = value.clone();
307 |       async move {
308 |         // give the other task some time
309 |         tokio::task::yield_now().await;
310 |         let permit = fut1.await;
311 |         assert_eq!(*value.borrow(), 0);
312 |         *value.borrow_mut() += 1;
313 |         drop(permit);
314 |       }
315 |     });
316 |     let task3 = crate::spawn({
317 |       let value = value.clone();
318 |       async move {
319 |         // give the other tasks some time
320 |         tokio::task::yield_now().await;
321 |         let permit = fut4.await;
322 |         assert_eq!(*value.borrow(), 2);
323 |         *value.borrow_mut() += 1;
324 |         drop(permit);
325 |       }
326 |     });
327 |
328 |     tokio::try_join!(task1, task2, task3).unwrap();
329 |     assert_eq!(*value.borrow(), 3);
330 |   }
331 |
332 |   #[tokio::test]
333 |   async fn middle_future_dropped_while_permit_acquired() {
334 |     let task_queue = Rc::new(TaskQueue::default());
335 |
336 |     let fut1 = task_queue.acquire();
337 |     let fut2 = task_queue.acquire();
338 |     let fut3 = task_queue.acquire();
339 |
340 |     // should not hang
341 |     drop(fut2);
342 |     drop(fut1.await);
343 |     drop(fut3.await);
344 |   }
345 | }
346 |
--------------------------------------------------------------------------------
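For orientation, here is a minimal usage sketch of the `TaskQueue` API defined in /src/task_queue.rs above. It is not part of the repository: the `main` wiring (a current-thread Tokio runtime, which requires the user's own tokio dependency with the `rt` and `macros` features) and the variable names are illustrative assumptions, while the `TaskQueue::default`, `run`, and `acquire` calls mirror the crate's tests.

use std::rc::Rc;

use deno_unsync::TaskQueue;

// Hypothetical entry point for illustration only; a current-thread runtime
// is assumed because the queue is !Send (it uses Rc and RefCell internally).
#[tokio::main(flavor = "current_thread")]
async fn main() {
  let queue = Rc::new(TaskQueue::default());

  // `run` acquires a permit internally for the duration of the future, so
  // these two futures execute one at a time and in submission order even
  // though they are joined concurrently.
  let first = queue.run(async { /* runs first */ });
  let second = queue.run(async { /* runs only after `first` completes */ });
  let _ = tokio::join!(first, second);

  // `acquire` reserves a queue position synchronously and yields a permit
  // when awaited; dropping the permit (or an unpolled acquire future)
  // lets the next queued future proceed.
  let permit = queue.acquire().await;
  // ...work that must not overlap with other queued tasks...
  drop(permit);
}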