checkpoint
This commit is contained in:
@@ -110,56 +110,6 @@ struct WaitableReply {
|
||||
send_data_kind: SendDataKind,
|
||||
}
|
||||
|
||||
struct RPCOperationInfo {
|
||||
name: &'static str,
|
||||
op_id: u64,
|
||||
index: u32,
|
||||
is_q: bool,
|
||||
wants_answer: bool,
|
||||
respond_to: RespondTo
|
||||
}
|
||||
|
||||
impl RPCOperationInfo {
|
||||
pub fn parse(operation_reader: &veilid_capnp::operation::Reader, sender_node_id: &DHTKey) -> Result<Self, RPCError> {
|
||||
let which_reader = operation_reader.get_detail().which().expect("missing which operation");
|
||||
let op_id = operation_reader.get_op_id();
|
||||
|
||||
let respond_to_reader = operation_reader.get_respond_to();
|
||||
let respond_to = RespondTo::decode(&respond_to_reader, sender_node_id)?;
|
||||
|
||||
let out = match which_reader {
|
||||
veilid_capnp::operation::detail::StatusQ(_) => Self { name: "StatusQ", op_id, index: 0, is_q: true, wants_answer: true, respond_to },
|
||||
veilid_capnp::operation::detail::StatusA(_) => Self { name: "StatusA", op_id, index: 1, is_q: false, wants_answer: false, respond_to},
|
||||
veilid_capnp::operation::detail::ValidateDialInfo(_) => Self { name: "ValidateDialInfo", op_id, index: 2, is_q: true, wants_answer: false, respond_to },
|
||||
veilid_capnp::operation::detail::FindNodeQ(_) => Self { name: "FindNodeQ", op_id, index: 3, is_q: true, wants_answer: true, respond_to },
|
||||
veilid_capnp::operation::detail::FindNodeA(_) => Self { name: "FindNodeA", op_id, index: 4, is_q: false, wants_answer: false, respond_to },
|
||||
veilid_capnp::operation::detail::Route(_) => Self { name: "Route", op_id, index: 5, is_q: true, wants_answer: false, respond_to },
|
||||
veilid_capnp::operation::detail::NodeInfoUpdate(_) => Self { name: "NodeInfoUpdate", op_id, index: 6, is_q: true, wants_answer: false, respond_to },
|
||||
veilid_capnp::operation::detail::GetValueQ(_) => Self { name: "GetValueQ", op_id, index: 7, is_q: true, wants_answer: true, respond_to },
|
||||
veilid_capnp::operation::detail::GetValueA(_) => Self { name: "GetValueA", op_id, index: 8, is_q: false, wants_answer: false, respond_to },
|
||||
veilid_capnp::operation::detail::SetValueQ(_) => Self { name: "SetValueQ", op_id, index: 9, is_q: true, wants_answer: true, respond_to },
|
||||
veilid_capnp::operation::detail::SetValueA(_) => Self { name: "SetValueA", op_id, index: 10, is_q: false, wants_answer: false, respond_to},
|
||||
veilid_capnp::operation::detail::WatchValueQ(_) => Self { name: "WatchValueQ", op_id, index: 11, is_q: true, wants_answer: true, respond_to},
|
||||
veilid_capnp::operation::detail::WatchValueA(_) => Self { name: "WatchValueA", op_id, index: 12, is_q: false, wants_answer: false, respond_to},
|
||||
veilid_capnp::operation::detail::ValueChanged(_) => Self { name: "ValueChanged", op_id, index: 13, is_q: true, wants_answer: false, respond_to },
|
||||
veilid_capnp::operation::detail::SupplyBlockQ(_) => Self { name: "SupplyBlockQ", op_id, index: 14, is_q: true, wants_answer: true, respond_to },
|
||||
veilid_capnp::operation::detail::SupplyBlockA(_) => Self { name: "SupplyBlockA", op_id, index: 15, is_q: false, wants_answer: false, respond_to },
|
||||
veilid_capnp::operation::detail::FindBlockQ(_) => Self { name: "FindBlockQ", op_id, index: 16, is_q: true, wants_answer: true, respond_to},
|
||||
veilid_capnp::operation::detail::FindBlockA(_) =>Self { name: "FindBlockA", op_id, index: 17, is_q: false, wants_answer: false, respond_to},
|
||||
veilid_capnp::operation::detail::Signal(_) => Self { name: "Signal", op_id, index: 18, is_q: true, wants_answer: false, respond_to },
|
||||
veilid_capnp::operation::detail::ReturnReceipt(_) => Self { name: "ReturnReceipt", op_id, index: 19, is_q: true, wants_answer: false, respond_to},
|
||||
veilid_capnp::operation::detail::StartTunnelQ(_) => Self { name: "StartTunnelQ", op_id, index: 20, is_q: true, wants_answer: true, respond_to },
|
||||
veilid_capnp::operation::detail::StartTunnelA(_) => Self { name: "StartTunnelA", op_id, index: 21, is_q: false, wants_answer: false, respond_to },
|
||||
veilid_capnp::operation::detail::CompleteTunnelQ(_) =>Self { name: "CompleteTunnelQ", op_id, index: 22, is_q: true, wants_answer: true, respond_to},
|
||||
veilid_capnp::operation::detail::CompleteTunnelA(_) => Self { name: "CompleteTunnelA", op_id, index: 23, is_q: false, wants_answer: false, respond_to},
|
||||
veilid_capnp::operation::detail::CancelTunnelQ(_) => Self { name: "CancelTunnelQ", op_id, index: 24, is_q: true, wants_answer: true, respond_to},
|
||||
veilid_capnp::operation::detail::CancelTunnelA(_) => Self { name: "CancelTunnelA", op_id, index: 25, is_q: false, wants_answer: false, respond_to},
|
||||
};
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
@@ -175,6 +125,12 @@ pub struct FindNodeAnswer {
|
||||
pub peers: Vec<PeerInfo>, // the list of closer peers
|
||||
}
|
||||
|
||||
struct RenderedOperation {
|
||||
out: Vec<u8>, // The rendered operation bytes
|
||||
out_node_id: DHTKey, // Node id we're sending to
|
||||
out_noderef: Option<NodeRef>, // Node to send envelope to (may not be destination node id in case of relay)
|
||||
hopcount: usize, // Total safety + private route hop count
|
||||
}
|
||||
/////////////////////////////////////////////////////////////////////
|
||||
|
||||
pub struct RPCProcessorInner {
|
||||
@@ -244,10 +200,6 @@ impl RPCProcessor {
|
||||
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
|
||||
fn get_next_op_id(&self) -> OperationId {
|
||||
intf::get_random_u64()
|
||||
}
|
||||
|
||||
fn filter_peer_scope(&self, node_info: &NodeInfo) -> bool {
|
||||
// if local peer scope is enabled, then don't reject any peer info
|
||||
if self.enable_local_peer_scope {
|
||||
@@ -420,35 +372,18 @@ impl RPCProcessor {
|
||||
out
|
||||
}
|
||||
|
||||
// Issue a request over the network, possibly using an anonymized route
|
||||
#[instrument(level = "debug", skip(self, message, safety_route_spec), err)]
|
||||
async fn request<T: capnp::message::ReaderSegments>(
|
||||
&self,
|
||||
dest: Destination,
|
||||
message: capnp::message::Reader<T>,
|
||||
safety_route_spec: Option<&SafetyRouteSpec>,
|
||||
) -> Result<Option<WaitableReply>, RPCError> {
|
||||
|
||||
let info = {
|
||||
let operation = message
|
||||
.get_root::<veilid_capnp::operation::Reader>()
|
||||
.map_err(map_error_internal!("invalid operation"))
|
||||
.map_err(logthru_rpc!(error))?;
|
||||
RPCOperationInfo::parse()
|
||||
}
|
||||
let (op_id, wants_answer) = {
|
||||
|
||||
let op_id = operation.get_op_id();
|
||||
let wants_answer = self.wants_answer(&operation).map_err(logthru_rpc!())?;
|
||||
|
||||
(op_id, wants_answer)
|
||||
};
|
||||
#[instrument(level = "debug", skip(self, operation, safety_route_spec), err)]
|
||||
fn render_operation(&self, dest: Destination, operation: &RPCOperation, safety_route_spec: Option<&SafetyRouteSpec>) -> Result<RenderedOperation, RPCError>
|
||||
{
|
||||
// Encode message to a builder and make a message reader for it
|
||||
let mut msg_builder = ::capnp::message::Builder::new_default();
|
||||
let mut op_builder = msg_builder.init_root::<veilid_capnp::operation::Builder>();
|
||||
operation.encode(&mut op_builder)?;
|
||||
|
||||
// Create envelope data
|
||||
let out_node_id; // Envelope Node Id
|
||||
let mut out_noderef: Option<NodeRef> = None; // Node to send envelope to
|
||||
let hopcount: usize; // Total safety + private route hop count
|
||||
|
||||
// Create envelope data
|
||||
let out = {
|
||||
let out; // Envelope data
|
||||
|
||||
@@ -471,7 +406,7 @@ impl RPCProcessor {
|
||||
None => {
|
||||
// If no safety route is being used, and we're not sending to a private
|
||||
// route, we can use a direct envelope instead of routing
|
||||
out = reader_to_vec(&message)?;
|
||||
out = builder_to_vec(msg_builder)?;
|
||||
|
||||
// Message goes directly to the node
|
||||
out_node_id = node_id;
|
||||
@@ -485,7 +420,7 @@ impl RPCProcessor {
|
||||
let private_route =
|
||||
self.new_stub_private_route(node_id, &mut pr_builder)?;
|
||||
|
||||
let message_vec = reader_to_vec(&message)?;
|
||||
let message_vec = builder_to_vec(msg_builder)?;
|
||||
// first
|
||||
out_node_id = sr
|
||||
.hops
|
||||
@@ -511,7 +446,7 @@ impl RPCProcessor {
|
||||
let pr_reader = pr_builder.into_reader();
|
||||
|
||||
// Reply with 'route' operation
|
||||
let message_vec = reader_to_vec(&message)?;
|
||||
let message_vec = builder_to_vec(msg_builder)?;
|
||||
out_node_id = match safety_route_spec {
|
||||
None => {
|
||||
// If no safety route, the first node is the first hop of the private route
|
||||
@@ -546,9 +481,35 @@ impl RPCProcessor {
|
||||
if hopcount > self.inner.lock().max_route_hop_count {
|
||||
return Err(rpc_error_internal("hop count too long for route"))
|
||||
.map_err(logthru_rpc!(warn));
|
||||
}
|
||||
// calculate actual timeout
|
||||
// timeout is number of hops times the timeout per hop
|
||||
}
|
||||
|
||||
Ok(RenderedOperation {
|
||||
out,
|
||||
out_node_id,
|
||||
out_noderef,
|
||||
hopcount,
|
||||
})
|
||||
}
|
||||
|
||||
// Issue a question over the network, possibly using an anonymized route
|
||||
#[instrument(level = "debug", skip(self, question, safety_route_spec), err)]
|
||||
async fn question(
|
||||
&self,
|
||||
dest: Destination,
|
||||
question: RPCQuestion,
|
||||
safety_route_spec: Option<&SafetyRouteSpec>,
|
||||
) -> Result<WaitableReply, RPCError> {
|
||||
|
||||
// Wrap question in operation
|
||||
let operation = RPCOperation::new_question(question);
|
||||
|
||||
// Produce rendered operation
|
||||
let RenderedOperation {
|
||||
out, out_node_id, out_noderef, hopcount
|
||||
} = self.render_operation(dest, &operation, safety_route_spec)?;
|
||||
|
||||
// Calculate answer timeout
|
||||
// Timeout is number of hops times the timeout per hop
|
||||
let timeout = self.inner.lock().timeout * (hopcount as u64);
|
||||
|
||||
// if we need to resolve the first hop, do it
|
||||
@@ -565,19 +526,14 @@ impl RPCProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
// set up op id eventual
|
||||
let eventual = if wants_answer {
|
||||
Some(self.add_op_id_waiter(op_id))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// Set up op id eventual
|
||||
let op_id = operation.op_id();
|
||||
let eventual = self.add_op_id_waiter(op_id);
|
||||
|
||||
// Log rpc receive
|
||||
debug!(target: "rpc_message", dir = "send", is_q, kind = Self::get_rpc_operation_detail_debug_info(&which_reader), op_id = operation.get_op_id(), sender_id = msg.header.envelope.get_sender_id().encode());
|
||||
// Log rpc send
|
||||
debug!(target: "rpc_message", dir = "send", kind = "question", op_id, desc = operation.kind().desc());
|
||||
|
||||
log_rpc!(debug "==>> REQUEST({}) -> {:?}", self.get_rpc_message_debug_info(&message), dest);
|
||||
|
||||
// send question
|
||||
// Send question
|
||||
let bytes = out.len() as u64;
|
||||
let send_ts = intf::get_timestamp();
|
||||
let send_data_kind = match self
|
||||
@@ -589,12 +545,10 @@ impl RPCProcessor {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
// Make sure to clean up op id waiter in case of error
|
||||
if eventual.is_some() {
|
||||
self.cancel_op_id_waiter(op_id);
|
||||
}
|
||||
self.cancel_op_id_waiter(op_id);
|
||||
|
||||
self.routing_table()
|
||||
.stats_failed_to_send(node_ref, send_ts, wants_answer);
|
||||
.stats_failed_to_send(node_ref, send_ts, true);
|
||||
|
||||
return Err(e);
|
||||
}
|
||||
@@ -602,29 +556,25 @@ impl RPCProcessor {
|
||||
|
||||
// Successfully sent
|
||||
self.routing_table()
|
||||
.stats_question_sent(node_ref.clone(), send_ts, bytes, wants_answer);
|
||||
.stats_question_sent(node_ref.clone(), send_ts, bytes, true);
|
||||
|
||||
// Pass back waitable reply completion
|
||||
match eventual {
|
||||
None => {
|
||||
// if we don't want an answer, don't wait for one
|
||||
Ok(None)
|
||||
}
|
||||
Some(eventual) => Ok(Some(WaitableReply {
|
||||
op_id,
|
||||
eventual,
|
||||
timeout,
|
||||
node_ref,
|
||||
send_ts,
|
||||
send_data_kind,
|
||||
})),
|
||||
}
|
||||
Ok(WaitableReply {
|
||||
op_id,
|
||||
eventual,
|
||||
timeout,
|
||||
node_ref,
|
||||
send_ts,
|
||||
send_data_kind,
|
||||
})
|
||||
}
|
||||
|
||||
xxxx continue here, make 'statement' sender
|
||||
|
||||
// Issue a reply over the network, possibly using an anonymized route
|
||||
// The request must want a response, or this routine fails
|
||||
#[instrument(level = "debug", skip(self, request_rpcreader, reply_msg, safety_route_spec), err)]
|
||||
async fn reply<T: capnp::message::ReaderSegments>(
|
||||
async fn answer<T: capnp::message::ReaderSegments>(
|
||||
&self,
|
||||
request_rpcreader: RPCMessageReader,
|
||||
reply_msg: capnp::message::Reader<T>,
|
||||
@@ -1497,25 +1447,17 @@ impl RPCProcessor {
|
||||
// Send StatusQ RPC request, receive StatusA answer
|
||||
// Can be sent via relays, but not via routes
|
||||
pub async fn rpc_call_status(self, peer: NodeRef) -> Result<StatusAnswer, RPCError> {
|
||||
let status_q_msg = {
|
||||
let mut status_q_msg = ::capnp::message::Builder::new_default();
|
||||
let mut question = status_q_msg.init_root::<veilid_capnp::operation::Builder>();
|
||||
question.set_op_id(self.get_next_op_id());
|
||||
let mut respond_to = question.reborrow().init_respond_to();
|
||||
self.make_respond_to_sender(peer.clone())
|
||||
.encode(&mut respond_to)?;
|
||||
let detail = question.reborrow().init_detail();
|
||||
let mut sqb = detail.init_status_q();
|
||||
let mut node_status_builder = sqb.reborrow().init_node_status();
|
||||
let node_status = self.network_manager().generate_node_status();
|
||||
encode_node_status(&node_status, &mut node_status_builder)?;
|
||||
|
||||
status_q_msg.into_reader()
|
||||
let node_status = self.network_manager().generate_node_status();
|
||||
let status_q = RPCOperationStatusQ {
|
||||
node_status
|
||||
};
|
||||
let respond_to = self.make_respond_to_sender(peer.clone());
|
||||
let operation = RPCOperation::new_question(RPCQuestion::new(respond_to, RPCQuestionDetail::StatusQ(status_q)));
|
||||
|
||||
// Send the info request
|
||||
let waitable_reply = self
|
||||
.request(Destination::Direct(peer.clone()), status_q_msg, None)
|
||||
.request(Destination::Direct(peer.clone()), operation, None)
|
||||
.await?
|
||||
.unwrap();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user