1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
use std::sync::Arc;
use std::fmt::Debug;
use anyhow::Result;
use tokio::sync::RwLock;
use crate::profile::Profile;
mod gossip;
mod device;
mod account;
mod ctrl;
pub use ctrl::ReactorReceiver;
pub use ctrl::ReactorReply;
pub use ctrl::ReactorRequest;
pub use ctrl::ReactorSender;
pub use ctrl::ReplyChannel;
/// Reactor type, for running the application logic.
///
/// The Reactor runs the whole application logic: syncing with other devices, fetching and
/// keeping profile updates of other accounts, communicating on the gossipsub topics, and so on.
///
/// The two type parameters are the application-specific request and reply payloads that are
/// carried next to the built-in reactor requests and replies.
#[derive(Debug)]
pub(super) struct Reactor<CustomReactorRequest, CustomReactorReply>
where
    CustomReactorRequest: Debug + Send + Sync,
    CustomReactorReply: Debug + Send + Sync,
{
    /// Shared, lock-protected profile the reactor operates on.
    profile: Arc<RwLock<Profile>>,

    /// Receiving half of the channel on which requests for this reactor arrive.
    rx: ReactorReceiver<CustomReactorRequest, CustomReactorReply>,
}
impl<CustomReactorRequest, CustomReactorReply> Reactor<CustomReactorRequest, CustomReactorReply>
where CustomReactorRequest: Debug + Send + Sync,
CustomReactorReply: Debug + Send + Sync
{
pub(super) fn new(profile: Arc<RwLock<Profile>>) -> (Self, ReactorSender<CustomReactorRequest, CustomReactorReply>) {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let reactor = Reactor {
profile,
rx,
};
(reactor, tx)
}
pub async fn head(&self) -> Option<cid::Cid> {
self.profile.read().await.head().map(cid::Cid::clone)
}
pub async fn connect(&self, peer: ipfs::MultiaddrWithPeerId) -> Result<()> {
self.profile.read().await.connect(peer).await
}
pub fn profile(&self) -> Arc<RwLock<Profile>> {
self.profile.clone()
}
pub async fn exit(self) -> Result<()> {
let mut inner = self.profile;
loop {
match Arc::try_unwrap(inner) {
Err(arc) => inner = arc,
Ok(inner) => return inner.into_inner().exit().await,
}
}
}
pub(super) async fn receive_next_message(&mut self) -> Option<(ReactorRequest<CustomReactorRequest>, ReplyChannel<CustomReactorReply>)> {
self.rx.recv().await
}
/// Process the request if it is not a specialized request,
/// return the specialized request if it is one and cannot be processed by this reactor
/// implementation
pub(super) async fn process_reactor_message(&mut self, request: (ReactorRequest<CustomReactorRequest>, ReplyChannel<CustomReactorReply>)) -> Result<Option<(CustomReactorRequest, ReplyChannel<CustomReactorReply>)>> {
match request {
(ReactorRequest::Ping, reply_channel) => {
if let Err(_) = reply_channel.send(ReactorReply::Pong) {
anyhow::bail!("Failed sending PONG reply")
}
Ok(None)
},
(ReactorRequest::Exit, reply_channel) => {
if let Err(_) = reply_channel.send(ReactorReply::Exiting) {
anyhow::bail!("Failed sending EXITING reply")
}
Ok(None)
},
(ReactorRequest::Custom(c), reply_channel) => {
Ok(Some((c, reply_channel)))
}
}
}
}
|