summaryrefslogtreecommitdiffstats
path: root/src/indexer/delete_queue.rs
blob: be36bef7afa93615ace4dfea6cb7fae27b5a8898 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
use super::operation::DeleteOperation;
use std::mem;
use std::ops::DerefMut;
use std::sync::{Arc, RwLock};

// The DeleteQueue is conceptually similar to a multiple
// consumer, single producer broadcast channel.
//
// All consumers will receive all messages.
//
// Consumers of the delete queue hold a `DeleteCursor`,
// which points to a specific place of the `DeleteQueue`.
//
// A new consumer can be created in two ways
// - calling `delete_queue.cursor()` returns a cursor, that
//   will include all future delete operation (and no past operations).
// - cloning an existing cursor returns a new cursor, that
//   is at the exact same position, and can now advance independently
//   from the original cursor.
#[derive(Default)]
struct InnerDeleteQueue {
    // Pending (not yet flushed) delete operations, appended to by `push`.
    writer: Vec<DeleteOperation>,
    // Most recent block of the queue's linked list of read-only blocks.
    // The `Option` exists only so `Default` is derivable during
    // construction; it is always `Some` once `DeleteQueue::new` returns.
    last_block: Option<Arc<Block>>,
}

/// Multi-consumer, single-producer queue of delete operations.
///
/// Cloning a `DeleteQueue` is cheap: all clones share the same
/// underlying state through the `Arc`.
#[derive(Clone, Default)]
pub struct DeleteQueue {
    inner: Arc<RwLock<InnerDeleteQueue>>,
}

impl DeleteQueue {
    /// Creates a new, empty delete queue.
    pub fn new() -> DeleteQueue {
        let delete_queue = DeleteQueue {
            inner: Arc::default(),
        };

        // Install the initial (empty) block. Its `next` link points back
        // to the queue itself, so that cursors can pull freshly flushed
        // operations on demand.
        let next_block = NextBlock::from(delete_queue.clone());
        delete_queue
            .inner
            .write()
            .expect("Failed to acquire write lock on delete queue writer")
            .last_block = Some(Arc::new(Block {
            operations: Arc::default(),
            next: next_block,
        }));

        delete_queue
    }

    /// Creates a new cursor that makes it possible to
    /// consume future delete operations.
    ///
    /// Past delete operations are not accessible.
    pub fn cursor(&self) -> DeleteCursor {
        let last_block = self
            .inner
            .read()
            .expect("Read lock poisoned when opening delete queue cursor")
            .last_block
            .clone()
            .expect(
                "Failed to unwrap last_block. This should never happen
                as the Option<> is only here to make
                initialization possible",
            );
        // Position the cursor *past* the operations already present in the
        // last block, so that only future operations are visible.
        let operations_len = last_block.operations.len();
        DeleteCursor {
            block: last_block,
            pos: operations_len,
        }
    }

    /// Appends a new delete operation to the queue.
    pub fn push(&self, delete_operation: DeleteOperation) {
        self.inner
            .write()
            .expect("Failed to acquire write lock on delete queue writer")
            .writer
            .push(delete_operation);
    }

    // The DeleteQueue is a linked list of blocks of
    // delete operations.
    //
    // Writing happens by simply appending to a vec.
    // `.flush()` takes the pending delete operations vec,
    // creates a new read-only block from it,
    // and appends it to the linked list.
    //
    // `.flush()` happens when, for instance,
    // a consumer reaches the last read-only operation.
    // It then asks the delete queue if there happen to
    // be some unflushed operations.
    //
    // Returns `None` if there was nothing pending to flush.
    fn flush(&self) -> Option<Arc<Block>> {
        let mut self_wlock = self
            .inner
            .write()
            .expect("Failed to acquire write lock on delete queue writer");

        if self_wlock.writer.is_empty() {
            return None;
        }
        // Take the pending operations, leaving an empty vec behind
        // for future `push` calls.
        let delete_operations = mem::replace(&mut self_wlock.writer, Vec::new());

        self_wlock.last_block = Some(Arc::new(Block {
            operations: Arc::new(delete_operations),
            next: NextBlock::from(self.clone()),
        }));
        self_wlock.last_block.clone()
    }
}

// State of a block's `next` link.
enum InnerNextBlock {
    // The next block has not been materialized yet; holds a handle back
    // to the queue so pending operations can be flushed on demand.
    Writer(DeleteQueue),
    // The next block exists and is now immutable.
    Closed(Arc<Block>),
}

// Lazily-materialized link to the next block in the queue's linked list.
struct NextBlock(RwLock<InnerNextBlock>);

impl From<DeleteQueue> for NextBlock {
    /// Wraps a handle to the queue as an open-ended (not yet
    /// materialized) link to the next block.
    fn from(delete_queue: DeleteQueue) -> NextBlock {
        let open_link = InnerNextBlock::Writer(delete_queue);
        NextBlock(RwLock::new(open_link))
    }
}

