networkresult

This commit is contained in:
John Smith
2022-07-20 09:39:38 -04:00
parent 400d7021d2
commit 39eb13f34d
32 changed files with 613 additions and 407 deletions

View File

@@ -120,7 +120,7 @@ where
#[derive(Debug)]
struct WaitableReply {
op_id: OperationId,
eventual: EventualValue<RPCMessage>,
eventual: EventualValue<(Option<Id>, RPCMessage)>,
timeout: u64,
node_ref: NodeRef,
send_ts: u64,
@@ -153,10 +153,10 @@ pub struct RPCProcessorInner {
routing_table: RoutingTable,
node_id: DHTKey,
node_id_secret: DHTKeySecret,
send_channel: Option<flume::Sender<RPCMessageEncoded>>,
send_channel: Option<flume::Sender<(Option<Id>, RPCMessageEncoded)>>,
timeout: u64,
max_route_hop_count: usize,
waiting_rpc_table: BTreeMap<OperationId, EventualValue<RPCMessage>>,
waiting_rpc_table: BTreeMap<OperationId, EventualValue<(Option<Id>, RPCMessage)>>,
stop_source: Option<StopSource>,
worker_join_handles: Vec<MustJoinHandle<()>>,
}
@@ -242,13 +242,14 @@ impl RPCProcessor {
//////////////////////////////////////////////////////////////////////
// Search the DHT for a single node closest to a key and add it to the routing table and return the node reference
// If no node was found in the timeout, this returns None
pub async fn search_dht_single_key(
&self,
_node_id: DHTKey,
_count: u32,
_fanout: u32,
_timeout: Option<u64>,
) -> Result<NodeRef, RPCError> {
) -> Result<Option<NodeRef>, RPCError> {
//let routing_table = self.routing_table();
// xxx find node but stop if we find the exact node we want
@@ -270,7 +271,10 @@ impl RPCProcessor {
// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference
// Note: This routine can possible be recursive, hence the SendPinBoxFuture async form
pub fn resolve_node(&self, node_id: DHTKey) -> SendPinBoxFuture<Result<NodeRef, RPCError>> {
pub fn resolve_node(
&self,
node_id: DHTKey,
) -> SendPinBoxFuture<Result<Option<NodeRef>, RPCError>> {
let this = self.clone();
Box::pin(async move {
let routing_table = this.routing_table();
@@ -280,7 +284,7 @@ impl RPCProcessor {
// ensure we have some dial info for the entry already,
// if not, we should do the find_node anyway
if nr.has_any_dial_info() {
return Ok(nr);
return Ok(Some(nr));
}
}
@@ -298,9 +302,11 @@ impl RPCProcessor {
.search_dht_single_key(node_id, count, fanout, timeout)
.await?;
if nr.node_id() != node_id {
// found a close node, but not exact within our configured resolve_node timeout
return Err(RPCError::Timeout).map_err(logthru_rpc!());
if let Some(nr) = &nr {
if nr.node_id() != node_id {
// found a close node, but not exact within our configured resolve_node timeout
return Ok(None);
}
}
Ok(nr)
@@ -308,7 +314,7 @@ impl RPCProcessor {
}
// set up wait for reply
fn add_op_id_waiter(&self, op_id: OperationId) -> EventualValue<RPCMessage> {
fn add_op_id_waiter(&self, op_id: OperationId) -> EventualValue<(Option<Id>, RPCMessage)> {
let mut inner = self.inner.lock();
let e = EventualValue::new();
inner.waiting_rpc_table.insert(op_id, e.clone());
@@ -322,6 +328,7 @@ impl RPCProcessor {
}
// complete the reply
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err)]
async fn complete_op_id_waiter(&self, msg: RPCMessage) -> Result<(), RPCError> {
let op_id = msg.operation.op_id();
let eventual = {
@@ -331,7 +338,7 @@ impl RPCProcessor {
.remove(&op_id)
.ok_or_else(RPCError::else_internal("Unmatched operation id"))?
};
eventual.resolve(msg).await;
eventual.resolve((Span::current().id(), msg)).await;
Ok(())
}
@@ -339,32 +346,38 @@ impl RPCProcessor {
async fn do_wait_for_reply(
&self,
waitable_reply: &WaitableReply,
) -> Result<(RPCMessage, u64), RPCError> {
) -> Result<TimeoutOr<(RPCMessage, u64)>, RPCError> {
let timeout_ms = u32::try_from(waitable_reply.timeout / 1000u64)
.map_err(RPCError::map_internal("invalid timeout"))?;
// wait for eventualvalue
let start_ts = intf::get_timestamp();
let res = intf::timeout(timeout_ms, waitable_reply.eventual.instance())
.await
.map_err(|_| RPCError::Timeout)?;
let rpcreader = res.take_value().unwrap();
let end_ts = intf::get_timestamp();
Ok((rpcreader, end_ts - start_ts))
.into_timeout_or();
Ok(res.map(|res| {
let (span_id, rpcreader) = res.take_value().unwrap();
let end_ts = intf::get_timestamp();
Span::current().follows_from(span_id);
(rpcreader, end_ts - start_ts)
}))
}
#[instrument(level = "trace", skip(self, waitable_reply), err)]
async fn wait_for_reply(
&self,
waitable_reply: WaitableReply,
) -> Result<(RPCMessage, u64), RPCError> {
) -> Result<TimeoutOr<(RPCMessage, u64)>, RPCError> {
let out = self.do_wait_for_reply(&waitable_reply).await;
match &out {
Err(_) => {
Err(_) | Ok(TimeoutOr::Timeout) => {
self.cancel_op_id_waiter(waitable_reply.op_id);
self.routing_table()
.stats_question_lost(waitable_reply.node_ref.clone());
}
Ok((rpcreader, _)) => {
Ok(TimeoutOr::Value((rpcreader, _))) => {
// Note that we definitely received this node info since we got a reply
waitable_reply.node_ref.set_seen_our_node_info();
@@ -522,7 +535,7 @@ impl RPCProcessor {
dest: Destination,
question: RPCQuestion,
safety_route_spec: Option<&SafetyRouteSpec>,
) -> Result<WaitableReply, RPCError> {
) -> Result<NetworkResult<WaitableReply>, RPCError> {
// Wrap question in operation
let operation = RPCOperation::new_question(question);
let op_id = operation.op_id();
@@ -540,16 +553,13 @@ impl RPCProcessor {
// If we need to resolve the first hop, do it
let node_ref = match node_ref {
None => {
// resolve node
self.resolve_node(node_id)
.await
.map_err(logthru_rpc!(error))?
}
Some(nr) => {
// got the node in the routing table already
nr
}
None => match self.resolve_node(node_id).await? {
None => {
return Ok(NetworkResult::no_connection_other(node_id));
}
Some(nr) => nr,
},
Some(nr) => nr,
};
// Calculate answer timeout
@@ -562,37 +572,31 @@ impl RPCProcessor {
// Send question
let bytes = message.len() as u64;
let send_ts = intf::get_timestamp();
let send_data_kind = match self
let send_data_kind = network_result_try!(self
.network_manager()
.send_envelope(node_ref.clone(), Some(node_id), message)
.await
.map_err(RPCError::internal)
{
Ok(v) => v,
Err(e) => {
.map_err(RPCError::network)? => {
// Make sure to clean up op id waiter in case of error
self.cancel_op_id_waiter(op_id);
self.routing_table()
.stats_failed_to_send(node_ref, send_ts, true);
return Err(e);
}
};
);
// Successfully sent
self.routing_table()
.stats_question_sent(node_ref.clone(), send_ts, bytes, true);
// Pass back waitable reply completion
Ok(WaitableReply {
Ok(NetworkResult::value(WaitableReply {
op_id,
eventual,
timeout,
node_ref,
send_ts,
send_data_kind,
})
}))
}
// Issue a statement over the network, possibly using an anonymized route
@@ -602,7 +606,7 @@ impl RPCProcessor {
dest: Destination,
statement: RPCStatement,
safety_route_spec: Option<&SafetyRouteSpec>,
) -> Result<(), RPCError> {
) -> Result<NetworkResult<()>, RPCError> {
// Wrap statement in operation
let operation = RPCOperation::new_statement(statement);
@@ -619,40 +623,33 @@ impl RPCProcessor {
// If we need to resolve the first hop, do it
let node_ref = match node_ref {
None => {
// resolve node
self.resolve_node(node_id)
.await
.map_err(logthru_rpc!(error))?
}
Some(nr) => {
// got the node in the routing table already
nr
}
None => match self.resolve_node(node_id).await? {
None => {
return Ok(NetworkResult::no_connection_other(node_id));
}
Some(nr) => nr,
},
Some(nr) => nr,
};
// Send statement
let bytes = message.len() as u64;
let send_ts = intf::get_timestamp();
let _send_data_kind = match self
let _send_data_kind = network_result_try!(self
.network_manager()
.send_envelope(node_ref.clone(), Some(node_id), message)
.await
.map_err(RPCError::network)
{
Ok(v) => v,
Err(e) => {
.map_err(RPCError::network)? => {
self.routing_table()
.stats_failed_to_send(node_ref, send_ts, true);
return Err(e);
}
};
);
// Successfully sent
self.routing_table()
.stats_question_sent(node_ref.clone(), send_ts, bytes, true);
Ok(())
Ok(NetworkResult::value(()))
}
// Convert the 'RespondTo' into a 'Destination' for a response
@@ -694,7 +691,7 @@ impl RPCProcessor {
request: RPCMessage,
answer: RPCAnswer,
safety_route_spec: Option<&SafetyRouteSpec>,
) -> Result<(), RPCError> {
) -> Result<NetworkResult<()>, RPCError> {
// Wrap answer in operation
let operation = RPCOperation::new_answer(&request.operation, answer);
@@ -714,33 +711,31 @@ impl RPCProcessor {
// If we need to resolve the first hop, do it
let node_ref = match node_ref {
None => {
// resolve node
self.resolve_node(node_id).await?
}
Some(nr) => {
// got the node in the routing table already
nr
}
None => match self.resolve_node(node_id).await? {
None => {
return Ok(NetworkResult::no_connection_other(node_id));
}
Some(nr) => nr,
},
Some(nr) => nr,
};
// Send the reply
let bytes = message.len() as u64;
let send_ts = intf::get_timestamp();
self.network_manager()
network_result_try!(self.network_manager()
.send_envelope(node_ref.clone(), Some(node_id), message)
.await
.map_err(RPCError::network)
.map_err(|e| {
.map_err(RPCError::network)? => {
self.routing_table()
.stats_failed_to_send(node_ref.clone(), send_ts, false);
e
})?;
}
);
// Reply successfully sent
self.routing_table().stats_answer_sent(node_ref, bytes);
Ok(())
Ok(NetworkResult::value(()))
}
async fn generate_sender_info(peer_noderef: NodeRef) -> SenderInfo {
@@ -752,6 +747,7 @@ impl RPCProcessor {
}
//////////////////////////////////////////////////////////////////////
#[instrument(level = "trace", skip(self, encoded_msg), err)]
async fn process_rpc_message_version_0(
&self,
encoded_msg: RPCMessageEncoded,
@@ -781,12 +777,9 @@ impl RPCProcessor {
"respond_to_sender_signed_node_info has invalid peer scope",
));
}
let nr = self
opt_sender_nr = self
.routing_table()
.register_node_with_signed_node_info(sender_node_id, sender_ni.clone())
.map_err(map_to_string)
.map_err(RPCError::Internal)?;
opt_sender_nr = Some(nr);
.register_node_with_signed_node_info(sender_node_id, sender_ni.clone());
}
_ => {}
}
@@ -864,6 +857,7 @@ impl RPCProcessor {
}
}
#[instrument(level = "trace", skip(self, msg), err)]
async fn process_rpc_message(&self, msg: RPCMessageEncoded) -> Result<(), RPCError> {
if msg.header.envelope.get_version() == 0 {
self.process_rpc_message_version_0(msg).await
@@ -875,8 +869,18 @@ impl RPCProcessor {
}
}
async fn rpc_worker(self, stop_token: StopToken, receiver: flume::Receiver<RPCMessageEncoded>) {
while let Ok(Ok(msg)) = receiver.recv_async().timeout_at(stop_token.clone()).await {
async fn rpc_worker(
self,
stop_token: StopToken,
receiver: flume::Receiver<(Option<Id>, RPCMessageEncoded)>,
) {
while let Ok(Ok((span_id, msg))) =
receiver.recv_async().timeout_at(stop_token.clone()).await
{
let rpc_worker_span = span!(parent: span_id, Level::TRACE, "rpc_worker");
//rpc_worker_span.follows_from(span_id);
let _enter = rpc_worker_span.enter();
let _ = self
.process_rpc_message(msg)
.await
@@ -963,6 +967,7 @@ impl RPCProcessor {
debug!("finished rpc processor shutdown");
}
#[instrument(level = "trace", skip(self, body), err)]
pub fn enqueue_message(
&self,
envelope: Envelope,
@@ -982,8 +987,9 @@ impl RPCProcessor {
let inner = self.inner.lock();
inner.send_channel.as_ref().unwrap().clone()
};
let span_id = Span::current().id();
send_channel
.try_send(msg)
.try_send((span_id, msg))
.wrap_err("failed to enqueue received RPC message")?;
Ok(())
}