summaryrefslogtreecommitdiffstats
path: root/atuin-client/src/sync.rs
blob: 16524fc6f4897d441faa87d65dd529a032f6694e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
use std::convert::TryInto;

use chrono::prelude::*;
use eyre::Result;

use atuin_common::{api::AddHistoryRequest, utils::hash_str};

use crate::api_client;
use crate::database::Database;
use crate::encryption::{encrypt, load_encoded_key, load_key};
use crate::settings::{Settings, HISTORY_PAGE_SIZE};

// Currently sync is kinda naive, and basically just pages backwards through
// history. This means newly added stuff shows up properly! We also just use
// the total count in each database to indicate whether a sync is needed.
// I think this could be massively improved! If we had a way of easily
// indicating count per time period (hour, day, week, year, etc) then we can
// easily pinpoint where we are missing data and what needs downloading. Start
// with year, then find the week, then the day, then the hour, then download it
// all! The current naive approach will do for now.

// Check if remote has things we don't, and if so, download them.
// Returns (num downloaded, total local)
async fn sync_download(
    force: bool,
    client: &api_client::Client<'_>,
    db: &mut (impl Database + Send),
) -> Result<(i64, i64)> {
    debug!("starting sync download");

    let remote_count = client.count().await?;

    let initial_local = db.history_count().await?;
    let mut local_count = initial_local;

    let mut last_sync = if force {
        Utc.timestamp_millis(0)
    } else {
        Settings::last_sync()?
    };

    let mut last_timestamp = Utc.timestamp_millis(0);

    let host = if force { Some(String::from("")) } else { None };

    while remote_count > local_count {
        let page = client
            .get_history(last_sync, last_timestamp, host.clone())
            .await?;

        db.save_bulk(&page).await?;

        local_count = db.history_count().await?;

        if page.len() < HISTORY_PAGE_SIZE.try_into().unwrap() {
            break;
        }

        let page_last = page
            .last()
            .expect("could not get last element of page")
            .timestamp;

        // in the case of a small sync frequency, it's possible for history to
        // be "lost" between syncs. In this case we need to rewind the sync
        // timestamps
        if page_last == last_timestamp {
            last_timestamp = Utc.timestamp_millis(0);
            last_sync = last_sync - chrono::Duration::hours(1);
        } else {
            last_timestamp = page_last;
        }
    }

    Ok((local_count - initial_local, local_count))
}

// Check if we have things remote doesn't, and if so, upload them
async fn sync_upload(
    settings: &Settings,
    _force: bool,
    client: &api_client::Client<'_>,
    db: &mut (impl Database + Send),
) -> Result<()> {
    debug!("starting sync upload");

    let initial_remote_count = client.count().await?;
    let mut remote_count = initial_remote_count;

    let local_count = db.history_count().await?;

    debug!("remote has {}, we have {}", remote_count, local_count);

    let key = load_key(settings)?; // encryption key

    // first just try the most recent set

    let mut cursor = Utc::now();

    while local_count > remote_count {
        let last = db.before(cursor, HISTORY_PAGE_SIZE).await?;
        let mut buffer = Vec::new();

        if last.is_empty() {
            break;
        }

        for i in last {
            let data = encrypt(&i, &key)?;
            let add_hist = AddHistoryRequest {
                id: i.id.into(),
                timestamp: i.timestamp,
                data,
                hostname: hash_str(&i.hostname).into(),
            };

            buffer.push(add_hist);
        }

        // anything left over outside of the 100 block size
        client.post_history(&buffer).await?;
        cursor = buffer.last().unwrap().timestamp;
        remote_count = client.count().await?;

        debug!("upload cursor: {:?}", cursor);
    }

    Ok(())
}

pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> {
    let client = api_client::Client::new(
        &settings.sync_address,
        &settings.session_token,
        load_encoded_key(settings)?,
    )?;

    sync_upload(settings, force, &client, db).await?;

    let download = sync_download(force, &client, db).await?;

    debug!("sync downloaded {}", download.0);

    Settings::save_sync_time()?;

    Ok(())
}