diff options
author | Alex Crichton <alex@alexcrichton.com> | 2016-08-04 16:04:24 -0700 |
---|---|---|
committer | Alex Crichton <alex@alexcrichton.com> | 2016-08-04 20:34:54 -0700 |
commit | e7f4313cf40f52907806bfab2ca35b8c30d6095f (patch) | |
tree | cbb853388e703ae3543210393467ed3dfca0903e /src | |
parent | 04bd33e3901b5bf994b043f194aacdb8433345bb (diff) |
Implementing LoopData
This type acts for a handle to storage of non-`Send` data. The handle itself is
sendable across threads and is therefore suitable for storage in a `Future`.
This data uses communication internally and a new method on `Task` to ensure
that when the data needs to be accessed the future will find its way to the
right thread.
More on this type coming soon!
Diffstat (limited to 'src')
-rw-r--r-- | src/channel.rs | 118 | ||||
-rw-r--r-- | src/event_loop.rs | 341 | ||||
-rw-r--r-- | src/lib.rs | 5 | ||||
-rw-r--r-- | src/mpsc_queue.rs | 148 |
4 files changed, 596 insertions, 16 deletions
diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 00000000..503baef5 --- /dev/null +++ b/src/channel.rs @@ -0,0 +1,118 @@ +//! A thin wrapper around a mpsc queue and mio-based channel information +//! +//! Normally the standard library's channels would suffice but we unfortunately +//! need the `Sender<T>` half to be `Sync`, so to accomplish this for now we +//! just vendor the same mpsc queue as the one in the standard library and then +//! we pair that with the `mio::channel` module's Ctl pairs to control the +//! readiness notifications on the channel. + +use std::cell::Cell; +use std::io; +use std::marker; +use std::sync::Arc; + +use mio; +use mio::channel::{ctl_pair, SenderCtl, ReceiverCtl}; + +use mpsc_queue::{Queue, PopResult}; + +pub struct Sender<T> { + ctl: SenderCtl, + inner: Arc<Queue<T>>, +} + +pub struct Receiver<T> { + ctl: ReceiverCtl, + inner: Arc<Queue<T>>, + _marker: marker::PhantomData<Cell<()>>, // this type is not Sync +} + +pub fn channel<T>() -> (Sender<T>, Receiver<T>) { + let inner = Arc::new(Queue::new()); + let (tx, rx) = ctl_pair(); + + let tx = Sender { + ctl: tx, + inner: inner.clone(), + }; + let rx = Receiver { + ctl: rx, + inner: inner.clone(), + _marker: marker::PhantomData, + }; + (tx, rx) +} + +impl<T> Sender<T> { + pub fn send(&self, data: T) -> io::Result<()> { + self.inner.push(data); + self.ctl.inc() + } +} + +impl<T> Receiver<T> { + pub fn recv(&self) -> io::Result<Option<T>> { + // Note that the underlying method is `unsafe` because it's only safe + // if one thread accesses it at a time. + // + // We, however, are the only thread with a `Receiver<T>` because this + // type is not `Sync`. and we never handed out another instance. + match unsafe { self.inner.pop() } { + PopResult::Data(t) => { + try!(self.ctl.dec()); + Ok(Some(t)) + } + + // If the queue is either in an inconsistent or empty state, then + // we return `None` for both instances. Note that the standard + // library performs a yield loop in the event of `Inconsistent`, + // which means that there's data in the queue but a sender hasn't + // finished their operation yet. + // + // We do this because the queue will continue to be readable as + // the thread performing the push will eventually call `inc`, so + // if we return `None` and the event loop just loops aruond calling + // this method then we'll eventually get back to the same spot + // and due the retry. + // + // Basically, the inconsistent state doesn't mean we need to busy + // wait, but instead we can forge ahead and assume by the time we + // go to the kernel and come back we'll no longer be in an + // inconsistent state. + PopResult::Empty | + PopResult::Inconsistent => Ok(None), + } + } +} + +// Just delegate everything to `self.ctl` +impl<T> mio::Evented for Receiver<T> { + fn register(&self, + poll: &mio::Poll, + token: mio::Token, + interest: mio::EventSet, + opts: mio::PollOpt) -> io::Result<()> { + self.ctl.register(poll, token, interest, opts) + } + + fn reregister(&self, + poll: &mio::Poll, + token: mio::Token, + interest: mio::EventSet, + opts: mio::PollOpt) -> io::Result<()> { + self.ctl.reregister(poll, token, interest, opts) + } + + fn deregister(&self, poll: &mio::Poll) -> io::Result<()> { + self.ctl.deregister(poll) + } +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Sender<T> { + Sender { + ctl: self.ctl.clone(), + inner: self.inner.clone(), + } + } +} diff --git a/src/event_loop.rs b/src/event_loop.rs index b41ebc47..42ca012f 100644 --- a/src/event_loop.rs +++ b/src/event_loop.rs @@ -1,5 +1,7 @@ +use std::any::Any; use std::cell::{Cell, RefCell}; use std::io::{self, ErrorKind}; +use std::marker; use std::mem; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; @@ -7,11 +9,13 @@ use std::sync::mpsc; use std::time::{Instant, Duration}; use futures::{Future, Task, TaskHandle, Poll}; +use futures::executor::{ExecuteCallback, Executor}; use futures_io::Ready; -use mio::channel::SendError; use mio; use slab::Slab; +use channel::{Sender, Receiver, channel}; +use event_loop::dropbox::DropBox; use slot::{self, Slot}; use timer_wheel::{TimerWheel, Timeout}; @@ -31,8 +35,8 @@ pub struct Loop { id: usize, active: Cell<bool>, io: mio::Poll, - tx: mio::channel::Sender<Message>, - rx: mio::channel::Receiver<Message>, + tx: Arc<MioSender>, + rx: Receiver<Message>, dispatch: RefCell<Slab<Scheduled, usize>>, // Timer wheel keeping track of all timeouts. The `usize` stored in the @@ -45,6 +49,10 @@ pub struct Loop { timeouts: RefCell<Slab<(Timeout, TimeoutState), usize>>, } +struct MioSender { + inner: Sender<Message>, +} + /// Handle to an event loop, used to construct I/O objects, send messages, and /// otherwise interact indirectly with the event loop itself. /// @@ -53,7 +61,7 @@ pub struct Loop { #[derive(Clone)] pub struct LoopHandle { id: usize, - tx: mio::channel::Sender<Message>, + tx: Arc<MioSender>, } struct Scheduled { @@ -75,6 +83,8 @@ enum Message { AddTimeout(Instant, Arc<Slot<io::Result<TimeoutToken>>>), UpdateTimeout(TimeoutToken, TaskHandle), CancelTimeout(TimeoutToken), + Run(Box<ExecuteCallback>), + Drop(DropBox<Any>), Shutdown, } @@ -103,7 +113,7 @@ impl Loop { /// Creates a new event loop, returning any error that happened during the /// creation. pub fn new() -> io::Result<Loop> { - let (tx, rx) = mio::channel::from_std_channel(mpsc::channel()); + let (tx, rx) = channel(); let io = try!(mio::Poll::new()); try!(io.register(&rx, mio::Token(0), @@ -113,7 +123,7 @@ impl Loop { id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed), active: Cell::new(true), io: io, - tx: tx, + tx: Arc::new(MioSender { inner: tx }), rx: rx, dispatch: RefCell::new(Slab::new_starting_at(1, SLAB_CAPACITY)), timeouts: RefCell::new(Slab::new_starting_at(0, SLAB_CAPACITY)), @@ -314,7 +324,8 @@ impl Loop { } fn consume_queue(&self) { - while let Ok(msg) = self.rx.try_recv() { + // TODO: can we do better than `.unwrap()` here? + while let Some(msg) = self.rx.recv().unwrap() { self.notify(msg); } } @@ -337,6 +348,8 @@ impl Loop { } Message::UpdateTimeout(t, handle) => self.update_timeout(&t, handle), Message::CancelTimeout(t) => self.cancel_timeout(&t), + Message::Run(f) => f.call(), + Message::Drop(data) => drop(data), } } } @@ -352,21 +365,15 @@ impl LoopHandle { lp.notify(msg); } None => { - match self.tx.send(msg) { + match self.tx.inner.send(msg) { Ok(()) => {} // This should only happen when there was an error // writing to the pipe to wake up the event loop, // hopefully that never happens - Err(SendError::Io(e)) => { + Err(e) => { panic!("error sending message to event loop: {}", e) } - - // If we're still sending a message to the event loop - // after it's closed, then that's bad! - Err(SendError::Disconnected(_)) => { - panic!("event loop is no longer available") - } } } } @@ -511,6 +518,38 @@ impl LoopHandle { self.send(Message::CancelTimeout(timeout)) } + /// Schedules a closure to add some data to event loop thread itself. + /// + /// This function is useful for when storing non-`Send` data inside of a + /// future. This returns a future which will resolve to a `LoopData<A>` + /// handle, which is itself `Send + 'static` regardless of the underlying + /// `A`. That is, for example, you can create a handle to some data that + /// contains an `Rc`, for example. + /// + /// This function takes a closure which may be sent to the event loop to + /// generate an instance of type `A`. The closure itself is required to be + /// `Send + 'static`, but the data it produces is only required to adhere to + /// `Any`. + /// + /// If the returned future is polled on the event loop thread itself it will + /// very cheaply resolve to a handle to the data, but if it's not polled on + /// the event loop then it will send a message to the event loop to run the + /// closure `f`, generate a handle, and then the future will yield it back. + // TODO: more with examples + pub fn add_loop_data<F, A>(&self, f: F) -> AddLoopData<F, A> + where F: FnOnce() -> A + Send + 'static, + A: Any, + { + AddLoopData { + _marker: marker::PhantomData, + inner: LoopFuture { + loop_handle: self.clone(), + data: Some(f), + result: None, + }, + } + } + /// Send a message to the associated event loop that it should shut down, or /// otherwise break out of its current loop of iteration. /// @@ -574,6 +613,271 @@ impl Future for AddTimeout { } } +/// A handle to data that is owned by an event loop thread, and is only +/// accessible on that thread itself. +/// +/// This structure is created by the `LoopHandle::add_loop_data` method which +/// will return a future resolving to one of these references. A `LoopData<A>` +/// handle is `Send` regardless of what `A` is, but the internal data can only +/// be accessed on the event loop thread itself. +/// +/// Internally this reference also stores a handle to the event loop that the +/// data originated on, so it knows how to go back to the event loop to access +/// the data itself. +// TODO: write more once it's implemented +pub struct LoopData<A: Any> { + data: DropBox<A>, + handle: LoopHandle, +} + +/// Future returned from the `LoopHandle::add_loop_data` method. +/// +/// This future will resolve to a `LoopData<A>` reference when completed, which +/// represents a handle to data that is "owned" by the event loop thread but can +/// migrate among threads temporarily so travel with a future itself. +pub struct AddLoopData<F, A> { + inner: LoopFuture<DropBox<Any>, F>, + _marker: marker::PhantomData<fn() -> A>, +} + +fn _assert() { + fn _assert_send<T: Send>() {} + _assert_send::<LoopData<()>>(); +} + +impl<F, A> Future for AddLoopData<F, A> + where F: FnOnce() -> A + Send + 'static, + A: Any, +{ + type Item = LoopData<A>; + type Error = io::Error; + + fn poll(&mut self, _task: &mut Task) -> Poll<LoopData<A>, io::Error> { + let ret = self.inner.poll(|_lp, f| { + Ok(DropBox::new(f())) + }); + + ret.map(|mut data| { + match data.downcast::<A>() { + Some(data) => { + LoopData { + data: data, + handle: self.inner.loop_handle.clone(), + } + } + None => panic!("data mixed up?"), + } + }) + } + + fn schedule(&mut self, task: &mut Task) { + self.inner.schedule(task, |f, slot| { + Message::Run(Box::new(move || { + slot.try_produce(Ok(DropBox::new(f()))).ok() + .expect("add loop data try_produce intereference"); + })) + }) + } +} + +impl<A: Any> LoopData<A> { + /// Gets a shared reference to the underlying data in this handle. + /// + /// Returns `None` if it is not called from the event loop thread that this + /// `LoopData<A>` is associated with, or `Some` with a reference to the data + /// if we are indeed on the event loop thread. + pub fn get(&self) -> Option<&A> { + self.data.get() + } + + /// Gets a mutable reference to the underlying data in this handle. + /// + /// Returns `None` if it is not called from the event loop thread that this + /// `LoopData<A>` is associated with, or `Some` with a reference to the data + /// if we are indeed on the event loop thread. + pub fn get_mut(&mut self) -> Option<&mut A> { + self.data.get_mut() + } + + /// Acquire the executor associated with the thread that owns this + /// `LoopData<A>`'s data. + /// + /// If the `get` and `get_mut` functions above return `None`, then this data + /// is being polled on the wrong thread to access the data, and to make + /// progress a future may need to migrate to the actual thread which owns + /// the relevant data. + /// + /// This executor can in turn be passed to `Task::poll_on`, which will then + /// move the entire future to be polled on the right thread. + pub fn executor(&self) -> Arc<Executor> { + self.handle.tx.clone() + } +} + +impl<A: Any> Drop for LoopData<A> { + fn drop(&mut self) { + // The `DropBox` we store internally will cause a memory leak if it's + // dropped on the wrong thread. While necessary for safety, we don't + // actually want a memory leak, so for all normal circumstances we take + // out the `DropBox<A>` as a `DropBox<Any>` and then we send it off to + // the event loop. + // + // TODO: possible optimization is to do none of this if we're on the + // event loop thread itself + if let Some(data) = self.data.take_any() { + self.handle.send(Message::Drop(data)); + } + } +} + +/// A curious inner module with one `unsafe` keyword, yet quite an important +/// one! +/// +/// The purpose of this module is to define a type, `DropBox<A>`, which is able +/// to be sent across thread event when the underlying data `A` is itself not +/// sendable across threads. This is then in turn used to build up the +/// `LoopData` abstraction above. +/// +/// A `DropBox` currently contains two major components, an identification of +/// the thread that it originated from as well as the data itself. Right now the +/// data is stored in a `Box` as we'll transition between it and `Box<Any>`, but +/// this is perhaps optimizable. +/// +/// The `DropBox<A>` itself only provides a few safe methods, all of which are +/// safe to call from any thread. Access to the underlying data is only granted +/// if we're on the right thread, and otherwise the methods don't access the +/// data itself. +/// +/// Finally, one crucial piece, if the data is dropped it may run code that +/// assumes it's on the original thread. For this reason we have to be sure that +/// the data is only dropped on the originating thread itself. It's currently +/// the job of the outer `LoopData` to ensure that a `DropBox` is dropped on the +/// right thread, so we don't attempt to perform any communication in this +/// `Drop` implementation. Instead, if a `DropBox` is dropped on the wrong +/// thread, it simply leaks its contents. +/// +/// All that's really just a lot of words in an attempt to justify the `unsafe` +/// impl of `Send` below. The idea is that the data is only ever accessed on the +/// originating thread, even during `Drop`. +/// +/// Note that this is a private module to have a visibility boundary around the +/// unsafe internals. Although there's not any unsafe blocks here, the code +/// itself is quite unsafe as it has to make sure that the data is dropped in +/// the right place, if ever. +mod dropbox { + use std::any::Any; + use std::mem; + use super::CURRENT_LOOP; + + pub struct DropBox<A: ?Sized> { + id: usize, + inner: Option<Box<A>>, + } + + unsafe impl<A: ?Sized> Send for DropBox<A> {} + + impl DropBox<Any> { + /// Creates a new `DropBox` pinned to the current threads. + /// + /// Will panic if `CURRENT_LOOP` isn't set. + pub fn new<A: Any>(a: A) -> DropBox<Any> { + DropBox { + id: CURRENT_LOOP.with(|lp| lp.id), + inner: Some(Box::new(a) as Box<Any>), + } + } + + /// Downcasts this `DropBox` to the type specified. + /// + /// Normally this always succeeds as it's a static assertion that we + /// already have all the types matched up, but an `Option` is returned + /// here regardless. + pub fn downcast<A: Any>(&mut self) -> Option<DropBox<A>> { + self.inner.take().and_then(|data| { + match data.downcast::<A>() { + Ok(a) => Some(DropBox { id: self.id, inner: Some(a) }), + + // Note that we're careful that when a downcast fails we put + // the data back into ourselves, because we may be + // downcasting on any thread. This will ensure that if we + // drop accidentally we'll forget the data correctly. + Err(obj) => { + self.inner = Some(obj); + None + } + } + }) + } + } + + impl<A: Any> DropBox<A> { + /// Consumes the contents of this `DropBox<A>`, returning a new + /// `DropBox<Any>`. + /// + /// This is just intended to be a simple and cheap conversion, should + /// almost always return `Some`. + pub fn take_any(&mut self) -> Option<DropBox<Any>> { + self.inner.take().map(|d| { + DropBox { id: self.id, inner: Some(d as Box<Any>) } + }) + } + } + + impl<A: ?Sized> DropBox<A> { + /// Returns a shared reference to the data if we're on the right + /// thread. + pub fn get(&self) -> Option<&A> { + if CURRENT_LOOP.is_set() { + CURRENT_LOOP.with(|lp| { + if lp.id == self.id { + self.inner.as_ref().map(|b| &**b) + } else { + None + } + }) + } else { + None + } + } + + /// Returns a mutable reference to the data if we're on the right + /// thread. + pub fn get_mut(&mut self) -> Option<&mut A> { + if CURRENT_LOOP.is_set() { + CURRENT_LOOP.with(move |lp| { + if lp.id == self.id { + self.inner.as_mut().map(|b| &mut **b) + } else { + None + } + }) + } else { + None + } + } + } + + impl<A: ?Sized> Drop for DropBox<A> { + fn drop(&mut self) { + // Try our safe accessor first, and if it works then we know that + // we're on the right thread. In that case we can simply drop as + // usual. + if let Some(a) = self.get_mut().take() { + return drop(a) + } + + // If we're on the wrong thread but we actually have some data, then + // something in theory horrible has gone awry. Prevent memory safety + // issues by forgetting the data and then also warn about this odd + // event. + if let Some(data) = self.inner.take() { + mem::forget(data); + warn!("forgetting some data on an event loop"); + } + } + } +} + struct LoopFuture<T, U> { loop_handle: LoopHandle, data: Option<U>, @@ -671,3 +975,10 @@ impl<E: ?Sized> Source<E> { &self.io } } + +impl Executor for MioSender { + fn execute_boxed(&self, callback: Box<ExecuteCallback>) { + self.inner.send(Message::Run(callback)) + .expect("error sending a message to the event loop") + } +} @@ -29,8 +29,11 @@ pub mod timer_wheel; mod slot; #[path = "../../src/lock.rs"] mod lock; +mod mpsc_queue; +mod channel; -pub use event_loop::{Loop, LoopHandle}; +pub use event_loop::{Loop, LoopHandle, AddSource, AddTimeout}; +pub use event_loop::{LoopData, AddLoopData, TimeoutToken}; pub use readiness_stream::ReadinessStream; pub use tcp::{TcpListener, TcpStream}; pub use timeout::Timeout; diff --git a/src/mpsc_queue.rs b/src/mpsc_queue.rs new file mode 100644 index 00000000..7893bbb7 --- /dev/null +++ b/src/mpsc_queue.rs @@ -0,0 +1,148 @@ +/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT + * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE + * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF + * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and documentation are + * those of the authors and should not be interpreted as representing official + * policies, either expressed or implied, of Dmitry Vyukov. + */ + +//! A mostly lock-free multi-producer, single consumer queue. +//! +//! This module contains an implementation of a concurrent MPSC queue. This +//! queue can be used to share data between threads, and is also used as the +//! building block of channels in rust. +//! +//! Note that the current implementation of this queue has a caveat of the `pop` +//! method, and see the method for more information about it. Due to this +//! caveat, this queue may not be appropriate for all use-cases. + +// http://www.1024cores.net/home/lock-free-algorithms +// /queues/non-intrusive-mpsc-node-based-queue + +// NOTE: this implementation is lifted from the standard library and only +// slightly modified + +pub use self::PopResult::*; + +use std::cell::UnsafeCell; +use std::ptr; +use std::sync::atomic::{AtomicPtr, Ordering}; + +/// A result of the `pop` function. +pub enum PopResult<T> { + /// Some data has been popped + Data(T), + /// The queue is empty + Empty, + /// The queue is in an inconsistent state. Popping data should succeed, but + /// some pushers have yet to make enough progress in order allow a pop to + /// succeed. It is recommended that a pop() occur "in the near future" in + /// order to see if the sender has made progress or not + Inconsistent, +} + +struct Node<T> { + next: AtomicPtr<Node<T>>, + value: Option<T>, +} + +/// The multi-producer single-consumer structure. This is not cloneable, but it +/// may be safely shared so long as it is guaranteed that there is only one +/// popper at a time (many pushers are allowed). +pub struct Queue<T> { + head: AtomicPtr<Node<T>>, + tail: UnsafeCell<*mut Node<T>>, +} + +unsafe impl<T: Send> Send for Queue<T> { } +unsafe impl<T: Send> Sync for Queue<T> { } + +impl<T> Node<T> { + unsafe fn new(v: Option<T>) -> *mut Node<T> { + Box::into_raw(Box::new(Node { + next: AtomicPtr::new(ptr::null_mut()), + value: v, + })) + } +} + +impl<T> Queue<T> { + /// Creates a new queue that is safe to share among multiple producers and + /// one consumer. + pub fn new() -> Queue<T> { + let stub = unsafe { Node::new(None) }; + Queue { + head: AtomicPtr::new(stub), + tail: UnsafeCell::new(stub), + } + } + + /// Pushes a new value onto this queue. + pub fn push(&self, t: T) { + unsafe { + let n = Node::new(Some(t)); + let prev = self.head.swap(n, Ordering::AcqRel); + (*prev).next.store(n, Ordering::Release); + } + } + + /// Pops some data from this queue. + /// + /// Note that the current implementation means that this function cannot + /// return `Option<T>`. It is possible for this queue to be in an + /// inconsistent state where many pushes have succeeded and completely + /// finished, but pops cannot return `Some(t)`. This inconsistent state + /// happens when a pusher is pre-empted at an inopportune moment. + /// + /// This inconsistent state means that this queue does indeed have data, but + /// it does not currently have access to it at this time. + /// + /// This function is unsafe because only one thread can call it at a time. + pub unsafe fn pop(&self) -> PopResult<T> { + let tail = *self.tail.get(); + let next = (*tail).next.load(Ordering::Acquire); + + if !next.is_null() { + *self.tail.get() = next; + assert!((*tail).value.is_none()); + assert!((*next).value.is_some()); + let ret = (*next).value.take().unwrap(); + drop(Box::from_raw(tail)); + return Data(ret); + } + + if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent} + } +} + +impl<T> Drop for Queue<T> { + fn drop(&mut self) { + unsafe { + let mut cur = *self.tail.get(); + while !cur.is_null() { + let next = (*cur).next.load(Ordering::Relaxed); + drop(Box::from_raw(cur)); + cur = next; + } + } + } +} |