summaryrefslogtreecommitdiffstats
path: root/src/context/worker_context.rs
blob: 5bdf055500a954898d5d0e743417357b7ce71296 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use std::collections::vec_deque::Iter;
use std::collections::{HashMap, VecDeque};
use std::sync::mpsc;
use std::thread;

use crate::event::AppEvent;
use crate::io::{FileOperationProgress, IoWorkerObserver, IoWorkerThread};

pub struct WorkerContext {
    // forks of applications
    child_pool: HashMap<u32, thread::JoinHandle<()>>,
    // to send info
    event_tx: mpsc::Sender<AppEvent>,
    // queue of IO workers
    worker_queue: VecDeque<IoWorkerThread>,
    // current worker
    worker: Option<IoWorkerObserver>,
}

impl WorkerContext {
    pub fn new(event_tx: mpsc::Sender<AppEvent>) -> Self {
        Self {
            child_pool: HashMap::new(),
            event_tx,
            worker_queue: VecDeque::new(),
            worker: None,
        }
    }
    pub fn clone_event_tx(&self) -> mpsc::Sender<AppEvent> {
        self.event_tx.clone()
    }
    // worker related
    pub fn push_worker(&mut self, thread: IoWorkerThread) {
        self.worker_queue.push_back(thread);
        // error is ignored
        let _ = self.event_tx.send(AppEvent::IoWorkerCreate);
    }
    pub fn is_busy(&self) -> bool {
        self.worker.is_some()
    }
    pub fn is_empty(&self) -> bool {
        self.worker_queue.is_empty()
    }

    pub fn iter(&self) -> Iter<IoWorkerThread> {
        self.worker_queue.iter()
    }
    pub fn worker_ref(&self) -> Option<&IoWorkerObserver> {
        self.worker.as_ref()
    }

    pub fn set_progress(&mut self, res: FileOperationProgress) {
        if let Some(s) = self.worker.as_mut() {
            s.set_progress(res);
        }
    }

    pub fn get_msg(&self) -> Option<&str> {
        let worker = self.worker.as_ref()?;
        Some(worker.get_msg())
    }
    pub fn update_msg(&mut self) {
        if let Some(s) = self.worker.as_mut() {
            s.update_msg();
        }
    }

    pub fn start_next_job(&mut self) {
        let tx = self.clone_event_tx();

        if let Some(worker) = self.worker_queue.pop_front() {
            let src = worker.paths[0].parent().unwrap().to_path_buf();
            let dest = worker.dest.clone();
            let handle = thread::spawn(move || {
                let (wtx, wrx) = mpsc::channel();
                // start worker
                let worker_handle = thread::spawn(move || worker.start(wtx));
                // relay worker info to event loop
                while let Ok(progress) = wrx.recv() {
                    let _ = tx.send(AppEvent::FileOperationProgress(progress));
                }
                let result = worker_handle.join();

                match result {
                    Ok(res) => {
                        let _ = tx.send(AppEvent::IoWorkerResult(res));
                    }
                    Err(_) => {
                        let err = std::io::Error::new(std::io::ErrorKind::Other, "Sending Error");
                        let _ = tx.send(AppEvent::IoWorkerResult(Err(err)));
                    }
                }
            });
            let observer = IoWorkerObserver::new(handle, src, dest);
            self.worker = Some(observer);
        }
    }

    pub fn remove_worker(&mut self) -> Option<IoWorkerObserver> {
        self.worker.take()
    }

    pub fn push_child(&mut self, child_id: u32, handle: thread::JoinHandle<()>) {
        self.child_pool.insert(child_id, handle);
    }

    pub fn join_child(&mut self, child_id: u32) {
        if let Some(handle) = self.child_pool.remove(&child_id) {
            let _ = handle.join();
        }
    }
}