checkpoint

This commit is contained in:
John Smith
2022-07-04 17:58:26 -04:00
parent 6a87e32836
commit 9214bcf9a4
17 changed files with 276 additions and 476 deletions

View File

@@ -403,7 +403,7 @@ impl RPCProcessor {
#[instrument(level = "debug", skip(self, operation, safety_route_spec), err)]
fn render_operation(
&self,
dest: &Destination,
dest: Destination,
operation: &RPCOperation,
safety_route_spec: Option<&SafetyRouteSpec>,
) -> Result<RenderedOperation, RPCError> {
@@ -420,7 +420,7 @@ impl RPCProcessor {
let out; // Envelope data
// To where are we sending the request
match &dest {
match dest {
Destination::Direct(node_ref) | Destination::Relay(node_ref, _) => {
// Send to a node without a private route
// --------------------------------------
@@ -449,8 +449,7 @@ impl RPCProcessor {
// No private route was specified for the request
// but we are using a safety route, so we must create an empty private route
let mut pr_builder = ::capnp::message::Builder::new_default();
let private_route =
self.new_stub_private_route(node_id, &mut pr_builder)?;
let private_route = PrivateRoute::new_stub(node_id);
let message_vec = builder_to_vec(msg_builder)?;
// first
@@ -469,14 +468,6 @@ impl RPCProcessor {
Destination::PrivateRoute(private_route) => {
// Send to private route
// ---------------------
// Encode the private route
let mut pr_msg_builder = ::capnp::message::Builder::new_default();
let mut pr_builder =
pr_msg_builder.init_root::<veilid_capnp::private_route::Builder>();
encode_private_route(private_route, &mut pr_builder)?;
let pr_reader = pr_builder.into_reader();
// Reply with 'route' operation
let message_vec = builder_to_vec(msg_builder)?;
out_node_id = match safety_route_spec {
@@ -487,7 +478,7 @@ impl RPCProcessor {
Some(rh) => rh.dial_info.node_id.key,
_ => return Err(rpc_error_internal("private route has no hops")),
};
out = self.wrap_with_route(None, pr_reader, message_vec)?;
out = self.wrap_with_route(None, private_route, message_vec)?;
out_node_id
}
Some(sr) => {
@@ -500,7 +491,7 @@ impl RPCProcessor {
.dial_info
.node_id
.key;
out = self.wrap_with_route(Some(sr), pr_reader, message_vec)?;
out = self.wrap_with_route(Some(sr), private_route, message_vec)?;
out_node_id
}
}
@@ -533,6 +524,10 @@ impl RPCProcessor {
) -> Result<WaitableReply, RPCError> {
// Wrap question in operation
let operation = RPCOperation::new_question(question);
let op_id = operation.op_id();
// Log rpc send
debug!(target: "rpc_message", dir = "send", kind = "question", op_id, desc = operation.kind().desc(), ?dest);
// Produce rendered operation
let RenderedOperation {
@@ -540,7 +535,7 @@ impl RPCProcessor {
out_node_id,
out_noderef,
hopcount,
} = self.render_operation(&dest, &operation, safety_route_spec)?;
} = self.render_operation(dest, &operation, safety_route_spec)?;
// Calculate answer timeout
// Timeout is number of hops times the timeout per hop
@@ -561,12 +556,8 @@ impl RPCProcessor {
};
// Set up op id eventual
let op_id = operation.op_id();
let eventual = self.add_op_id_waiter(op_id);
// Log rpc send
debug!(target: "rpc_message", dir = "send", kind = "question", op_id, desc = operation.kind().desc(), ?dest);
// Send question
let bytes = out.len() as u64;
let send_ts = intf::get_timestamp();
@@ -614,13 +605,16 @@ impl RPCProcessor {
// Wrap statement in operation
let operation = RPCOperation::new_statement(statement);
// Log rpc send
debug!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest);
// Produce rendered operation
let RenderedOperation {
out,
out_node_id,
out_noderef,
hopcount,
} = self.render_operation(&dest, &operation, safety_route_spec)?;
} = self.render_operation(dest, &operation, safety_route_spec)?;
// Calculate answer timeout
// Timeout is number of hops times the timeout per hop
@@ -640,9 +634,6 @@ impl RPCProcessor {
}
};
// Log rpc send
debug!(target: "rpc_message", dir = "send", kind = "statement", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest);
// Send statement
let bytes = out.len() as u64;
let send_ts = intf::get_timestamp();
@@ -713,13 +704,16 @@ impl RPCProcessor {
// Extract destination from respond_to
let dest = self.get_respond_to_destination(&request);
// Log rpc send
debug!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest);
// Produce rendered operation
let RenderedOperation {
out,
out_node_id,
out_noderef,
hopcount,
} = self.render_operation(&dest, &operation, safety_route_spec)?;
} = self.render_operation(dest, &operation, safety_route_spec)?;
// If we need to resolve the first hop, do it
let node_ref = match out_noderef {
@@ -733,9 +727,6 @@ impl RPCProcessor {
}
};
// Log rpc send
debug!(target: "rpc_message", dir = "send", kind = "answer", op_id = operation.op_id(), desc = operation.kind().desc(), ?dest);
// Send the reply
let bytes = out.len() as u64;
let send_ts = intf::get_timestamp();
@@ -768,16 +759,18 @@ impl RPCProcessor {
&self,
encoded_msg: RPCMessageEncoded,
) -> Result<(), RPCError> {
// Make an operation reader
let reader = capnp::message::Reader::new(encoded_msg.data, Default::default());
// Decode the operation
let sender_node_id = encoded_msg.header.envelope.get_sender_id();
let operation = reader
.get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?;
// Decode the RPC message
let operation = RPCOperation::decode(&operation, &sender_node_id)?;
let operation = {
let reader = capnp::message::Reader::new(encoded_msg.data, Default::default());
let op_reader = reader
.get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?;
RPCOperation::decode(&op_reader, &sender_node_id)?
};
// Get the sender noderef, incorporating and 'sender node info' we have from a question
let mut opt_sender_nr: Option<NodeRef> = None;