summaryrefslogtreecommitdiffstats
path: root/tokio-reactor/benches/basic.rs
blob: 63020d2f7a7653b6e150688e4aeb2f650eafc639 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#![feature(test)]
#![deny(warnings)]

extern crate futures;
extern crate mio;
extern crate num_cpus;
extern crate test;
extern crate tokio;
extern crate tokio_io_pool;
extern crate tokio_reactor;

const NUM_YIELD: usize = 500;
const TASKS_PER_CPU: usize = 100;

mod threadpool {
    use super::*;
    use std::sync::mpsc;

    use futures::{future, Async};
    use test::Bencher;
    use tokio::runtime::Runtime;
    use tokio_reactor::Registration;

    #[bench]
    fn notify_many(b: &mut Bencher) {
        let mut rt = Runtime::new().unwrap();
        let tasks = TASKS_PER_CPU * num_cpus::get();

        b.iter(|| {
            let (tx, rx) = mpsc::channel();

            rt.block_on::<_, (), ()>(future::lazy(move || {
                for _ in 0..tasks {
                    let tx = tx.clone();

                    tokio::spawn(future::lazy(move || {
                        let (r, s) = mio::Registration::new2();
                        let registration = Registration::new();
                        registration.register(&r).unwrap();

                        let mut rem = NUM_YIELD;
                        let mut r = Some(r);
                        let tx = tx.clone();

                        tokio::spawn(future::poll_fn(move || loop {
                            let is_ready = registration.poll_read_ready().unwrap().is_ready();

                            if is_ready {
                                rem -= 1;

                                if rem == 0 {
                                    r.take().unwrap();
                                    tx.send(()).unwrap();
                                    return Ok(Async::Ready(()));
                                }
                            } else {
                                s.set_readiness(mio::Ready::readable()).unwrap();
                                return Ok(Async::NotReady);
                            }
                        }));

                        Ok(())
                    }));
                }

                Ok(())
            }))
            .unwrap();

            for _ in 0..tasks {
                rx.recv().unwrap();
            }
        })
    }
}

mod io_pool {
    use super::*;
    use std::sync::mpsc;

    use futures::{future, Async};
    use test::Bencher;
    use tokio_io_pool::Runtime;
    use tokio_reactor::Registration;

    #[bench]
    fn notify_many(b: &mut Bencher) {
        let mut rt = Runtime::new();
        let tasks = TASKS_PER_CPU * num_cpus::get();

        b.iter(|| {
            let (tx, rx) = mpsc::channel();

            rt.block_on::<_, (), ()>(future::lazy(move || {
                for _ in 0..tasks {
                    let tx = tx.clone();

                    tokio::spawn(future::lazy(move || {
                        let (r, s) = mio::Registration::new2();
                        let registration = Registration::new();
                        registration.register(&r).unwrap();

                        let mut rem = NUM_YIELD;
                        let mut r = Some(r);
                        let tx = tx.clone();

                        tokio::spawn(future::poll_fn(move || loop {
                            let is_ready = registration.poll_read_ready().unwrap().is_ready();

                            if is_ready {
                                rem -= 1;

                                if rem == 0 {
                                    r.take().unwrap();
                                    tx.send(()).unwrap();
                                    return Ok(Async::Ready(()));
                                }
                            } else {
                                s.set_readiness(mio::Ready::readable()).unwrap();
                                return Ok(Async::NotReady);
                            }
                        }));

                        Ok(())
                    }));
                }

                Ok(())
            }))
            .unwrap();

            for _ in 0..tasks {
                rx.recv().unwrap();
            }
        })
    }
}