impl NextBlock {
    /// Returns the next block, materializing it from the queue's
    /// pending operations if necessary.
    ///
    /// Returns `None` when the link is still open and the queue holds
    /// no unflushed operations.
    fn next_block(&self) -> Option<Arc<Block>> {
        {
            // Fast path: the next block has already been materialized.
            let next_read_lock = self
                .0
                .read()
                .expect("Failed to acquire read lock in delete queue");
            if let InnerNextBlock::Closed(ref block) = *next_read_lock {
                return Some(Arc::clone(block));
            }
        }
        let mut next_write_lock = self
            .0
            .write()
            .expect("Failed to acquire write lock in delete queue");
        // Re-check under the write lock: another thread may have closed
        // this link between our releasing the read lock and acquiring
        // the write lock.
        let next_block = match *next_write_lock {
            InnerNextBlock::Closed(ref block) => {
                return Some(Arc::clone(block));
            }
            InnerNextBlock::Writer(ref writer) => match writer.flush() {
                Some(flushed_next_block) => flushed_next_block,
                None => {
                    return None;
                }
            },
        };
        // Seal the link so future readers take the fast path.
        *next_write_lock.deref_mut() = InnerNextBlock::Closed(Arc::clone(&next_block));
        Some(next_block)
    }
}

// One read-only node of the queue's linked list.
struct Block {
    // Immutable batch of delete operations flushed into this block.
    operations: Arc<Vec<DeleteOperation>>,
    // Link to the next block, materialized lazily.
    next: NextBlock,
}

/// A consumer's position in the delete queue.
///
/// Cloning a cursor yields an independent cursor at the same position.
#[derive(Clone)]
pub struct DeleteCursor {
    // Block currently being consumed.
    block: Arc<Block>,
    // Index of the next operation to read within `block.operations`.
    pos: usize,
}

impl DeleteCursor {
    /// Skips operations and positions the cursor so that
    /// - either all of the delete operations currently in the
    ///   queue are consumed and the next `get` will return `None`,
    /// - or the next `get` will return the first operation with an
    ///   `opstamp >= target_opstamp`.
    pub fn skip_to(&mut self, target_opstamp: u64) {
        // TODO Could be optimized by skipping whole blocks at once.
        while self.is_behind_opstamp(target_opstamp) {
            self.advance();
        }
    }

    // Takes `&mut self` because `get` may need to load the next block.
    #[allow(clippy::wrong_self_convention)]
    fn is_behind_opstamp(&mut self, target_opstamp: u64) -> bool {
        self.get()
            .map(|operation| operation.opstamp < target_opstamp)
            .unwrap_or(false)
    }

    /// If the current block has been entirely
    /// consumed, try to load the next one.
    ///
    /// Return `true`, if after this attempt,
    /// the cursor is on a block that has not
    /// been entirely consumed.
    /// Return `false`, if we have reached the end of the queue.
    fn load_block_if_required(&mut self) -> bool {
        if self.pos >= self.block.operations.len() {
            // We have consumed our operations entirely.
            // Let's ask the writer if it has more for us.
            match self.block.next.next_block() {
                Some(block) => {
                    self.block = block;
                    self.pos = 0;
                    true
                }
                None => false,
            }
        } else {
            true
        }
    }

    /// Advance to the next delete operation.
    /// Returns `true` iff there is such an operation.
    pub fn advance(&mut self) -> bool {
        if self.load_block_if_required() {
            self.pos += 1;
            true
        } else {
            false
        }
    }

    /// Get the current delete operation.
    /// Calling `.get` does not advance the cursor.
    pub fn get(&mut self) -> Option<&DeleteOperation> {
        if self.load_block_if_required() {
            Some(&self.block.operations[self.pos])
        } else {
            None
        }
    }
}

#[cfg(test)]
mod tests {

    use super::{DeleteOperation, DeleteQueue};
    use schema::{Field, Term};

    #[test]
    fn test_deletequeue() {
        let delete_queue = DeleteQueue::new();

        // Builds a delete operation whose opstamp and term value are `i`.
        let make_op = |i: usize| {
            let field = Field(1u32);
            DeleteOperation {
                opstamp: i as u64,
                term: Term::from_field_u64(field, i as u64),
            }
        };

        delete_queue.push(make_op(1));
        delete_queue.push(make_op(2));

        // A cursor sees the operations pushed after its creation... but
        // `snapshot` was created after ops 1 and 2; see the note below.
        let snapshot = delete_queue.cursor();
        {
            // Clone of the snapshot iterates ops 1 and 2, then is exhausted.
            let mut operations_it = snapshot.clone();
            assert_eq!(operations_it.get().unwrap().opstamp, 1);
            operations_it.advance();
            assert_eq!(operations_it.get().unwrap().opstamp, 2);
            operations_it.advance();
            assert!(operations_it.get().is_none());
            operations_it.advance();

            // A fresh cursor starts past all existing operations...
            let mut snapshot2 = delete_queue.cursor();
            assert!(snapshot2.get().is_none());
            // ...but picks up operations pushed after its creation.
            delete_queue.push(make_op(3));
            assert_eq!(snapshot2.get().unwrap().opstamp, 3);
            // The exhausted iterator also sees the new operation;
            // `get` is idempotent and does not advance the cursor.
            assert_eq!(operations_it.get().unwrap().opstamp, 3);
            assert_eq!(operations_it.get().unwrap().opstamp, 3);
            operations_it.advance();
            assert!(operations_it.get().is_none());
            operations_it.advance();
        }
        {
            // A clone of the original snapshot replays everything from its
            // recorded position, including op 3 pushed later.
            let mut operations_it = snapshot.clone();
            assert_eq!(operations_it.get().unwrap().opstamp, 1);
            operations_it.advance();
            assert_eq!(operations_it.get().unwrap().opstamp, 2);
            operations_it.advance();
            assert_eq!(operations_it.get().unwrap().opstamp, 3);
            operations_it.advance();
            assert!(operations_it.get().is_none());
        }
    }
}