diff options
Diffstat (limited to 'tokio-sync')
34 files changed, 0 insertions, 7411 deletions
diff --git a/tokio-sync/CHANGELOG.md b/tokio-sync/CHANGELOG.md deleted file mode 100644 index 44033503..00000000 --- a/tokio-sync/CHANGELOG.md +++ /dev/null @@ -1,76 +0,0 @@ -# 0.2.0-alpha.6 (September 30, 2019) - -- Move to `futures-*-preview 0.3.0-alpha.19` -- Move to `pin-project 0.4` - -# 0.2.0-alpha.5 (September 19, 2019) - -### Changed -- rename `Lock` -> `Mutex` and make it more like `std::sync::Mutex` (#1573). - -### Added -- `Barrier`, an async version of `std::sync::Barrier` (#1571). - -# 0.2.0-alpha.4 (August 29, 2019) - -- Track tokio release. - -# 0.2.0-alpha.3 (August 23, 2019) - -- Track `tokio` version number - -# 0.2.0-alpha.2 (August 17, 2019) - -### Changed -- Update `futures` dependency to 0.3.0-alpha.18. - -# 0.2.0-alpha.1 (August 8, 2019) - -### Changed -- Switch to `async`, `await`, and `std::future`. - -# 0.1.6 (June 4, 2019) - -### Added -- Add Sync impl for Lock (#1117). - -# 0.1.5 (April 22, 2019) - -### Added -- Add asynchronous mutual exclusion primitive (#964). - -# 0.1.4 (March 13, 2019) - -### Fixed -- Fix memory leak on channel drop (#917). - -### Added -- `std::error::Error` implementation for `oneshot`, `watch` error types (#967). - -# 0.1.3 (March 1, 2019) - -### Added -- `Watch`, a single value broadcast channel (#922). -- `std::error::Error` implementation for more `mpsc` types (#937). - -# 0.1.2 (February 20, 2019) - -### Fixes -- `mpsc` and `Semaphore` when releasing permits (#904). -- `oneshot` task handle leak (#911). - -### Changes -- Performance improvements in `AtomicTask` (#892). -- Improved assert message when creating a channel with bound of zero (#906). - -### Adds -- `AtomicTask::take_task` (#895). - -# 0.1.1 (February 1, 2019) - -### Fixes -- Panic when creating a channel with bound 0 (#879). - -# 0.1.0 (January 24, 2019) - -- Initial Release diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml deleted file mode 100644 index bb2fd607..00000000 --- a/tokio-sync/Cargo.toml +++ /dev/null @@ -1,39 +0,0 @@ -[package] -name = "tokio-sync" -# When releasing to crates.io: -# - Remove path dependencies -# - Update html_root_url. -# - Update doc url -# - Cargo.toml -# - Update CHANGELOG.md. -# - Create "v0.2.x" git tag. -version = "0.2.0-alpha.6" -edition = "2018" -authors = ["Tokio Contributors <team@tokio.rs>"] -license = "MIT" -repository = "https://github.com/tokio-rs/tokio" -homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-sync/0.2.0-alpha.6/tokio_sync" -description = """ -Synchronization utilities. -""" -categories = ["asynchronous"] - -[features] -async-traits = ["futures-sink-preview"] - -[dependencies] -fnv = "1.0.6" -futures-core-preview = { version = "=0.3.0-alpha.19" } -futures-sink-preview = { version = "=0.3.0-alpha.19", optional = true } -futures-util-preview = { version = "=0.3.0-alpha.19" } - -[dev-dependencies] -tokio = { version = "0.2.0-alpha.6", path = "../tokio" } -tokio-test = { version = "0.2.0-alpha.6", path = "../tokio-test" } - -env_logger = { version = "0.6", default-features = false } -loom = { version = "0.2.1", features = ["futures"] } - -[package.metadata.docs.rs] -all-features = true diff --git a/tokio-sync/LICENSE b/tokio-sync/LICENSE deleted file mode 100644 index cdb28b4b..00000000 --- a/tokio-sync/LICENSE +++ /dev/null @@ -1,25 +0,0 @@ -Copyright (c) 2019 Tokio Contributors - -Permission is hereby granted, free of charge, to any -person obtaining a copy of this software and associated -documentation files (the "Software"), to deal in the -Software without restriction, including without -limitation the rights to use, copy, modify, merge, -publish, distribute, sublicense, and/or sell copies of -the Software, and to permit persons to whom the Software -is furnished to do so, subject to the following -conditions: - -The above copyright notice and this permission notice -shall be included in all copies or substantial portions -of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -DEALINGS IN THE SOFTWARE. diff --git a/tokio-sync/README.md b/tokio-sync/README.md deleted file mode 100644 index 6e209f69..00000000 --- a/tokio-sync/README.md +++ /dev/null @@ -1,13 +0,0 @@ -# tokio-sync - -Synchronization utilities - -## License - -This project is licensed under the [MIT license](LICENSE). - -### Contribution - -Unless you explicitly state otherwise, any contribution intentionally submitted -for inclusion in Tokio by you, shall be licensed as MIT, without any additional -terms or conditions. diff --git a/tokio-sync/benches/mpsc.rs b/tokio-sync/benches/mpsc.rs deleted file mode 100644 index 95d8e5c7..00000000 --- a/tokio-sync/benches/mpsc.rs +++ /dev/null @@ -1,536 +0,0 @@ -#![cfg(feature = "broken")] -#![feature(test)] -#![warn(rust_2018_idioms)] - -extern crate test; - -type Medium = [usize; 64]; -type Large = [Medium; 64]; - -mod tokio { - use futures::{future, Async, Future, Sink, Stream}; - use std::thread; - use test::{self, Bencher}; - use tokio_sync::mpsc::*; - - #[bench] - fn bounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::<super::Medium>(1_000)); - }) - } - - #[bench] - fn unbounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded_channel::<super::Medium>()); - }) - } - #[bench] - fn bounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::<super::Large>(1_000)); - }) - } - - #[bench] - fn unbounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded_channel::<super::Large>()); - }) - } - - #[bench] - fn send_one_message(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - // Send - tx.try_send(1).unwrap(); - - // Receive - assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap()); - }) - } - - #[bench] - fn send_one_message_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::<super::Large>(1_000); - - // Send - let _ = tx.try_send([[0; 64]; 64]); - - // Receive - let _ = test::black_box(&rx.poll()); - }) - } - - #[bench] - fn bounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = channel::<i32>(1_000); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::<i32>(1); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_not_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::<i32>(1); - tx.try_send(1).unwrap(); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = unbounded_channel::<i32>(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready_x5(b: &mut Bencher) { - let (_tx, mut rx) = unbounded_channel::<i32>(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_uncontended_1(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - for i in 0..1000 { - tx.try_send(i).unwrap(); - // No need to create a task, because poll is not going to park. - assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); - } - }) - } - - #[bench] - fn bounded_uncontended_1_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::<super::Large>(1_000); - - for i in 0..1000 { - let _ = tx.try_send([[i; 64]; 64]); - // No need to create a task, because poll is not going to park. - let _ = test::black_box(&rx.poll()); - } - }) - } - - #[bench] - fn bounded_uncontended_2(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1000); - - for i in 0..1000 { - tx.try_send(i).unwrap(); - } - - for i in 0..1000 { - // No need to create a task, because poll is not going to park. - assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap()); - } - }) - } - - #[bench] - fn contended_unbounded_tx(b: &mut Bencher) { - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..4 { - let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for mut tx in rx.iter() { - for i in 0..1_000 { - tx.try_send(i).unwrap(); - } - } - })); - } - - b.iter(|| { - // TODO make unbounded - let (tx, rx) = channel::<i32>(1_000_000); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(4 * 1_000); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } - - #[bench] - fn contended_bounded_tx(b: &mut Bencher) { - const THREADS: usize = 4; - const ITERS: usize = 100; - - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..THREADS { - let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for tx in rx.iter() { - let mut tx = tx.wait(); - for i in 0..ITERS { - tx.send(i as i32).unwrap(); - } - } - })); - } - - b.iter(|| { - let (tx, rx) = channel::<i32>(1); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(THREADS * ITERS); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } -} - -mod legacy { - use futures::sync::mpsc::*; - use futures::{future, Async, Future, Sink, Stream}; - use std::thread; - use test::{self, Bencher}; - - #[bench] - fn bounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::<super::Medium>(1_000)); - }) - } - - #[bench] - fn unbounded_new_medium(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded::<super::Medium>()); - }) - } - - #[bench] - fn bounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&channel::<super::Large>(1_000)); - }) - } - - #[bench] - fn unbounded_new_large(b: &mut Bencher) { - b.iter(|| { - let _ = test::black_box(&unbounded::<super::Large>()); - }) - } - - #[bench] - fn send_one_message(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel(1_000); - - // Send - tx.try_send(1).unwrap(); - - // Receive - assert_eq!(Ok(Async::Ready(Some(1))), rx.poll()); - }) - } - - #[bench] - fn send_one_message_large(b: &mut Bencher) { - b.iter(|| { - let (mut tx, mut rx) = channel::<super::Large>(1_000); - - // Send - let _ = tx.try_send([[0; 64]; 64]); - - // Receive - let _ = test::black_box(&rx.poll()); - }) - } - - #[bench] - fn bounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = channel::<i32>(1_000); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::<i32>(0); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn bounded_tx_poll_not_ready(b: &mut Bencher) { - let (mut tx, _rx) = channel::<i32>(0); - tx.try_send(1).unwrap(); - b.iter(|| { - future::lazy(|| { - assert!(tx.poll_ready().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready(b: &mut Bencher) { - let (_tx, mut rx) = unbounded::<i32>(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_rx_not_ready_x5(b: &mut Bencher) { - let (_tx, mut rx) = unbounded::<i32>(); - b.iter(|| { - future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }) - } - - #[bench] - fn unbounded_uncontended_1(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = unbounded(); - - for i in 0..1000 { - UnboundedSender::unbounded_send(&tx, i).expect("send"); - // No need to create a task, because poll is not going to park. - assert_eq!(Ok(Async::Ready(Some(i))), rx.poll()); - } - }) - } - - #[bench] - fn unbounded_uncontended_1_large(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = unbounded::<super::Large>(); - - for i in 0..1000 { - let _ = UnboundedSender::unbounded_send(&tx, [[i; 64]; 64]); - // No need to create a task, because poll is not going to park. - let _ = test::black_box(&rx.poll()); - } - }) - } - - #[bench] - fn unbounded_uncontended_2(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = unbounded(); - - for i in 0..1000 { - UnboundedSender::unbounded_send(&tx, i).expect("send"); - } - - for i in 0..1000 { - // No need to create a task, because poll is not going to park. - assert_eq!(Ok(Async::Ready(Some(i))), rx.poll()); - } - }) - } - - #[bench] - fn multi_thread_unbounded_tx(b: &mut Bencher) { - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..4 { - let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for mut tx in rx.iter() { - for i in 0..1_000 { - tx.try_send(i).unwrap(); - } - } - })); - } - - b.iter(|| { - let (tx, rx) = channel::<i32>(1_000_000); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(4 * 1_000); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } - - #[bench] - fn contended_bounded_tx(b: &mut Bencher) { - const THREADS: usize = 4; - const ITERS: usize = 100; - - let mut threads = vec![]; - let mut txs = vec![]; - - for _ in 0..THREADS { - let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>(); - txs.push(tx); - - threads.push(thread::spawn(move || { - for tx in rx.iter() { - let mut tx = tx.wait(); - for i in 0..ITERS { - tx.send(i as i32).unwrap(); - } - } - })); - } - - b.iter(|| { - let (tx, rx) = channel::<i32>(1); - - for th in &txs { - th.send(tx.clone()).unwrap(); - } - - drop(tx); - - let rx = rx.wait().take(THREADS * ITERS); - - for v in rx { - let _ = test::black_box(v); - } - }); - - drop(txs); - - for th in threads { - th.join().unwrap(); - } - } -} diff --git a/tokio-sync/benches/oneshot.rs b/tokio-sync/benches/oneshot.rs deleted file mode 100644 index b2f37805..00000000 --- a/tokio-sync/benches/oneshot.rs +++ /dev/null @@ -1,239 +0,0 @@ -#![cfg(feature = "broken")] -#![feature(test)] -#![warn(rust_2018_idioms)] - -extern crate test; - -mod tokio { - use futures::{future, Async, Future}; - use test::Bencher; - use tokio_sync::oneshot; - - #[bench] - fn new(b: &mut Bencher) { - b.iter(|| { - let _ = ::test::black_box(&oneshot::channel::<i32>()); - }) - } - - #[bench] - fn same_thread_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - let _ = tx.send(1); - - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - }); - } - - #[bench] - fn same_thread_recv_multi_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - future::lazy(|| { - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - - let _ = tx.send(1); - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }); - } - - #[bench] - fn multi_thread_send_recv(b: &mut Bencher) { - const MAX: usize = 10_000_000; - - use std::thread; - - fn spin<F: Future>(mut f: F) -> Result<F::Item, F::Error> { - use futures::Async::Ready; - loop { - match f.poll() { - Ok(Ready(v)) => return Ok(v), - Ok(_) => {} - Err(e) => return Err(e), - } - } - } - - let mut ping_txs = vec![]; - let mut ping_rxs = vec![]; - let mut pong_txs = vec![]; - let mut pong_rxs = vec![]; - - for _ in 0..MAX { - let (tx, rx) = oneshot::channel::<()>(); - - ping_txs.push(Some(tx)); - ping_rxs.push(Some(rx)); - - let (tx, rx) = oneshot::channel::<()>(); - - pong_txs.push(Some(tx)); - pong_rxs.push(Some(rx)); - } - - thread::spawn(move || { - future::lazy(|| { - for i in 0..MAX { - let ping_rx = ping_rxs[i].take().unwrap(); - let pong_tx = pong_txs[i].take().unwrap(); - - if spin(ping_rx).is_err() { - return Ok(()); - } - - pong_tx.send(()).unwrap(); - } - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - }); - - future::lazy(|| { - let mut i = 0; - - b.iter(|| { - let ping_tx = ping_txs[i].take().unwrap(); - let pong_rx = pong_rxs[i].take().unwrap(); - - ping_tx.send(()).unwrap(); - spin(pong_rx).unwrap(); - - i += 1; - }); - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - } -} - -mod legacy { - use futures::sync::oneshot; - use futures::{future, Async, Future}; - use test::Bencher; - - #[bench] - fn new(b: &mut Bencher) { - b.iter(|| { - let _ = ::test::black_box(&oneshot::channel::<i32>()); - }) - } - - #[bench] - fn same_thread_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - let _ = tx.send(1); - - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - }); - } - - #[bench] - fn same_thread_recv_multi_send_recv(b: &mut Bencher) { - b.iter(|| { - let (tx, mut rx) = oneshot::channel(); - - future::lazy(|| { - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - let _ = rx.poll(); - - let _ = tx.send(1); - assert_eq!(Async::Ready(1), rx.poll().unwrap()); - - Ok::<_, ()>(()) - }) - .wait() - .unwrap(); - }); - } - - #[bench] - fn multi_thread_send_recv(b: &mut Bencher) { - const MAX: usize = 10_000_000; - - use std::thread; - - fn spin<F: Future>(mut f: F) -> Result<F::Item, F::Error> { - use futures::Async::Ready; - loop { - match f.poll() { - Ok(Ready(v)) => return Ok(v), - Ok(_) => {} - Err(e) => return Err(e), - } - } - } - - let mut ping_txs = vec![]; - let mut ping_rxs = vec![]; - let mut pong_txs = vec![]; - let mut pong_rxs = vec![]; - - for _ in 0..MAX { - let (tx, rx) = oneshot::channel::<()>(); - - ping_txs.push(Some(tx)); - ping_rxs.push(Some(rx)); - - let (tx, rx) = oneshot::channel::<()>(); - - pong_txs.push(Some(tx)); - pong_rxs.push(Some(rx)); - } - - thread::spawn(move || { - future::lazy(|| { - for i in 0..MAX { - let ping_rx = ping_rxs[i].take().unwrap(); - let pong_tx = pong_txs[i].take().unwrap(); - - if spin(ping_rx).is_err() { - return Ok(()); - } - - pong_tx.send(()).unwrap(); - } - - Ok::<(), ()>(()) - }) - .wait() - .unwrap(); - }); - - future::lazy(|| { - let mut i = 0; - - b.iter(|| { - let ping_tx = ping_txs[i].take().unwrap(); - let pong_rx = pong_rxs[i].take().unwrap(); - - ping_tx.send(()).unwrap(); - spin(pong_rx).unwrap(); - - i += 1; - }); - - Ok::<(), ()>(()) |