// src/file_sizes/file_sizes_unix.rs

use {
    crate::task_sync::TaskLifetime,
    crossbeam::{channel::unbounded, sync::WaitGroup},
    log::debug,
    std::{
        collections::HashSet,
        fs,
        os::unix::fs::MetadataExt,
        path::{Path, PathBuf},
        sync::{
            atomic::{AtomicIsize, AtomicU64, Ordering},
            Arc, Mutex,
        },
        thread,
        time::Duration,
    },
    super::FileSize,
};

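/// Compute the size on disk of a directory by recursively summing the
/// 512-byte blocks allocated to its entries, using a small pool of worker
/// threads. Returns None if the task lifetime expires before the
/// computation is complete.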
pub fn compute_dir_size(path: &Path, tl: &TaskLifetime) -> Option<u64> {
    debug!("compute size of dir {:?} --------------- ", path);
    let inodes = Arc::new(Mutex::new(HashSet::<u64>::default())); // to avoid counting an inode twice (hard links)
    // the computation is done in 512-byte blocks
    // see https://doc.rust-lang.org/std/os/unix/fs/trait.MetadataExt.html#tymethod.blocks
    let blocks = Arc::new(AtomicU64::new(0));

    // this MPMC channel contains the directory paths which must be handled
    let (dirs_sender, dirs_receiver) = unbounded();

    // busy is the number of directories which are either being processed or queued
    // We use this count to determine when threads can stop waiting for tasks
    let busy = Arc::new(AtomicIsize::new(0));
    busy.fetch_add(1, Ordering::Relaxed);
    dirs_sender.send(Some(PathBuf::from(path))).unwrap();

    let wg = WaitGroup::new();
    let period = Duration::from_micros(50);
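    // spawn 8 worker threads: each one sums the blocks of the entries of
    // the directories it receives, and queues any subdirectory it finds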
    for _ in 0..8 {
        let blocks = Arc::clone(&blocks);
        let busy = Arc::clone(&busy);
        let wg = wg.clone();
        let (dirs_sender, dirs_receiver) = (dirs_sender.clone(), dirs_receiver.clone());
        let tl = tl.clone();
        let inodes = inodes.clone();
        thread::spawn(move || {
            loop {
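                // wait briefly for a directory to process; on timeout or
                // wake-up we fall through and check whether all work is done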
                let o = dirs_receiver.recv_timeout(period);
                if let Ok(Some(open_dir)) = o {
                    if let Ok(entries) = fs::read_dir(&open_dir) {
                        for e in entries.flatten() {
                            if let Ok(md) = e.metadata() {
                                if md.is_dir() {
                                    busy.fetch_add(1, Ordering::Relaxed);
                                    dirs_sender.send(Some(e.path())).unwrap();
                                } else if md.nlink() > 1 {
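                                    // a file with several hard links must be
                                    // counted only once, so we remember its inode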
                                    let mut inodes = inodes.lock().unwrap();
                                    if !inodes.insert(md.ino()) {
                                        // it was already in the set
                                        continue; // let's not add the blocks
                                    }
                                }
                                blocks.fetch_add(md.blocks(), Ordering::Relaxed);
                            }
                        }
                    }
                    busy.fetch_sub(1, Ordering::Relaxed);
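                    // send a None to wake up a waiting worker so it can
                    // recheck the busy count without waiting for the timeout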
                    dirs_sender.send(None).unwrap();
                } else if busy.load(Ordering::Relaxed) < 1 {
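                    // nothing is queued or being processed: the traversal is done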
                    break;
                }
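                // stop this worker if the whole computation was cancelled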
                if tl.is_expired() {
                    break;
                }
            }
            drop(wg);
        });
    }
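    // wait for all worker threads to finish (or give up on expiration)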
    wg.wait();

    if tl.is_expired() {
        return None;
    }
    let blocks = blocks.load(Ordering::Relaxed);
    Some(blocks * 512)
}

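/// Compute the size of a single file: the smaller of its nominal size and
/// its allocated size (blocks * 512), with the boolean flag set when the
/// file occupies fewer blocks than its nominal size implies (a sparse file).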
pub fn compute_file_size(path: &Path) -> FileSize {
    match fs::metadata(path) {
        Ok(md) => {
            let nominal_size = md.size();
            let block_size = md.blocks() * 512;
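            // a sparse file occupies fewer blocks than its nominal size
            // implies; report the smaller figure and set the sparse flag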
            FileSize::new(block_size.min(nominal_size), block_size < nominal_size)
        }
        Err(_) => FileSize::new(0, false),
    }
}