summaryrefslogtreecommitdiffstats
path: root/lib/src/reactor/mod.rs
blob: b4aa597fa55d229bb1f4a22f607ec7f4a116ffb4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use std::sync::Arc;
use std::fmt::Debug;

use anyhow::Result;
use tokio::sync::RwLock;

use crate::profile::Profile;

mod gossip;
mod device;
mod account;
mod ctrl;

pub use ctrl::ReactorReceiver;
pub use ctrl::ReactorReply;
pub use ctrl::ReactorRequest;
pub use ctrl::ReactorSender;
pub use ctrl::ReplyChannel;

/// Core driver of the application logic.
///
/// The `Reactor` is responsible for running the whole application logic: syncing with other
/// devices, fetching and keeping profile updates of other accounts, communicating on the
/// gossipsub topics, and so on.
///
/// It is generic over the payload types of application-specific ("custom") requests and the
/// replies sent back for them; both must be `Debug + Send + Sync`.
#[derive(Debug)]
pub(super) struct Reactor<CustomReactorRequest, CustomReactorReply>
where
    CustomReactorRequest: Debug + Send + Sync,
    CustomReactorReply: Debug + Send + Sync,
{
    /// The profile this reactor operates on, shared behind an async read/write lock.
    profile: Arc<RwLock<Profile>>,

    /// Receiving end of the control channel created in `Reactor::new`.
    rx: ReactorReceiver<CustomReactorRequest, CustomReactorReply>,
}

impl<CustomReactorRequest, CustomReactorReply> Reactor<CustomReactorRequest, CustomReactorReply>
where
    CustomReactorRequest: Debug + Send + Sync,
    CustomReactorReply: Debug + Send + Sync,
{
    /// Create a reactor operating on `profile`.
    ///
    /// Returns the reactor together with the sending half of a freshly created unbounded
    /// channel; the reactor keeps the receiving half and reads its requests from it.
    pub(super) fn new(profile: Arc<RwLock<Profile>>) -> (Self, ReactorSender<CustomReactorRequest, CustomReactorReply>) {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
        (Reactor { profile, rx }, tx)
    }

    /// Return a copy of the profile's current head `Cid`, or `None` if the profile reports no
    /// head.
    ///
    /// Takes the profile's read lock for the duration of the call.
    pub async fn head(&self) -> Option<cid::Cid> {
        self.profile.read().await.head().cloned()
    }

    /// Ask the profile to connect to `peer`.
    ///
    /// The profile's read lock is held until the connection attempt has completed.
    ///
    /// # Errors
    ///
    /// Forwards whatever error the profile's `connect` returns.
    pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> {
        self.profile.read().await.connect(peer).await
    }

    /// Hand out another shared handle to the profile.
    ///
    /// Note that [`Reactor::exit`] cannot complete while a handle returned from here is alive.
    pub fn profile(&self) -> Arc<RwLock<Profile>> {
        Arc::clone(&self.profile)
    }

    /// Shut down the profile, consuming the reactor.
    ///
    /// Waits until this reactor holds the only remaining handle to the profile, then takes the
    /// profile out of its lock and calls `exit` on it.
    ///
    /// # Errors
    ///
    /// Forwards whatever error the profile's `exit` returns.
    pub async fn exit(self) -> Result<()> {
        let mut inner = self.profile;
        loop {
            match Arc::try_unwrap(inner) {
                Ok(lock) => return lock.into_inner().exit().await,
                Err(shared) => {
                    inner = shared;
                    // Other handles are still alive. Give their owners a chance to run and
                    // drop them instead of spinning: without an await point this loop would
                    // monopolise the executor thread and, on a single-threaded runtime,
                    // never terminate.
                    tokio::task::yield_now().await;
                }
            }
        }
    }

    /// Wait for the next request on the control channel.
    ///
    /// Resolves to the request together with the channel its reply must be sent on, or to
    /// `None` when the channel's `recv` yields `None`.
    pub(super) async fn receive_next_message(&mut self) -> Option<(ReactorRequest<CustomReactorRequest>, ReplyChannel<CustomReactorReply>)> {
        self.rx.recv().await
    }

    /// Process the request if it is not a specialized request,
    /// return the specialized request if it is one and cannot be processed by this reactor
    /// implementation
    ///
    /// * `Ping` is answered with `Pong`, result is `Ok(None)`.
    /// * `Exit` is answered with `Exiting`, result is `Ok(None)`. This method only acknowledges
    ///   the request; it does not call [`Reactor::exit`] itself.
    /// * `Custom(c)` is not answered here; `c` and the untouched reply channel are handed back
    ///   as `Ok(Some(..))` for the caller to process.
    ///
    /// # Errors
    ///
    /// Fails if the reply to a `Ping` or `Exit` request cannot be sent.
    pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest<CustomReactorRequest>, ReplyChannel<CustomReactorReply>)) -> Result<Option<(CustomReactorRequest, ReplyChannel<CustomReactorReply>)>> {
        let (message, reply_channel) = request;

        match message {
            ReactorRequest::Ping => {
                if reply_channel.send(ReactorReply::Pong).is_err() {
                    anyhow::bail!("Failed sending PONG reply")
                }
                Ok(None)
            }

            ReactorRequest::Exit => {
                if reply_channel.send(ReactorReply::Exiting).is_err() {
                    anyhow::bail!("Failed sending EXITING reply")
                }
                Ok(None)
            }

            ReactorRequest::Custom(c) => Ok(Some((c, reply_channel))),
        }
    }
}