1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
//! 单生产者单消费者并发队列
//!
//! 此模块包含 SPSC 队列的实现,该队列可在两个线程之间并发使用。
//! 该数据结构体是安全使用的,并具有一个推送器和一个弹出器的语义。
//!

// https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue

#[cfg(all(test, not(target_os = "emscripten")))]
mod tests;

use core::cell::UnsafeCell;
use core::ptr;

use crate::boxed::Box;
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};

use super::cache_aligned::CacheAligned;

// 链表中要发送的消息队列中的节点
struct Node<T> {
    // FIXME: 如果我们足够小心的话,这可能是未初始化的 T,这将减少内存使用量 (并且速度更快)。
    //
    //      这值得么?
    value: Option<T>,         // 可为空,以重新使用节点
    cached: bool,             // 该节点进入节点缓存
    next: AtomicPtr<Node<T>>, // 队列中的下一个节点
}

/// 单生产者单消费者队列。
/// 该结构体不可克隆,但是如果可以确保在任何时间点只有一个弹出器和一个推动器触摸队列,则可以在 Arc 中安全地共享该结构体。
///
///
pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
    // consumer 字段
    consumer: CacheAligned<Consumer<T, ConsumerAddition>>,

    // producer 字段
    producer: CacheAligned<Producer<T, ProducerAddition>>,
}

struct Consumer<T, Addition> {
    tail: UnsafeCell<*mut Node<T>>, // 从哪里弹出
    tail_prev: AtomicPtr<Node<T>>,  // 从哪里弹出
    cache_bound: usize,             // 最大缓存大小
    cached_nodes: AtomicUsize,      // 标记为可缓存的节点数
    addition: Addition,
}

struct Producer<T, Addition> {
    head: UnsafeCell<*mut Node<T>>,      // push 到哪里
    first: UnsafeCell<*mut Node<T>>,     // 从哪里获得新节点
    tail_copy: UnsafeCell<*mut Node<T>>, // 在 first/tail 之间
    addition: Addition,
}

unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> {}

unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> {}

impl<T> Node<T> {
    fn new() -> *mut Node<T> {
        Box::into_raw(box Node {
            value: None,
            cached: false,
            next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
        })
    }
}

impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
    /// 创建一个新队列。在队列的生产者和消费者部分中具有给定的其他元素。
    ///
    /// 由于缓存争用对性能的影响,我们希望将主要由生产者使用的字段与使用者所使用的字段保持在单独的缓存行上。
    /// 由于缓存行通常为 64 个字节,因此为小字段分配一个缓存区会不合理地昂贵,因此我们允许用户将其他字段插入为此已分配给生产者和使用者的缓存行中。
    ///
    ///
    /// 这是不安全的,因为类型系统不会强制执行单个消费者与生产者的关系。由于所有方法都具有非可变接收者,因此它还允许使用者在有 `peek` 处于活动状态时使用 `pop` 项。
    ///
    /// # Arguments
    ///
    ///   * `bound` - 该队列实现是通过链表实现的,这意味着推送始终是 malloc。
    ///   为了摊销此成本,维护了节点的内部缓存,以防止始终需要 malloc。
    ///   此限制是对高速缓存大小的限制 (如果需要)。
    ///   如果值为 0,则高速缓存没有边界。
    ///   否则,缓存将永远不会超过 `bound` (尽管队列本身可能会更大)。
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    ///
    pub unsafe fn with_additions(
        bound: usize,
        producer_addition: ProducerAddition,
        consumer_addition: ConsumerAddition,
    ) -> Self {
        let n1 = Node::new();
        let n2 = Node::new();
        (*n1).next.store(n2, Ordering::Relaxed);
        Queue {
            consumer: CacheAligned::new(Consumer {
                tail: UnsafeCell::new(n2),
                tail_prev: AtomicPtr::new(n1),
                cache_bound: bound,
                cached_nodes: AtomicUsize::new(0),
                addition: consumer_addition,
            }),
            producer: CacheAligned::new(Producer {
                head: UnsafeCell::new(n2),
                first: UnsafeCell::new(n1),
                tail_copy: UnsafeCell::new(n1),
                addition: producer_addition,
            }),
        }
    }

    /// 将新值推送到此队列。
    /// 请注意,为了安全地使用此函数,必须从外部保证只有一个按钮。
    pub fn push(&self, t: T) {
        unsafe {
            // 获取一个节点 (使用缓存的节点或分配一个新的节点),然后将其附加到 'head' 节点。
            //
            let n = self.alloc();
            assert!((*n).value.is_none());
            (*n).value = Some(t);
            (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
            (**self.producer.head.get()).next.store(n, Ordering::Release);
            *(&self.producer.head).get() = n;
        }
    }

    unsafe fn alloc(&self) -> *mut Node<T> {
        // 首先尝试看看我们是否可以使用 'first' 节点供我们使用。
        if *self.producer.first.get() != *self.producer.tail_copy.get() {
            let ret = *self.producer.first.get();
            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // 如果以上方法均失败,请更新我们的尾部副本,然后重试。
        //
        *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire);
        if *self.producer.first.get() != *self.producer.tail_copy.get() {
            let ret = *self.producer.first.get();
            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // 如果所有操作均失败,那么我们必须分配一个新节点 (节点缓存中没有任何内容)。
        //
        Node::new()
    }

    /// 尝试从此队列中弹出一个值。
    /// 请记住,要安全使用此类型,必须确保一次仅弹出一个弹出窗口。
    pub fn pop(&self) -> Option<T> {
        unsafe {
            // `tail` 节点实际上不是一个使用过的节点,而是一个应该从其开始弹出的标记。
            // 因此,请查看 tail 的下一个字段,看看是否可以使用它。
            // 如果我们弹出,则当前尾节点是进入缓存的候选对象。
            //
            let tail = *self.consumer.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);
            if next.is_null() {
                return None;
            }
            assert!((*next).value.is_some());
            let ret = (*next).value.take();

            *self.consumer.0.tail.get() = next;
            if self.consumer.cache_bound == 0 {
                self.consumer.tail_prev.store(tail, Ordering::Release);
            } else {
                let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
                if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
                    self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
                    (*tail).cached = true;
                }

                if (*tail).cached {
                    self.consumer.tail_prev.store(tail, Ordering::Release);
                } else {
                    (*self.consumer.tail_prev.load(Ordering::Relaxed))
                        .next
                        .store(next, Ordering::Relaxed);
                    // 我们已经成功擦除了所有对 'tail' 的引用,因此现在可以放心地丢弃它了。
                    //
                    let _: Box<Node<T>> = Box::from_raw(tail);
                }
            }
            ret
        }
    }

    /// 尝试偷看队列的开头,如果队列当前没有数据,则返回 `None`
    ///
    /// # Warning
    /// 如果在使用者将值从队列中弹出之前未使用返回的 quot,则该引用无效。
    /// 如果生产者然后将另一个值压入队列,它将覆盖引用所指向的值。
    ///
    ///
    pub fn peek(&self) -> Option<&mut T> {
        // 这与上面的基本相同,所有的弹出位都被去除了。
        //
        unsafe {
            let tail = *self.consumer.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);
            if next.is_null() { None } else { (*next).value.as_mut() }
        }
    }

    pub fn producer_addition(&self) -> &ProducerAddition {
        &self.producer.addition
    }

    pub fn consumer_addition(&self) -> &ConsumerAddition {
        &self.consumer.addition
    }
}

impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
    fn drop(&mut self) {
        unsafe {
            let mut cur = *self.producer.first.get();
            while !cur.is_null() {
                let next = (*cur).next.load(Ordering::Relaxed);
                let _n: Box<Node<T>> = Box::from_raw(cur);
                cur = next;
            }
        }
    }
}