1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
use std::collections::HashSet;
use std::fs;
use std::path::PathBuf;
use std::thread;
use crossbeam_channel as channel;
use rayon::{self, prelude::*};
use crate::unique_id::{generate_unique_id, UniqueID};
/// Errors that can occur while walking the file system.
///
/// Each variant carries the path for which the operation failed.
///
/// Note that this type shares its name with the `Result::Err` variant; within
/// this module, `Err(..)` used as a pattern still refers to the `Result`
/// variant, while `Err::Variant` paths refer to this enum.
#[derive(Debug)]
pub enum Err {
    /// Metadata for the given path could not be read.
    NoMetadataForPath(PathBuf),
    /// The given directory could not be opened for listing.
    CouldNotReadDir(PathBuf),
}
/// Messages sent from the walker workers to the receiving thread over the channel.
enum Message {
/// A visited entry: the value returned by `generate_unique_id` for its
/// metadata (may be `None`) and the length reported by that metadata.
/// The receiver counts an entry with `Some(id)` only the first time that
/// ID is seen; entries with `None` are always counted.
SizeEntry(Option<UniqueID>, u64),
/// A failure encountered while visiting an entry.
Error { err: Err },
}
/// Recursively visits `entries` in parallel and reports what it finds on `tx`.
///
/// For every path:
/// * its metadata is read with `symlink_metadata`; on failure a
///   `Message::Error` carrying `Err::NoMetadataForPath` is sent and the path
///   is not processed further;
/// * otherwise a `Message::SizeEntry` with the entry's unique ID and length
///   is sent;
/// * if the entry is a directory, its children are listed and walked
///   recursively. A directory that cannot be listed produces a
///   `Message::Error` carrying `Err::CouldNotReadDir`. Individual child
///   entries that fail to be read are skipped without a message.
///
/// # Panics
///
/// Panics if sending on `tx` fails.
fn walk(tx: channel::Sender<Message>, entries: &[PathBuf]) {
    entries.into_par_iter().for_each_with(tx, |sender, path| {
        // Without metadata there is nothing to measure: report and stop here.
        let metadata = match path.symlink_metadata() {
            Ok(metadata) => metadata,
            Err(_) => {
                sender
                    .send(Message::Error {
                        err: Err::NoMetadataForPath(path.clone()),
                    })
                    .unwrap();
                return;
            }
        };

        let id = generate_unique_id(&metadata);
        let len = metadata.len();
        sender.send(Message::SizeEntry(id, len)).unwrap();

        // Only directories have children to descend into.
        if !metadata.is_dir() {
            return;
        }

        let children: Vec<PathBuf> = match fs::read_dir(path) {
            // Child entries that yield an error are dropped from the listing.
            Ok(listing) => listing.flatten().map(|child| child.path()).collect(),
            Err(_) => {
                sender
                    .send(Message::Error {
                        err: Err::CouldNotReadDir(path.clone()),
                    })
                    .unwrap();
                Vec::new()
            }
        };

        // Each recursion level gets its own sender handle.
        walk(sender.clone(), &children[..]);
    });
}
/// A configured file-system walk over a set of root paths.
///
/// Created with `Walk::new` and executed with `Walk::run`.
pub struct Walk<'a> {
// Paths handed to `walk` as the starting entries of the traversal.
root_directories: &'a [PathBuf],
// Value passed to `rayon::ThreadPoolBuilder::num_threads` when the pool is built.
num_threads: usize,
}
impl<'a> Walk<'a> {
pub fn new(root_directories: &'a [PathBuf], num_threads: usize) -> Walk {
Walk {
root_directories,
num_threads,
}
}
pub fn run(&self) -> (u64, Vec<Err>) {
let (tx, rx) = channel::unbounded();
let receiver_thread = thread::spawn(move || {
let mut total = 0;
let mut ids = HashSet::new();
let mut error_messages: Vec<Err> = Vec::new();
for msg in rx {
match msg {
Message::SizeEntry(unique_id, size) => {
if let Some(unique_id) = unique_id {
// Only count this entry if the ID has not been seen
if ids.insert(unique_id) {
total += size;
}
} else {
total += size;
}
}
Message::Error { err } => {
error_messages.push(err);
}
}
}
(total, error_messages)
});
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(self.num_threads)
.build()
.unwrap();
pool.install(|| walk(tx, self.root_directories));
receiver_thread.join().unwrap()
}
}
|