summaryrefslogtreecommitdiffstats
path: root/examples/udp-codec.rs
blob: f899180a46ad12957dfb9b072512702ac57f5241 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
extern crate tokio_core;
extern crate env_logger;
extern crate futures;

#[macro_use]
extern crate log;

use std::io;
use std::net::{SocketAddr};
use futures::{future, Future, Stream, Sink};
use tokio_core::io::{CodecUdp};
use tokio_core::net::{UdpSocket};
use tokio_core::reactor::{Core, Timeout};
use std::time::Duration;
use std::str;

/// This is a basic example of leveraging `FramedUdp` to create
/// a simple UDP client and server which speak a custom protocol.
/// `FramedUdp` applies a `Codec` to the input and output of an
/// `Evented`.

/// Simple newline-based parser.
///
/// This codec is meant for a connectionless server: it must keep track of
/// the socket address of the last peer to contact it so that it can respond
/// back to that peer.
///
/// In the real world, one would probably want an associative map from
/// remote peers to their individual state instead of a single address.
pub struct LineCodec {
    /// Socket address of the last peer that contacted us, if any.
    addr: Option<SocketAddr>,
}

/// Newline-delimited framing over UDP datagrams.
impl CodecUdp for LineCodec {
    type In = Vec<u8>;
    type Out = Vec<u8>;

    /// Extracts the bytes that precede the first `\n` of a received datagram.
    ///
    /// The sender's address is stored on the codec (whether or not a frame is
    /// produced) so that a later `encode` call can address its reply to it.
    ///
    /// Returns `Ok(Some(frame))` with the newline stripped, or `Ok(None)` if
    /// the datagram contains no newline. This implementation never returns
    /// `Err`.
    fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> Result<Option<Self::In>, io::Error> {
        // Datagrams come straight off the network and need not be valid
        // UTF-8; a lossy conversion keeps trace logging from panicking on
        // arbitrary bytes.
        trace!("decoding {} - {}", String::from_utf8_lossy(buf), addr);
        self.addr = Some(*addr);
        Ok(buf
            .iter()
            .position(|&b| b == b'\n')
            .map(|end| buf[..end].to_vec()))
    }

    /// Appends `item` followed by a terminating `\n` to `into` and returns
    /// the address the resulting datagram should be sent to.
    ///
    /// # Panics
    ///
    /// Panics if no datagram has been decoded yet, because the codec then
    /// has no peer address to reply to.
    // `&Vec<u8>` (rather than `&[u8]`) mirrors the trait's `&Self::Out`.
    fn encode(&mut self, item: &Vec<u8>, into: &mut Vec<u8>) -> SocketAddr {
        trace!("encoding {}", String::from_utf8_lossy(item));
        into.extend_from_slice(item);
        into.push(b'\n');

        self.addr
            .expect("LineCodec::encode called before any datagram was decoded: no peer address")
    }
}

fn main() {
    drop(env_logger::init());

    let mut core = Core::new().unwrap();
    let handle = core.handle();

    //create the line codec parser for each 
    let srvcodec = LineCodec { addr : None };
    let clicodec = LineCodec { addr : None };

    let srvaddr : SocketAddr = "127.0.0.1:31999".parse().unwrap();
    let clientaddr : SocketAddr = "127.0.0.1:32000".parse().unwrap();

    //We bind each socket to a specific port
    let server = UdpSocket::bind(&srvaddr, &handle).unwrap();
    let client = UdpSocket::bind(&clientaddr, &handle).unwrap();

    //start things off by sending a ping from the client to the server
    //This doesn't utilize the codec to encode the message, but rather
    //it sends raw data directly to the remote peer with the send_dgram future
    let job = client.send_dgram(b"PING\n", srvaddr);
    let (client, _buf) = core.run(job).unwrap();

    //We create a FramedUdp instance, which associates a socket
    //with a codec. We then immediate split that into the 
    //receiving side `Stream` and the writing side `Sink`
    let (srvstream, srvsink) = server.framed(srvcodec).split();

    //`Stream::fold` runs once per every received datagram.
    //Note that we pass srvsink into fold, so that it can be
    //supplied to every iteration. The reason for this is
    //sink.send moves itself into `send` and then returns itself
    let srvloop = srvstream.fold(srvsink, move |sink, buf| { 
        println!("{}", str::from_utf8(buf.as_slice()).unwrap());
        sink.send(b"PONG".to_vec())
    }).map(|_| ());
    
    //We create another FramedUdp instance, this time for the client socket
    let (clistream, clisink) = client.framed(clicodec).split();

    //And another infinite iteration
    let cliloop = clistream.fold(clisink, move |sink, buf| { 
            println!("{}", str::from_utf8(buf.as_slice()).unwrap());
            sink.send(b"PING".to_vec())
        }).map(|_| ());

    let timeout = Timeout::new(Duration::from_millis(500), &handle).unwrap();

    //`select_all` takes an `Iterable` of `Future` and returns a future itself
    //This future waits until the first `Future` completes, it then returns
    //that result.
    let wait = future::select_all(vec![timeout.boxed(), srvloop.boxed(), cliloop.boxed()]);

    //Now we instruct `reactor::Core` to iterate, processing events until its future, `SelectAll`
    //has completed
    if let Err(e) = core.run(wait) {
        error!("{}", e.0);
    }
}