app call/message and private routing checkpoint

This commit is contained in:
John Smith
2022-09-25 18:04:53 -04:00
parent 507d02974c
commit baa1714943
29 changed files with 1139 additions and 538 deletions

View File

@@ -1,6 +1,9 @@
mod coders;
mod destination;
mod operation_waiter;
mod private_route;
mod rpc_app_call;
mod rpc_app_message;
mod rpc_cancel_tunnel;
mod rpc_complete_tunnel;
mod rpc_error;
@@ -20,6 +23,7 @@ mod rpc_value_changed;
mod rpc_watch_value;
pub use destination::*;
pub use operation_waiter::*;
pub use private_route::*;
pub use rpc_error::*;
@@ -108,8 +112,7 @@ where
#[derive(Debug)]
struct WaitableReply {
op_id: OperationId,
eventual: EventualValue<(Option<Id>, RPCMessage)>,
handle: OperationWaitHandle<RPCMessage>,
timeout: u64,
node_ref: NodeRef,
send_ts: u64,
@@ -138,62 +141,162 @@ struct RenderedOperation {
/////////////////////////////////////////////////////////////////////
pub struct RPCProcessorInner {
network_manager: NetworkManager,
routing_table: RoutingTable,
node_id: DHTKey,
node_id_secret: DHTKeySecret,
send_channel: Option<flume::Sender<(Option<Id>, RPCMessageEncoded)>>,
timeout: u64,
max_route_hop_count: usize,
waiting_rpc_table: BTreeMap<OperationId, EventualValue<(Option<Id>, RPCMessage)>>,
stop_source: Option<StopSource>,
worker_join_handles: Vec<MustJoinHandle<()>>,
}
pub struct RPCProcessorUnlockedInner {
node_id: DHTKey,
node_id_secret: DHTKeySecret,
timeout: u64,
queue_size: u32,
concurrency: u32,
max_route_hop_count: usize,
validate_dial_info_receipt_time_ms: u32,
update_callback: UpdateCallback,
waiting_rpc_table: OperationWaiter<RPCMessage>,
waiting_app_call_table: OperationWaiter<Vec<u8>>,
}
#[derive(Clone)]
pub struct RPCProcessor {
crypto: Crypto,
config: VeilidConfig,
network_manager: NetworkManager,
routing_table: RoutingTable,
inner: Arc<Mutex<RPCProcessorInner>>,
unlocked_inner: Arc<RPCProcessorUnlockedInner>,
}
impl RPCProcessor {
fn new_inner(network_manager: NetworkManager) -> RPCProcessorInner {
fn new_inner() -> RPCProcessorInner {
RPCProcessorInner {
network_manager: network_manager.clone(),
routing_table: network_manager.routing_table(),
node_id: DHTKey::default(),
node_id_secret: DHTKeySecret::default(),
send_channel: None,
timeout: 10000000,
max_route_hop_count: 7,
waiting_rpc_table: BTreeMap::new(),
stop_source: None,
worker_join_handles: Vec::new(),
}
}
pub fn new(network_manager: NetworkManager) -> Self {
fn new_unlocked_inner(
config: VeilidConfig,
update_callback: UpdateCallback,
) -> RPCProcessorUnlockedInner {
// make local copy of node id for easy access
let c = config.get();
let node_id = c.network.node_id;
let node_id_secret = c.network.node_id_secret;
// set up channel
let mut concurrency = c.network.rpc.concurrency;
let mut queue_size = c.network.rpc.queue_size;
let mut timeout = ms_to_us(c.network.rpc.timeout_ms);
let mut max_route_hop_count = c.network.rpc.max_route_hop_count as usize;
if concurrency == 0 {
concurrency = intf::get_concurrency() / 2;
if concurrency == 0 {
concurrency = 1;
}
}
if queue_size == 0 {
queue_size = 1024;
}
if timeout == 0 {
timeout = 10000000;
}
if max_route_hop_count == 0 {
max_route_hop_count = 7usize;
}
let validate_dial_info_receipt_time_ms = c.network.dht.validate_dial_info_receipt_time_ms;
RPCProcessorUnlockedInner {
node_id,
node_id_secret,
timeout,
queue_size,
concurrency,
max_route_hop_count,
validate_dial_info_receipt_time_ms,
update_callback,
waiting_rpc_table: OperationWaiter::new(),
waiting_app_call_table: OperationWaiter::new(),
}
}
pub fn new(network_manager: NetworkManager, update_callback: UpdateCallback) -> Self {
let config = network_manager.config();
Self {
crypto: network_manager.crypto(),
config: network_manager.config(),
inner: Arc::new(Mutex::new(Self::new_inner(network_manager))),
config: config.clone(),
network_manager: network_manager.clone(),
routing_table: network_manager.routing_table(),
inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner(config, update_callback)),
}
}
pub fn network_manager(&self) -> NetworkManager {
self.inner.lock().network_manager.clone()
self.network_manager.clone()
}
pub fn routing_table(&self) -> RoutingTable {
self.inner.lock().routing_table.clone()
self.routing_table.clone()
}
pub fn node_id(&self) -> DHTKey {
self.inner.lock().node_id
//////////////////////////////////////////////////////////////////////
#[instrument(level = "debug", skip_all, err)]
pub async fn startup(&self) -> EyreResult<()> {
trace!("startup rpc processor");
let mut inner = self.inner.lock();
let channel = flume::bounded(self.unlocked_inner.queue_size as usize);
inner.send_channel = Some(channel.0.clone());
inner.stop_source = Some(StopSource::new());
// spin up N workers
trace!(
"Spinning up {} RPC workers",
self.unlocked_inner.concurrency
);
for _ in 0..self.unlocked_inner.concurrency {
let this = self.clone();
let receiver = channel.1.clone();
let jh = intf::spawn(Self::rpc_worker(
this,
inner.stop_source.as_ref().unwrap().token(),
receiver,
));
inner.worker_join_handles.push(jh);
}
Ok(())
}
pub fn node_id_secret(&self) -> DHTKeySecret {
self.inner.lock().node_id_secret
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
debug!("starting rpc processor shutdown");
// Stop the rpc workers
let mut unord = FuturesUnordered::new();
{
let mut inner = self.inner.lock();
// take the join handles out
for h in inner.worker_join_handles.drain(..) {
unord.push(h);
}
// drop the stop
drop(inner.stop_source.take());
}
debug!("stopping {} rpc worker tasks", unord.len());
// Wait for them to complete
while unord.next().await.is_some() {}
debug!("resetting rpc processor state");
// Release the rpc processor
*self.inner.lock() = Self::new_inner();
debug!("finished rpc processor shutdown");
}
//////////////////////////////////////////////////////////////////////
@@ -278,71 +381,18 @@ impl RPCProcessor {
})
}
// set up wait for reply
fn add_op_id_waiter(&self, op_id: OperationId) -> EventualValue<(Option<Id>, RPCMessage)> {
let mut inner = self.inner.lock();
let e = EventualValue::new();
inner.waiting_rpc_table.insert(op_id, e.clone());
e
}
// remove wait for reply
fn cancel_op_id_waiter(&self, op_id: OperationId) {
let mut inner = self.inner.lock();
inner.waiting_rpc_table.remove(&op_id);
}
// complete the reply
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
async fn complete_op_id_waiter(&self, msg: RPCMessage) -> Result<(), RPCError> {
let op_id = msg.operation.op_id();
let eventual = {
let mut inner = self.inner.lock();
inner
.waiting_rpc_table
.remove(&op_id)
.ok_or_else(RPCError::else_internal(format!(
"Unmatched operation id: {:#?}",
msg
)))?
};
eventual.resolve((Span::current().id(), msg)).await;
Ok(())
}
// wait for reply
async fn do_wait_for_reply(
&self,
waitable_reply: &WaitableReply,
) -> Result<TimeoutOr<(RPCMessage, u64)>, RPCError> {
let timeout_ms = u32::try_from(waitable_reply.timeout / 1000u64)
.map_err(RPCError::map_internal("invalid timeout"))?;
// wait for eventualvalue
let start_ts = intf::get_timestamp();
let res = intf::timeout(timeout_ms, waitable_reply.eventual.instance())
.await
.into_timeout_or();
Ok(res.map(|res| {
let (_span_id, rpcreader) = res.take_value().unwrap();
let end_ts = intf::get_timestamp();
// fixme: causes crashes? "Missing otel data span extensions"??
//Span::current().follows_from(span_id);
(rpcreader, end_ts - start_ts)
}))
}
#[instrument(level = "trace", skip(self, waitable_reply), err)]
async fn wait_for_reply(
&self,
waitable_reply: WaitableReply,
) -> Result<TimeoutOr<(RPCMessage, u64)>, RPCError> {
let out = self.do_wait_for_reply(&waitable_reply).await;
let out = self
.unlocked_inner
.waiting_rpc_table
.wait_for_op(waitable_reply.handle, waitable_reply.timeout)
.await;
match &out {
Err(_) | Ok(TimeoutOr::Timeout) => {
self.cancel_op_id_waiter(waitable_reply.op_id);
waitable_reply.node_ref.stats_question_lost();
}
Ok(TimeoutOr::Value((rpcreader, _))) => {
@@ -476,7 +526,7 @@ impl RPCProcessor {
}
// Verify hop count isn't larger than out maximum routed hop count
if out_hop_count > self.inner.lock().max_route_hop_count {
if out_hop_count > self.unlocked_inner.max_route_hop_count {
return Err(RPCError::internal("hop count too long for route"))
.map_err(logthru_rpc!(warn));
}
@@ -574,10 +624,10 @@ impl RPCProcessor {
// Calculate answer timeout
// Timeout is number of hops times the timeout per hop
let timeout = self.inner.lock().timeout * (hop_count as u64);
let timeout = self.unlocked_inner.timeout * (hop_count as u64);
// Set up op id eventual
let eventual = self.add_op_id_waiter(op_id);
let handle = self.unlocked_inner.waiting_rpc_table.add_op_waiter(op_id);
// Send question
let bytes = message.len() as u64;
@@ -588,13 +638,11 @@ impl RPCProcessor {
.await
.map_err(|e| {
// If we're returning an error, clean up
self.cancel_op_id_waiter(op_id);
node_ref
.stats_failed_to_send(send_ts, true);
RPCError::network(e)
})? => {
// If we couldn't send we're still cleaning up
self.cancel_op_id_waiter(op_id);
node_ref
.stats_failed_to_send(send_ts, true);
}
@@ -605,8 +653,7 @@ impl RPCProcessor {
// Pass back waitable reply completion
Ok(NetworkResult::value(WaitableReply {
op_id,
eventual,
handle,
timeout,
node_ref,
send_ts,
@@ -794,7 +841,7 @@ impl RPCProcessor {
let mut opt_sender_nr: Option<NodeRef> = None;
if let Some(sender_node_info) = operation.sender_node_info() {
// Sender NodeInfo was specified, update our routing table with it
if !self.filter_node_info(RoutingDomain::PublicInternet, &sender_node_info.node_info) {
if !self.filter_node_info(routing_domain, &sender_node_info.node_info) {
return Err(RPCError::invalid_format(
"sender signednodeinfo has invalid peer scope",
));
@@ -853,6 +900,7 @@ impl RPCProcessor {
RPCOperationKind::Question(q) => match q.detail() {
RPCQuestionDetail::StatusQ(_) => self.process_status_q(msg).await,
RPCQuestionDetail::FindNodeQ(_) => self.process_find_node_q(msg).await,
RPCQuestionDetail::AppCallQ(_) => self.process_app_call_q(msg).await,
RPCQuestionDetail::GetValueQ(_) => self.process_get_value_q(msg).await,
RPCQuestionDetail::SetValueQ(_) => self.process_set_value_q(msg).await,
RPCQuestionDetail::WatchValueQ(_) => self.process_watch_value_q(msg).await,
@@ -871,8 +919,14 @@ impl RPCProcessor {
RPCStatementDetail::ValueChanged(_) => self.process_value_changed(msg).await,
RPCStatementDetail::Signal(_) => self.process_signal(msg).await,
RPCStatementDetail::ReturnReceipt(_) => self.process_return_receipt(msg).await,
RPCStatementDetail::AppMessage(_) => self.process_app_message(msg).await,
},
RPCOperationKind::Answer(_) => self.complete_op_id_waiter(msg).await,
RPCOperationKind::Answer(_) => {
self.unlocked_inner
.waiting_rpc_table
.complete_op_waiter(msg.operation.op_id(), msg)
.await
}
}
}
@@ -908,85 +962,6 @@ impl RPCProcessor {
}
}
#[instrument(level = "debug", skip_all, err)]
pub async fn startup(&self) -> EyreResult<()> {
trace!("startup rpc processor");
let mut inner = self.inner.lock();
// make local copy of node id for easy access
let c = self.config.get();
inner.node_id = c.network.node_id;
inner.node_id_secret = c.network.node_id_secret;
// set up channel
let mut concurrency = c.network.rpc.concurrency;
let mut queue_size = c.network.rpc.queue_size;
let mut timeout = ms_to_us(c.network.rpc.timeout_ms);
let mut max_route_hop_count = c.network.rpc.max_route_hop_count as usize;
if concurrency == 0 {
concurrency = intf::get_concurrency() / 2;
if concurrency == 0 {
concurrency = 1;
}
}
if queue_size == 0 {
queue_size = 1024;
}
if timeout == 0 {
timeout = 10000000;
}
if max_route_hop_count == 0 {
max_route_hop_count = 7usize;
}
inner.timeout = timeout;
inner.max_route_hop_count = max_route_hop_count;
let channel = flume::bounded(queue_size as usize);
inner.send_channel = Some(channel.0.clone());
inner.stop_source = Some(StopSource::new());
// spin up N workers
trace!("Spinning up {} RPC workers", concurrency);
for _ in 0..concurrency {
let this = self.clone();
let receiver = channel.1.clone();
let jh = intf::spawn(Self::rpc_worker(
this,
inner.stop_source.as_ref().unwrap().token(),
receiver,
));
inner.worker_join_handles.push(jh);
}
Ok(())
}
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
debug!("starting rpc processor shutdown");
// Stop the rpc workers
let mut unord = FuturesUnordered::new();
{
let mut inner = self.inner.lock();
// take the join handles out
for h in inner.worker_join_handles.drain(..) {
unord.push(h);
}
// drop the stop
drop(inner.stop_source.take());
}
debug!("stopping {} rpc worker tasks", unord.len());
// Wait for them to complete
while unord.next().await.is_some() {}
debug!("resetting rpc processor state");
// Release the rpc processor
*self.inner.lock() = Self::new_inner(self.network_manager());
debug!("finished rpc processor shutdown");
}
#[instrument(level = "trace", skip(self, body), err)]
pub fn enqueue_message(
&self,