diff options
author | Alex Crichton <alex@alexcrichton.com> | 2017-09-11 08:32:34 -0700 |
---|---|---|
committer | Alex Crichton <alex@alexcrichton.com> | 2017-09-11 08:33:09 -0700 |
commit | a611f6ec3043e63be340341cd97d17892ec6513c (patch) | |
tree | 3ec4be1d5e5822f18b8b85a282aa62c8b66f04c3 /examples/connect.rs | |
parent | 5e4cfdfab114e9c1913a98b40746727b4d2cf5a4 (diff) |
Add a UDP mode to the `connect` example
Diffstat (limited to 'examples/connect.rs')
-rw-r--r-- | examples/connect.rs | 244 |
1 files changed, 172 insertions, 72 deletions
diff --git a/examples/connect.rs b/examples/connect.rs index a167e006..315d2caf 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -1,12 +1,18 @@ -//! A simple example of hooking up stdin/stdout to a TCP stream. +//! An example of hooking up stdin/stdout to either a TCP or UDP stream. //! -//! This example will connect to a server specified in the argument list and -//! then forward all data read on stdin to the server, printing out all data -//! received on stdout. +//! This example will connect to a socket address specified in the argument list +//! and then forward all data read on stdin to the server, printing out all data +//! received on stdout. An optional `--udp` argument can be passed to specify +//! that the connection should be made over UDP instead of TCP, translating each +//! line entered on stdin to a UDP packet to be sent to the remote address. //! -//! Note that this is not currently optimized for performance, especially around -//! buffer management. Rather it's intended to show an example of working with a -//! client. +//! Note that this is not currently optimized for performance, especially +//! around buffer management. Rather it's intended to show an example of +//! working with a client. +//! +//! This example can be quite useful when interacting with the other examples in +//! this repository! Many of them recommend running this as a simple "hook up +//! stdin/stdout to a server" to get up and running. extern crate futures; extern crate tokio_core; @@ -18,17 +24,23 @@ use std::io::{self, Read, Write}; use std::net::SocketAddr; use std::thread; -use bytes::{BufMut, BytesMut}; use futures::sync::mpsc; use futures::{Sink, Future, Stream}; -use tokio_core::net::TcpStream; use tokio_core::reactor::Core; -use tokio_io::AsyncRead; -use tokio_io::codec::{Encoder, Decoder}; fn main() { + // Determine if we're going to run in TCP or UDP mode + let mut args = env::args().skip(1).collect::<Vec<_>>(); + let tcp = match args.iter().position(|a| a == "--udp") { + Some(i) => { + args.remove(i); + false + } + None => true, + }; + // Parse what address we're going to connect to - let addr = env::args().nth(1).unwrap_or_else(|| { + let addr = args.first().unwrap_or_else(|| { panic!("this program requires at least one argument") }); let addr = addr.parse::<SocketAddr>().unwrap(); @@ -36,82 +48,170 @@ fn main() { // Create the event loop and initiate the connection to the remote server let mut core = Core::new().unwrap(); let handle = core.handle(); - let tcp = TcpStream::connect(&addr, &handle); // Right now Tokio doesn't support a handle to stdin running on the event // loop, so we farm out that work to a separate thread. This thread will - // read data from stdin and then send it to the event loop over a standard - // futures channel. + // read data (with blocking I/O) from stdin and then send it to the event + // loop over a standard futures channel. let (stdin_tx, stdin_rx) = mpsc::channel(0); thread::spawn(|| read_stdin(stdin_tx)); let stdin_rx = stdin_rx.map_err(|_| panic!()); // errors not possible on rx - // After the TCP connection has been established, we set up our client to - // start forwarding data. - // - // First we use the `Io::framed` method with a simple implementation of a - // `Codec` (listed below) that just ships bytes around. We then split that - // in two to work with the stream and sink separately. - // - // Half of the work we're going to do is to take all data we receive on - // stdin (`stdin_rx`) and send that along the TCP stream (`sink`). The - // second half is to take all the data we receive (`stream`) and then write - // that to stdout. Currently we just write to stdout in a synchronous - // fashion. - // - // Finally we set the client to terminate once either half of this work - // finishes. If we don't have any more data to read or we won't receive any - // more work from the remote then we can exit. - let mut stdout = io::stdout(); - let client = tcp.and_then(|stream| { - let (sink, stream) = stream.framed(Bytes).split(); - let send_stdin = stdin_rx.forward(sink); - let write_stdout = stream.for_each(move |buf| { - stdout.write_all(&buf) - }); - - send_stdin.map(|_| ()) - .select(write_stdout.map(|_| ())) - .then(|_| Ok(())) - }); - - // And now that we've got our client, we execute it in the event loop! - core.run(client).unwrap(); + // Now that we've got our stdin read we either set up our TCP connection or + // our UDP connection to get a stream of bytes we're going to emit to + // stdout. + let stdout = if tcp { + tcp::connect(&addr, &handle, Box::new(stdin_rx)) + } else { + udp::connect(&addr, &handle, Box::new(stdin_rx)) + }; + + // And now with our stream of bytes to write to stdout, we execute that in + // the event loop! Note that this is doing blocking I/O to emit data to + // stdout, and in general it's a no-no to do that sort of work on the event + // loop. In this case, though, we know it's ok as the event loop isn't + // otherwise running anything useful. + let mut out = io::stdout(); + core.run(stdout.for_each(|chunk| { + out.write_all(&chunk) + })).unwrap(); } -/// A simple `Codec` implementation that just ships bytes around. -/// -/// This type is used for "framing" a TCP stream of bytes but it's really just a -/// convenient method for us to work with streams/sinks for now. This'll just -/// take any data read and interpret it as a "frame" and conversely just shove -/// data into the output location without looking at it. -struct Bytes; - -impl Decoder for Bytes { - type Item = BytesMut; - type Error = io::Error; - - fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> { - if buf.len() > 0 { - let len = buf.len(); - Ok(Some(buf.split_to(len))) - } else { - Ok(None) +mod tcp { + use std::io; + use std::net::SocketAddr; + + use bytes::{BufMut, BytesMut}; + use futures::{Future, Stream}; + use tokio_core::net::TcpStream; + use tokio_core::reactor::Handle; + use tokio_io::AsyncRead; + use tokio_io::codec::{Encoder, Decoder}; + + pub fn connect(addr: &SocketAddr, + handle: &Handle, + stdin: Box<Stream<Item = Vec<u8>, Error = io::Error>>) + -> Box<Stream<Item = BytesMut, Error = io::Error>> + { + let tcp = TcpStream::connect(addr, handle); + let handle = handle.clone(); + + // After the TCP connection has been established, we set up our client + // to start forwarding data. + // + // First we use the `Io::framed` method with a simple implementation of + // a `Codec` (listed below) that just ships bytes around. We then split + // that in two to work with the stream and sink separately. + // + // Half of the work we're going to do is to take all data we receive on + // `stdin` and send that along the TCP stream (`sink`). The second half + // is to take all the data we receive (`stream`) and then write that to + // stdout. We'll be passing this handle back out from this method. + // + // You'll also note that we *spawn* the work to read stdin and write it + // to the TCP stream. This is done to ensure that happens concurrently + // with us reading data from the stream. + Box::new(tcp.map(move |stream| { + let (sink, stream) = stream.framed(Bytes).split(); + handle.spawn(stdin.forward(sink).then(|_| Ok(()))); + stream + }).flatten_stream()) + } + + /// A simple `Codec` implementation that just ships bytes around. + /// + /// This type is used for "framing" a TCP stream of bytes but it's really + /// just a convenient method for us to work with streams/sinks for now. + /// This'll just take any data read and interpret it as a "frame" and + /// conversely just shove data into the output location without looking at + /// it. + struct Bytes; + + impl Decoder for Bytes { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> { + if buf.len() > 0 { + let len = buf.len(); + Ok(Some(buf.split_to(len))) + } else { + Ok(None) + } + } + + fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> { + self.decode(buf) } } - fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Option<BytesMut>> { - self.decode(buf) + impl Encoder for Bytes { + type Item = Vec<u8>; + type Error = io::Error; + + fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> { + buf.put(&data[..]); + Ok(()) + } } } -impl Encoder for Bytes { - type Item = Vec<u8>; - type Error = io::Error; +mod udp { + use std::io; + use std::net::SocketAddr; + + use bytes::BytesMut; + use futures::{Future, Stream}; + use tokio_core::net::{UdpCodec, UdpSocket}; + use tokio_core::reactor::Handle; + + pub fn connect(&addr: &SocketAddr, + handle: &Handle, + stdin: Box<Stream<Item = Vec<u8>, Error = io::Error>>) + -> Box<Stream<Item = BytesMut, Error = io::Error>> + { + // We'll bind our UDP socket to a local IP/port, but for now we + // basically let the OS pick both of those. + let udp = UdpSocket::bind(&"0.0.0.0:0".parse().unwrap(), handle) + .expect("failed to bind socket"); + + // Like above with TCP we use an instance of `UdpCodec` to transform + // this UDP socket into a framed sink/stream which operates over + // discrete values. In this case we're working with *pairs* of socket + // addresses and byte buffers. + let (sink, stream) = udp.framed(Bytes).split(); + + // All bytes from `stdin` will go to the `addr` specified in our + // argument list. Like with TCP this is spawned concurrently + handle.spawn(stdin.map(move |chunk| { + (addr, chunk) + }).forward(sink).then(|_| Ok(()))); + + // With UDP we could receive data from any source, so filter out + // anything coming from a different address + Box::new(stream.filter_map(move |(src, chunk)| { + if src == addr { + Some(chunk.into()) + } else { + None + } + })) + } + + struct Bytes; - fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> { - buf.put(&data[..]); - Ok(()) + impl UdpCodec for Bytes { + type In = (SocketAddr, Vec<u8>); + type Out = (SocketAddr, Vec<u8>); + + fn decode(&mut self, addr: &SocketAddr, buf: &[u8]) -> io::Result<Self::In> { + Ok((*addr, buf.to_vec())) + } + + fn encode(&mut self, (addr, buf): Self::Out, into: &mut Vec<u8>) -> SocketAddr { + into.extend(buf); + addr + } } } |