summaryrefslogtreecommitdiffstats
path: root/tokio/tests/runtime_current_thread.rs
blob: b4e35f393b13119a0d6eeba88aa05e84a70c0d7c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#![warn(rust_2018_idioms)]
#![feature(async_await)]
#![cfg(feature = "default")]

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::current_thread::Runtime;
use tokio::sync::oneshot;
use tokio_test::{assert_err, assert_ok};

use env_logger;
use std::sync::mpsc;
use std::time::{Duration, Instant};
use tokio::timer::delay;

async fn client_server(tx: mpsc::Sender<()>) {
    let addr = assert_ok!("127.0.0.1:0".parse());
    let mut server = assert_ok!(TcpListener::bind(&addr));

    // Get the assigned address
    let addr = assert_ok!(server.local_addr());

    // Spawn the server
    tokio::spawn(async move {
        // Accept a socket
        let (mut socket, _) = server.accept().await.unwrap();

        // Write some data
        socket.write_all(b"hello").await.unwrap();
    });

    let mut client = TcpStream::connect(&addr).await.unwrap();

    let mut buf = vec![];
    client.read_to_end(&mut buf).await.unwrap();

    assert_eq!(buf, b"hello");
    tx.send(()).unwrap();
}

#[test]
fn spawn_run_spawn_root() {
    let _ = env_logger::try_init();

    let mut rt = Runtime::new().unwrap();
    let (tx, rx) = mpsc::channel();

    let tx2 = tx.clone();
    rt.spawn(async move {
        delay(Instant::now() + Duration::from_millis(1000)).await;
        tx2.send(()).unwrap();
    });

    rt.spawn(client_server(tx));
    rt.run().unwrap();

    assert_ok!(rx.try_recv());
    assert_ok!(rx.try_recv());
}

#[test]
fn spawn_run_nested_spawn() {
    let _ = env_logger::try_init();

    let mut rt = Runtime::new().unwrap();
    let (tx, rx) = mpsc::channel();

    let tx2 = tx.clone();
    rt.spawn(async move {
        tokio::spawn(async move {
            delay(Instant::now() + Duration::from_millis(1000)).await;
            tx2.send(()).unwrap();
        });
    });

    rt.spawn(client_server(tx));
    rt.run().unwrap();

    assert_ok!(rx.try_recv());
    assert_ok!(rx.try_recv());
}

#[test]
fn block_on() {
    let _ = env_logger::try_init();

    let mut rt = Runtime::new().unwrap();
    let (tx, rx) = mpsc::channel();

    let tx2 = tx.clone();
    rt.spawn(async move {
        delay(Instant::now() + Duration::from_millis(1000)).await;
        tx2.send(()).unwrap();
    });

    rt.block_on(client_server(tx));

    assert_ok!(rx.try_recv());
    assert_err!(rx.try_recv());
}

#[test]
fn racy() {
    use std::sync::mpsc;
    use std::thread;

    let (trigger, exit) = oneshot::channel();
    let (handle_tx, handle_rx) = mpsc::channel();

    let jh = thread::spawn(move || {
        let mut rt = Runtime::new().unwrap();
        handle_tx.send(rt.handle()).unwrap();

        // don't exit until we are told to
        rt.block_on(async {
            exit.await.unwrap();
        });

        // run until all spawned futures (incl. the "exit" signal future) have completed.
        rt.run().unwrap();
    });

    let (tx, rx) = oneshot::channel();

    let handle = handle_rx.recv().unwrap();
    handle
        .spawn(async {
            tx.send(()).unwrap();
        })
        .unwrap();

    // signal runtime thread to exit
    trigger.send(()).unwrap();

    // wait for runtime thread to exit
    jh.join().unwrap();

    let mut e = tokio_executor::enter().unwrap();
    e.block_on(rx).unwrap();
}