checkpoint
This commit is contained in:
@@ -140,10 +140,10 @@ impl<T> Answer<T> {
|
||||
}
|
||||
|
||||
struct RenderedOperation {
|
||||
out: Vec<u8>, // The rendered operation bytes
|
||||
out_node_id: DHTKey, // Node id we're sending to
|
||||
out_noderef: Option<NodeRef>, // Node to send envelope to (may not be destination node id in case of relay)
|
||||
hopcount: usize, // Total safety + private route hop count
|
||||
message: Vec<u8>, // The rendered operation bytes
|
||||
node_id: DHTKey, // Node id we're sending to
|
||||
node_ref: Option<NodeRef>, // Node to send envelope to (may not be destination node id in case of relay)
|
||||
hop_count: usize, // Total safety + private route hop count
|
||||
}
|
||||
/////////////////////////////////////////////////////////////////////
|
||||
|
||||
@@ -400,6 +400,9 @@ impl RPCProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
// Produce a byte buffer that represents the wire encoding of the entire
|
||||
// unencrypted envelope body for a RPC message. This incorporates
|
||||
// wrapping a private and/or safety route if they are specified.
|
||||
#[instrument(level = "debug", skip(self, operation, safety_route_spec), err)]
|
||||
fn render_operation(
|
||||
&self,
|
||||
@@ -407,110 +410,108 @@ impl RPCProcessor {
|
||||
operation: &RPCOperation,
|
||||
safety_route_spec: Option<&SafetyRouteSpec>,
|
||||
) -> Result<RenderedOperation, RPCError> {
|
||||
// Encode message to a builder and make a message reader for it
|
||||
let mut msg_builder = ::capnp::message::Builder::new_default();
|
||||
let mut op_builder = msg_builder.init_root::<veilid_capnp::operation::Builder>();
|
||||
operation.encode(&mut op_builder)?;
|
||||
|
||||
// Create envelope data
|
||||
let out_node_id; // Envelope Node Id
|
||||
let mut out_noderef: Option<NodeRef> = None; // Node to send envelope to
|
||||
let hopcount: usize; // Total safety + private route hop count
|
||||
let out = {
|
||||
let out; // Envelope data
|
||||
let mut out_node_ref: Option<NodeRef> = None; // Node to send envelope to
|
||||
let out_hop_count: usize; // Total safety + private route hop count
|
||||
let out_message; // Envelope data
|
||||
|
||||
// To where are we sending the request
|
||||
match dest {
|
||||
Destination::Direct(node_ref) | Destination::Relay(node_ref, _) => {
|
||||
// Send to a node without a private route
|
||||
// --------------------------------------
|
||||
// Encode message to a builder and make a message reader for it
|
||||
// Then produce the message as an unencrypted byte buffer
|
||||
let message_vec = {
|
||||
let mut msg_builder = ::capnp::message::Builder::new_default();
|
||||
let mut op_builder = msg_builder.init_root::<veilid_capnp::operation::Builder>();
|
||||
operation.encode(&mut op_builder)?;
|
||||
builder_to_vec(msg_builder)?
|
||||
};
|
||||
|
||||
// Get the actual destination node id accounting for relays
|
||||
let (node_ref, node_id) = if let Destination::Relay(_, dht_key) = dest {
|
||||
(node_ref.clone(), dht_key.clone())
|
||||
} else {
|
||||
let node_id = node_ref.node_id();
|
||||
(node_ref.clone(), node_id)
|
||||
};
|
||||
// To where are we sending the request
|
||||
match dest {
|
||||
Destination::Direct(node_ref) | Destination::Relay(node_ref, _) => {
|
||||
// Send to a node without a private route
|
||||
// --------------------------------------
|
||||
|
||||
// Handle the existence of safety route
|
||||
match safety_route_spec {
|
||||
None => {
|
||||
// If no safety route is being used, and we're not sending to a private
|
||||
// route, we can use a direct envelope instead of routing
|
||||
out = builder_to_vec(msg_builder)?;
|
||||
// Get the actual destination node id accounting for relays
|
||||
let (node_ref, node_id) = if let Destination::Relay(_, dht_key) = dest {
|
||||
(node_ref.clone(), dht_key.clone())
|
||||
} else {
|
||||
let node_id = node_ref.node_id();
|
||||
(node_ref.clone(), node_id)
|
||||
};
|
||||
|
||||
// Message goes directly to the node
|
||||
out_node_id = node_id;
|
||||
out_noderef = Some(node_ref);
|
||||
hopcount = 1;
|
||||
}
|
||||
Some(sr) => {
|
||||
// No private route was specified for the request
|
||||
// but we are using a safety route, so we must create an empty private route
|
||||
let mut pr_builder = ::capnp::message::Builder::new_default();
|
||||
let private_route = PrivateRoute::new_stub(node_id);
|
||||
// Handle the existence of safety route
|
||||
match safety_route_spec {
|
||||
None => {
|
||||
// If no safety route is being used, and we're not sending to a private
|
||||
// route, we can use a direct envelope instead of routing
|
||||
out_message = message_vec;
|
||||
|
||||
let message_vec = builder_to_vec(msg_builder)?;
|
||||
// first
|
||||
out_node_id = sr
|
||||
.hops
|
||||
.first()
|
||||
.ok_or_else(|| rpc_error_internal("no hop in safety route"))?
|
||||
.dial_info
|
||||
.node_id
|
||||
.key;
|
||||
out = self.wrap_with_route(Some(sr), private_route, message_vec)?;
|
||||
hopcount = 1 + sr.hops.len();
|
||||
}
|
||||
};
|
||||
}
|
||||
Destination::PrivateRoute(private_route) => {
|
||||
// Send to private route
|
||||
// ---------------------
|
||||
// Reply with 'route' operation
|
||||
let message_vec = builder_to_vec(msg_builder)?;
|
||||
out_node_id = match safety_route_spec {
|
||||
None => {
|
||||
// If no safety route, the first node is the first hop of the private route
|
||||
hopcount = private_route.hop_count as usize;
|
||||
let out_node_id = match &private_route.hops {
|
||||
Some(rh) => rh.dial_info.node_id.key,
|
||||
_ => return Err(rpc_error_internal("private route has no hops")),
|
||||
};
|
||||
out = self.wrap_with_route(None, private_route, message_vec)?;
|
||||
out_node_id
|
||||
}
|
||||
Some(sr) => {
|
||||
// If safety route is in use, first node is the first hop of the safety route
|
||||
hopcount = 1 + sr.hops.len() + (private_route.hop_count as usize);
|
||||
let out_node_id = sr
|
||||
.hops
|
||||
.first()
|
||||
.ok_or_else(|| rpc_error_internal("no hop in safety route"))?
|
||||
.dial_info
|
||||
.node_id
|
||||
.key;
|
||||
out = self.wrap_with_route(Some(sr), private_route, message_vec)?;
|
||||
out_node_id
|
||||
}
|
||||
// Message goes directly to the node
|
||||
out_node_id = node_id;
|
||||
out_node_ref = Some(node_ref);
|
||||
out_hop_count = 1;
|
||||
}
|
||||
Some(sr) => {
|
||||
// No private route was specified for the request
|
||||
// but we are using a safety route, so we must create an empty private route
|
||||
let mut pr_builder = ::capnp::message::Builder::new_default();
|
||||
let private_route = PrivateRoute::new_stub(node_id);
|
||||
|
||||
// first
|
||||
out_node_id = sr
|
||||
.hops
|
||||
.first()
|
||||
.ok_or_else(|| rpc_error_internal("no hop in safety route"))?
|
||||
.dial_info
|
||||
.node_id
|
||||
.key;
|
||||
out_message = self.wrap_with_route(Some(sr), private_route, message_vec)?;
|
||||
out_hop_count = 1 + sr.hops.len();
|
||||
}
|
||||
};
|
||||
}
|
||||
Destination::PrivateRoute(private_route) => {
|
||||
// Send to private route
|
||||
// ---------------------
|
||||
// Reply with 'route' operation
|
||||
out_node_id = match safety_route_spec {
|
||||
None => {
|
||||
// If no safety route, the first node is the first hop of the private route
|
||||
out_hop_count = private_route.hop_count as usize;
|
||||
let out_node_id = match &private_route.hops {
|
||||
Some(rh) => rh.dial_info.node_id.key,
|
||||
_ => return Err(rpc_error_internal("private route has no hops")),
|
||||
};
|
||||
out_message = self.wrap_with_route(None, private_route, message_vec)?;
|
||||
out_node_id
|
||||
}
|
||||
Some(sr) => {
|
||||
// If safety route is in use, first node is the first hop of the safety route
|
||||
out_hop_count = 1 + sr.hops.len() + (private_route.hop_count as usize);
|
||||
let out_node_id = sr
|
||||
.hops
|
||||
.first()
|
||||
.ok_or_else(|| rpc_error_internal("no hop in safety route"))?
|
||||
.dial_info
|
||||
.node_id
|
||||
.key;
|
||||
out_message = self.wrap_with_route(Some(sr), private_route, message_vec)?;
|
||||
out_node_id
|
||||
}
|
||||
}
|
||||
}
|
||||
out
|
||||
};
|
||||
}
|
||||
|
||||
// Verify hop count isn't larger than out maximum routed hop count
|
||||
if hopcount > self.inner.lock().max_route_hop_count {
|
||||
if out_hop_count > self.inner.lock().max_route_hop_count {
|
||||
return Err(rpc_error_internal("hop count too long for route"))
|
||||
.map_err(logthru_rpc!(warn));
|
||||
}
|
||||
|
||||
Ok(RenderedOperation {
|
||||
out,
|
||||
out_node_id,
|
||||
out_noderef,
|
||||
hopcount,
|
||||
message: out_message,
|
||||
node_id: out_node_id,
|
||||
node_ref: out_node_ref,
|
||||
hop_count: out_hop_count,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -531,21 +532,17 @@ impl RPCProcessor {
|
||||
|
||||
// Produce rendered operation
|
||||
let RenderedOperation {
|
||||
out,
|
||||
out_node_id,
|
||||
out_noderef,
|
||||
hopcount,
|
||||
message,
|
||||
node_id,
|
||||
node_ref,
|
||||
hop_count,
|
||||
} = self.render_operation(dest, &operation, safety_route_spec)?;
|
||||
|
||||
// Calculate answer timeout
|
||||
// Timeout is number of hops times the timeout per hop
|
||||
let timeout = self.inner.lock().timeout * (hopcount as u64);
|
||||
|
||||
// If we need to resolve the first hop, do it
|
||||
let node_ref = match out_noderef {
|
||||
let node_ref = match node_ref {
|
||||
None => {
|
||||
// resolve node
|
||||
self.resolve_node(out_node_id)
|
||||
self.resolve_node(node_id)
|
||||
.await
|
||||
.map_err(logthru_rpc!(error))?
|
||||
}
|
||||
@@ -555,15 +552,19 @@ impl RPCProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
// Calculate answer timeout
|
||||
// Timeout is number of hops times the timeout per hop
|
||||
let timeout = self.inner.lock().timeout * (hop_count as u64);
|
||||
|
||||
// Set up op id eventual
|
||||
let eventual = self.add_op_id_waiter(op_id);
|
||||
|
||||
// Send question
|
||||
let bytes = out.len() as u64;
|
||||
let bytes = message.len() as u64;
|
||||
let send_ts = intf::get_timestamp();
|
||||
let send_data_kind = match self
|
||||
.network_manager()
|
||||
.send_envelope(node_ref.clone(), Some(out_node_id), out)
|
||||
.send_envelope(node_ref.clone(), Some(node_id), message)
|
||||
.await
|
||||
.map_err(RPCError::Internal)
|
||||
{
|
||||
@@ -610,21 +611,17 @@ impl RPCProcessor {
|
||||
|
||||
// Produce rendered operation
|
||||
let RenderedOperation {
|
||||
out,
|
||||
out_node_id,
|
||||
out_noderef,
|
||||
hopcount,
|
||||
message,
|
||||
node_id,
|
||||
node_ref,
|
||||
hop_count,
|
||||
} = self.render_operation(dest, &operation, safety_route_spec)?;
|
||||
|
||||
// Calculate answer timeout
|
||||
// Timeout is number of hops times the timeout per hop
|
||||
let timeout = self.inner.lock().timeout * (hopcount as u64);
|
||||
|
||||
// If we need to resolve the first hop, do it
|
||||
let node_ref = match out_noderef {
|
||||
let node_ref = match node_ref {
|
||||
None => {
|
||||
// resolve node
|
||||
self.resolve_node(out_node_id)
|
||||
self.resolve_node(node_id)
|
||||
.await
|
||||
.map_err(logthru_rpc!(error))?
|
||||
}
|
||||
@@ -634,12 +631,16 @@ impl RPCProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
// Calculate answer timeout
|
||||
// Timeout is number of hops times the timeout per hop
|
||||
let timeout = self.inner.lock().timeout * (hop_count as u64);
|
||||
|
||||
// Send statement
|
||||
let bytes = out.len() as u64;
|
||||
let bytes = message.len() as u64;
|
||||
let send_ts = intf::get_timestamp();
|
||||
let send_data_kind = match self
|
||||
.network_manager()
|
||||
.send_envelope(node_ref.clone(), Some(out_node_id), out)
|
||||
.send_envelope(node_ref.clone(), Some(node_id), message)
|
||||
.await
|
||||
.map_err(RPCError::Internal)
|
||||
{
|
||||
@@ -709,17 +710,17 @@ impl RPCProcessor {
|
||||
|
||||
// Produce rendered operation
|
||||
let RenderedOperation {
|
||||
out,
|
||||
out_node_id,
|
||||
out_noderef,
|
||||
hopcount,
|
||||
message,
|
||||
node_id,
|
||||
node_ref,
|
||||
hop_count,
|
||||
} = self.render_operation(dest, &operation, safety_route_spec)?;
|
||||
|
||||
// If we need to resolve the first hop, do it
|
||||
let node_ref = match out_noderef {
|
||||
let node_ref = match node_ref {
|
||||
None => {
|
||||
// resolve node
|
||||
self.resolve_node(out_node_id).await?
|
||||
self.resolve_node(node_id).await?
|
||||
}
|
||||
Some(nr) => {
|
||||
// got the node in the routing table already
|
||||
@@ -728,10 +729,10 @@ impl RPCProcessor {
|
||||
};
|
||||
|
||||
// Send the reply
|
||||
let bytes = out.len() as u64;
|
||||
let bytes = message.len() as u64;
|
||||
let send_ts = intf::get_timestamp();
|
||||
self.network_manager()
|
||||
.send_envelope(node_ref.clone(), Some(out_node_id), out)
|
||||
.send_envelope(node_ref.clone(), Some(node_id), message)
|
||||
.await
|
||||
.map_err(RPCError::Internal)
|
||||
.map_err(|e| {
|
||||
|
||||
Reference in New Issue
Block a user