checkpoint

This commit is contained in:
John Smith
2022-07-03 15:52:27 -04:00
parent 6d1198cd92
commit b010c8a61f
17 changed files with 940 additions and 90 deletions
+76 -83
View File
@@ -43,38 +43,6 @@ impl fmt::Display for Destination {
}
}
#[derive(Debug, Clone)]
pub enum RespondTo {
None,
Sender(Option<SignedNodeInfo>),
PrivateRoute(PrivateRoute),
}
impl RespondTo {
pub fn encode(
&self,
builder: &mut veilid_capnp::operation::respond_to::Builder,
) -> Result<(), RPCError> {
match self {
Self::None => {
builder.set_none(());
}
Self::Sender(Some(sni)) => {
let mut sni_builder = builder.reborrow().init_sender_with_info();
encode_signed_node_info(sni, &mut sni_builder)?;
}
Self::Sender(None) => {
builder.reborrow().set_sender(());
}
Self::PrivateRoute(pr) => {
let mut pr_builder = builder.reborrow().init_private_route();
encode_private_route(pr, &mut pr_builder)?;
}
};
Ok(())
}
}
#[derive(Debug, Clone)]
struct RPCMessageHeader {
timestamp: u64, // time the message was received, not sent
@@ -142,6 +110,56 @@ struct WaitableReply {
send_data_kind: SendDataKind,
}
struct RPCOperationInfo {
name: &'static str,
op_id: u64,
index: u32,
is_q: bool,
wants_answer: bool,
respond_to: RespondTo
}
impl RPCOperationInfo {
pub fn parse(operation_reader: &veilid_capnp::operation::Reader, sender_node_id: &DHTKey) -> Result<Self, RPCError> {
let which_reader = operation_reader.get_detail().which().expect("missing which operation");
let op_id = operation_reader.get_op_id();
let respond_to_reader = operation_reader.get_respond_to();
let respond_to = RespondTo::decode(&respond_to_reader, sender_node_id)?;
let out = match which_reader {
veilid_capnp::operation::detail::StatusQ(_) => Self { name: "StatusQ", op_id, index: 0, is_q: true, wants_answer: true, respond_to },
veilid_capnp::operation::detail::StatusA(_) => Self { name: "StatusA", op_id, index: 1, is_q: false, wants_answer: false, respond_to},
veilid_capnp::operation::detail::ValidateDialInfo(_) => Self { name: "ValidateDialInfo", op_id, index: 2, is_q: true, wants_answer: false, respond_to },
veilid_capnp::operation::detail::FindNodeQ(_) => Self { name: "FindNodeQ", op_id, index: 3, is_q: true, wants_answer: true, respond_to },
veilid_capnp::operation::detail::FindNodeA(_) => Self { name: "FindNodeA", op_id, index: 4, is_q: false, wants_answer: false, respond_to },
veilid_capnp::operation::detail::Route(_) => Self { name: "Route", op_id, index: 5, is_q: true, wants_answer: false, respond_to },
veilid_capnp::operation::detail::NodeInfoUpdate(_) => Self { name: "NodeInfoUpdate", op_id, index: 6, is_q: true, wants_answer: false, respond_to },
veilid_capnp::operation::detail::GetValueQ(_) => Self { name: "GetValueQ", op_id, index: 7, is_q: true, wants_answer: true, respond_to },
veilid_capnp::operation::detail::GetValueA(_) => Self { name: "GetValueA", op_id, index: 8, is_q: false, wants_answer: false, respond_to },
veilid_capnp::operation::detail::SetValueQ(_) => Self { name: "SetValueQ", op_id, index: 9, is_q: true, wants_answer: true, respond_to },
veilid_capnp::operation::detail::SetValueA(_) => Self { name: "SetValueA", op_id, index: 10, is_q: false, wants_answer: false, respond_to},
veilid_capnp::operation::detail::WatchValueQ(_) => Self { name: "WatchValueQ", op_id, index: 11, is_q: true, wants_answer: true, respond_to},
veilid_capnp::operation::detail::WatchValueA(_) => Self { name: "WatchValueA", op_id, index: 12, is_q: false, wants_answer: false, respond_to},
veilid_capnp::operation::detail::ValueChanged(_) => Self { name: "ValueChanged", op_id, index: 13, is_q: true, wants_answer: false, respond_to },
veilid_capnp::operation::detail::SupplyBlockQ(_) => Self { name: "SupplyBlockQ", op_id, index: 14, is_q: true, wants_answer: true, respond_to },
veilid_capnp::operation::detail::SupplyBlockA(_) => Self { name: "SupplyBlockA", op_id, index: 15, is_q: false, wants_answer: false, respond_to },
veilid_capnp::operation::detail::FindBlockQ(_) => Self { name: "FindBlockQ", op_id, index: 16, is_q: true, wants_answer: true, respond_to},
veilid_capnp::operation::detail::FindBlockA(_) =>Self { name: "FindBlockA", op_id, index: 17, is_q: false, wants_answer: false, respond_to},
veilid_capnp::operation::detail::Signal(_) => Self { name: "Signal", op_id, index: 18, is_q: true, wants_answer: false, respond_to },
veilid_capnp::operation::detail::ReturnReceipt(_) => Self { name: "ReturnReceipt", op_id, index: 19, is_q: true, wants_answer: false, respond_to},
veilid_capnp::operation::detail::StartTunnelQ(_) => Self { name: "StartTunnelQ", op_id, index: 20, is_q: true, wants_answer: true, respond_to },
veilid_capnp::operation::detail::StartTunnelA(_) => Self { name: "StartTunnelA", op_id, index: 21, is_q: false, wants_answer: false, respond_to },
veilid_capnp::operation::detail::CompleteTunnelQ(_) =>Self { name: "CompleteTunnelQ", op_id, index: 22, is_q: true, wants_answer: true, respond_to},
veilid_capnp::operation::detail::CompleteTunnelA(_) => Self { name: "CompleteTunnelA", op_id, index: 23, is_q: false, wants_answer: false, respond_to},
veilid_capnp::operation::detail::CancelTunnelQ(_) => Self { name: "CancelTunnelQ", op_id, index: 24, is_q: true, wants_answer: true, respond_to},
veilid_capnp::operation::detail::CancelTunnelA(_) => Self { name: "CancelTunnelA", op_id, index: 25, is_q: false, wants_answer: false, respond_to},
};
Ok(out)
}
}
/////////////////////////////////////////////////////////////////////
#[derive(Clone, Debug, Default)]
@@ -403,6 +421,7 @@ impl RPCProcessor {
}
// Issue a request over the network, possibly using an anonymized route
#[instrument(level = "debug", skip(self, message, safety_route_spec), err)]
async fn request<T: capnp::message::ReaderSegments>(
&self,
dest: Destination,
@@ -410,11 +429,15 @@ impl RPCProcessor {
safety_route_spec: Option<&SafetyRouteSpec>,
) -> Result<Option<WaitableReply>, RPCError> {
let (op_id, wants_answer) = {
let info = {
let operation = message
.get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_internal!("invalid operation"))
.map_err(logthru_rpc!(error))?;
RPCOperationInfo::parse()
}
let (op_id, wants_answer) = {
let op_id = operation.get_op_id();
let wants_answer = self.wants_answer(&operation).map_err(logthru_rpc!())?;
@@ -549,9 +572,12 @@ impl RPCProcessor {
None
};
// send question
// Log rpc receive
debug!(target: "rpc_message", dir = "send", is_q, kind = Self::get_rpc_operation_detail_debug_info(&which_reader), op_id = operation.get_op_id(), sender_id = msg.header.envelope.get_sender_id().encode());
log_rpc!(debug "==>> REQUEST({}) -> {:?}", self.get_rpc_message_debug_info(&message), dest);
// send question
let bytes = out.len() as u64;
let send_ts = intf::get_timestamp();
let send_data_kind = match self
@@ -597,6 +623,7 @@ impl RPCProcessor {
// Issue a reply over the network, possibly using an anonymized route
// The request must want a response, or this routine fails
#[instrument(level = "debug", skip(self, request_rpcreader, reply_msg, safety_route_spec), err)]
async fn reply<T: capnp::message::ReaderSegments>(
&self,
request_rpcreader: RPCMessageReader,
@@ -761,44 +788,9 @@ impl RPCProcessor {
Ok(())
}
fn wants_answer(&self, operation: &veilid_capnp::operation::Reader) -> Result<bool, RPCError> {
match operation
.get_respond_to()
.which()
.map_err(map_error_capnp_notinschema!())?
{
veilid_capnp::operation::respond_to::None(_) => Ok(false),
veilid_capnp::operation::respond_to::Sender(_)
| veilid_capnp::operation::respond_to::SenderWithInfo(_)
| veilid_capnp::operation::respond_to::PrivateRoute(_) => Ok(true),
}
}
fn get_respond_to_sender_signed_node_info(
&self,
operation: &veilid_capnp::operation::Reader,
sender_node_id: &DHTKey,
) -> Result<Option<SignedNodeInfo>, RPCError> {
match operation
.get_respond_to()
.which()
.map_err(map_error_capnp_notinschema!())?
{
veilid_capnp::operation::respond_to::SenderWithInfo(Ok(sender_ni_reader)) => Ok(Some(
decode_signed_node_info(&sender_ni_reader, sender_node_id, true)?,
)),
veilid_capnp::operation::respond_to::SenderWithInfo(Err(e)) => Err(rpc_error_protocol(
format!("invalid sender_with_info signed node info: {}", e),
)),
veilid_capnp::operation::respond_to::None(_)
| veilid_capnp::operation::respond_to::Sender(_)
| veilid_capnp::operation::respond_to::PrivateRoute(_) => Ok(None),
}
}
//////////////////////////////////////////////////////////////////////
async fn generate_sender_info(&self, peer_noderef: NodeRef) -> SenderInfo {
async fn generate_sender_info(peer_noderef: NodeRef) -> SenderInfo {
let socket_address = peer_noderef
.last_connection()
.await
@@ -808,7 +800,7 @@ impl RPCProcessor {
async fn process_status_q(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> {
let peer_noderef = rpcreader.header.peer_noderef.clone();
let sender_info = self.generate_sender_info(peer_noderef).await;
let sender_info = Self::generate_sender_info(peer_noderef).await;
let reply_msg = {
let operation = rpcreader
@@ -818,19 +810,20 @@ impl RPCProcessor {
.map_err(logthru_rpc!())?;
// Don't bother unless we are going to answer
if !self.wants_answer(&operation)? {
if !Self::wants_answer(&operation)? {
return Ok(());
}
// get StatusQ reader
let iq_reader = match operation.get_detail().which() {
Ok(veilid_capnp::operation::detail::Which::StatusQ(Ok(x))) => x,
let which_reader = operation.get_detail().which().expect("missing which operation");
let statusq_reader = match which_reader {
veilid_capnp::operation::detail::Which::StatusQ(Ok(x)) => x,
_ => panic!("invalid operation type in process_status_q"),
};
// Parse out fields
let node_status = decode_node_status(
&iq_reader
&statusq_reader
.get_node_status()
.map_err(map_error_internal!("no valid node status"))?,
)?;
@@ -1253,11 +1246,12 @@ impl RPCProcessor {
.get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?;
let (which, is_q) = match operation
let which_reader = operation
.get_detail()
.which()
.map_err(map_error_capnp_notinschema!())?
.map_err(map_error_capnp_notinschema!())?;
let (which, is_q) = match which_reader
{
veilid_capnp::operation::detail::StatusQ(_) => (0u32, true),
veilid_capnp::operation::detail::StatusA(_) => (1u32, false),
@@ -1287,11 +1281,8 @@ impl RPCProcessor {
veilid_capnp::operation::detail::CancelTunnelA(_) => (25u32, false),
};
log_rpc!(debug "<<== {}({}) <- {:?}",
if is_q { "REQUEST" } else { "REPLY" },
self.get_rpc_message_debug_info(&reader),
msg.header.envelope.get_sender_id()
);
// Log rpc receive
debug!(target: "rpc_message", dir = "recv", is_q, kind = Self::get_rpc_operation_detail_debug_info(&which_reader), op_id = operation.get_op_id(), sender_id = msg.header.envelope.get_sender_id().encode());
// Accounting for questions we receive
if is_q {
@@ -1384,6 +1375,7 @@ impl RPCProcessor {
}
}
#[instrument(level = "debug", skip_all, err)]
pub async fn startup(&self) -> Result<(), String> {
trace!("startup rpc processor");
let mut inner = self.inner.lock();
@@ -1430,6 +1422,7 @@ impl RPCProcessor {
Ok(())
}
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
debug!("starting rpc processor shutdown");