summaryrefslogtreecommitdiffstats
path: root/tokio-codec/tests/framed_write.rs
blob: 137fb5be13d3c8a82b71eee8eaf02717cefda0f4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
extern crate tokio_codec;
extern crate tokio_io;
extern crate bytes;
extern crate futures;

use tokio_io::AsyncWrite;
use tokio_codec::{Encoder, FramedWrite};

use futures::{Sink, Poll};
use bytes::{BytesMut, BufMut, BigEndian};

use std::io::{self, Write};
use std::collections::VecDeque;

macro_rules! mock {
    ($($x:expr,)*) => {{
        let mut v = VecDeque::new();
        v.extend(vec![$($x),*]);
        Mock { calls: v }
    }};
}

struct U32Encoder;

impl Encoder for U32Encoder {
    type Item = u32;
    type Error = io::Error;

    fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> {
        // Reserve space
        dst.reserve(4);
        dst.put_u32_be(item);
        Ok(())
    }
}

#[test]
fn write_multi_frame_in_packet() {
    let mock = mock! {
        Ok(b"\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02".to_vec()),
    };

    let mut framed = FramedWrite::new(mock, U32Encoder);
    assert!(framed.start_send(0).unwrap().is_ready());
    assert!(framed.start_send(1).unwrap().is_ready());
    assert!(framed.start_send(2).unwrap().is_ready());

    // Nothing written yet
    assert_eq!(1, framed.get_ref().calls.len());

    // Flush the writes
    assert!(framed.poll_complete().unwrap().is_ready());

    assert_eq!(0, framed.get_ref().calls.len());
}

#[test]
fn write_hits_backpressure() {
    const ITER: usize = 2 * 1024;

    let mut mock = mock! {
        // Block the `ITER`th write
        Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")),
        Ok(b"".to_vec()),
    };

    for i in 0..(ITER + 1) {
        let mut b = BytesMut::with_capacity(4);
        b.put_u32_be(i as u32);

        // Append to the end
        match mock.calls.back_mut().unwrap() {
            &mut Ok(ref mut data) => {
                // Write in 2kb chunks
                if data.len() < ITER {
                    data.extend_from_slice(&b[..]);
                    continue;
                }
            }
            _ => unreachable!(),
        }

        // Push a new new chunk
        mock.calls.push_back(Ok(b[..].to_vec()));
    }

    let mut framed = FramedWrite::new(mock, U32Encoder);

    for i in 0..ITER {
        assert!(framed.start_send(i as u32).unwrap().is_ready());
    }

    // This should reject
    assert!(!framed.start_send(ITER as u32).unwrap().is_ready());

    // This should succeed and start flushing the buffer.
    assert!(framed.start_send(ITER as u32).unwrap().is_ready());

    // Flush the rest of the buffer
    assert!(framed.poll_complete().unwrap().is_ready());

    // Ensure the mock is empty
    assert_eq!(0, framed.get_ref().calls.len());
}

// ===== Mock ======

struct Mock {
    calls: VecDeque<io::Result<Vec<u8>>>,
}

impl Write for Mock {
    fn write(&mut self, src: &[u8]) -> io::Result<usize> {
        match self.calls.pop_front() {
            Some(Ok(data)) => {
                assert!(src.len() >= data.len());
                assert_eq!(&data[..], &src[..data.len()]);
                Ok(data.len())
            }
            Some(Err(e)) => Err(e),
            None => panic!("unexpected write; {:?}", src),
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl AsyncWrite for Mock {
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        Ok(().into())
    }
}