path: root/atuin-client/src/sync.rs
use std::collections::HashSet;
use std::convert::TryInto;
use std::iter::FromIterator;

use eyre::Result;

use atuin_common::api::AddHistoryRequest;
use crypto_secretbox::Key;
use time::OffsetDateTime;

use crate::{
    api_client,
    database::Database,
    encryption::{decrypt, encrypt, load_key},
    settings::Settings,
};

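// Hex-encoded SHA-256 of the given string. Used below to obfuscate hostnames
// before they are sent to the server.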
pub fn hash_str(string: &str) -> String {
    use sha2::{Digest, Sha256};
    let mut hasher = Sha256::new();
    hasher.update(string.as_bytes());
    hex::encode(hasher.finalize())
}

// Currently sync is kinda naive, and basically just pages backwards through
// history. This means newly added stuff shows up properly! We also just use
// the total count in each database to indicate whether a sync is needed.
// I think this could be massively improved! If we had a way of easily
// indicating the count per time period (hour, day, week, year, etc), we could
// easily pinpoint where we are missing data and what needs downloading: start
// with the year, then find the week, then the day, then the hour, then
// download it all! The current naive approach will do for now.

// Check if remote has things we don't, and if so, download them.
// Returns (num downloaded, total local)
async fn sync_download(
    key: &Key,
    force: bool,
    client: &api_client::Client<'_>,
    db: &(impl Database + Send),
) -> Result<(i64, i64)> {
    debug!("starting sync download");

    let remote_status = client.status().await?;
    let remote_count = remote_status.count;

    // knowing the remote deletions up front ensures we never even save an
    // entry that was deleted before we first synced it
    let remote_deleted =
        HashSet::<&str>::from_iter(remote_status.deleted.iter().map(String::as_str));

    let initial_local = db.history_count(true).await?;
    let mut local_count = initial_local;

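    // two cursors drive the download loop: last_sync is the last successful
    // sync time (or the epoch when forcing a full download), and
    // last_timestamp tracks where the previous page left off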
    let mut last_sync = if force {
        OffsetDateTime::UNIX_EPOCH
    } else {
        Settings::last_sync()?
    };

    let mut last_timestamp = OffsetDateTime::UNIX_EPOCH;

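    // when forcing a full download, pass an empty host filter instead of the
    // default (this machine) so no entries are skipped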
    let host = if force { Some(String::from("")) } else { None };

    while remote_count > local_count {
        let page = client
            .get_history(last_sync, last_timestamp, host.clone())
            .await?;

        let history: Vec<_> = page
            .history
            .iter()
            // TODO: handle deletion earlier in this chain
            .map(|h| serde_json::from_str(h).expect("invalid JSON in history payload"))
            .map(|h| decrypt(h, key).expect("failed to decrypt history! check your key"))
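            // tombstone anything the server has already marked as deleted, so
            // the command text is never stored locally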
            .map(|mut h| {
                if remote_deleted.contains(h.id.0.as_str()) {
                    h.deleted_at = Some(time::OffsetDateTime::now_utc());
                    h.command = String::from("");
                }

                h
            })
            .collect();

        db.save_bulk(&history).await?;

        local_count = db.history_count(true).await?;

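        // a page smaller than the server's page size means there is nothing
        // further back to fetch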
        if history.len() < remote_status.page_size.try_into().unwrap() {
            break;
        }

        let page_last = history
            .last()
            .expect("could not get last element of page")
            .timestamp;

        // with a short sync frequency, it's possible for history to be "lost"
        // between syncs. If this page did not advance the timestamp cursor,
        // rewind the sync timestamps and try again
        if page_last == last_timestamp {
            last_timestamp = OffsetDateTime::UNIX_EPOCH;
            last_sync -= time::Duration::hours(1);
        } else {
            last_timestamp = page_last;
        }
    }

    for i in remote_status.deleted {
        // apply remote deletions to the stored history: the entry is kept,
        // but pretty much everything in it can be nullified
        if let Some(h) = db.load(i.as_str()).await? {
            db.delete(h).await?;
        } else {
            info!(
                "could not delete history with id {}, not found locally",
                i.as_str()
            );
        }
    }

    Ok((local_count - initial_local, local_count))
}

// Check if we have things remote doesn't, and if so, upload them
async fn sync_upload(
    key: &Key,
    _force: bool,
    client: &api_client::Client<'_>,
    db: &(impl Database + Send),
) -> Result<()> {
    debug!("starting sync upload");

    let remote_status = client.status().await?;
    let remote_deleted: HashSet<String> = HashSet::from_iter(remote_status.deleted.clone());

    let initial_remote_count = client.count().await?;
    let mut remote_count = initial_remote_count;

    let local_count = db.history_count(true).await?;

    debug!("remote has {}, we have {}", remote_count, local_count);

    // first just try the most recent set
    let mut cursor = OffsetDateTime::now_utc();

    while local_count > remote_count {
        let last = db.before(cursor, remote_status.page_size).await?;
        let mut buffer = Vec::new();

        if last.is_empty() {
            break;
        }

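        // encrypt each entry and hash its hostname before it leaves this
        // machine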
        for i in last {
            let data = encrypt(&i, key)?;
            let data = serde_json::to_string(&data)?;

            let add_hist = AddHistoryRequest {
                id: i.id.to_string(),
                timestamp: i.timestamp,
                data,
                hostname: hash_str(&i.hostname),
            };

            buffer.push(add_hist);
        }

        // upload this page, then move the cursor to the last entry we just
        // sent
        client.post_history(&buffer).await?;
        cursor = buffer.last().unwrap().timestamp;
        remote_count = client.count().await?;

        debug!("upload cursor: {:?}", cursor);
    }

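    // finally, push any local deletions the server doesn't know about yet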
    let deleted = db.deleted().await?;

    for i in deleted {
        if remote_deleted.contains(&i.id.to_string()) {
            continue;
        }

        info!("deleting {} on remote", i.id);
        client.delete_history(i).await?;
    }

    Ok(())
}

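// Run a full sync: upload anything the server is missing, then download
// anything we are missing locally.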
pub async fn sync(settings: &Settings, force: bool, db: &(impl Database + Send)) -> Result<()> {
    let client = api_client::Client::new(
        &settings.sync_address,
        &settings.session_token,
        settings.network_connect_timeout,
        settings.network_timeout,
    )?;

    Settings::save_sync_time()?;

    let key = load_key(settings)?; // encryption key

    sync_upload(&key, force, &client, db).await?;

    let download = sync_download(&key, force, &client, db).await?;

    debug!("sync downloaded {}", download.0);

    Ok(())
}