debugging, add async_tag_lock
This commit is contained in:
@@ -1321,88 +1321,93 @@ impl NetworkManager {
|
||||
data: Vec<u8>,
|
||||
) -> SendPinBoxFuture<EyreResult<NetworkResult<SendDataKind>>> {
|
||||
let this = self.clone();
|
||||
Box::pin(async move {
|
||||
// info!("{}", format!("send_data to: {:?}", node_ref).red());
|
||||
Box::pin(
|
||||
async move {
|
||||
// info!("{}", format!("send_data to: {:?}", node_ref).red());
|
||||
|
||||
// First try to send data to the last socket we've seen this peer on
|
||||
let data = if let Some(connection_descriptor) = node_ref.last_connection() {
|
||||
// info!(
|
||||
// "{}",
|
||||
// format!("last_connection to: {:?}", connection_descriptor).red()
|
||||
// );
|
||||
// First try to send data to the last socket we've seen this peer on
|
||||
let data = if let Some(connection_descriptor) = node_ref.last_connection() {
|
||||
// info!(
|
||||
// "{}",
|
||||
// format!("last_connection to: {:?}", connection_descriptor).red()
|
||||
// );
|
||||
|
||||
match this
|
||||
.net()
|
||||
.send_data_to_existing_connection(connection_descriptor, data)
|
||||
.await?
|
||||
{
|
||||
None => {
|
||||
// info!(
|
||||
// "{}",
|
||||
// format!("sent to existing connection: {:?}", connection_descriptor)
|
||||
// .red()
|
||||
// );
|
||||
match this
|
||||
.net()
|
||||
.send_data_to_existing_connection(connection_descriptor, data)
|
||||
.await?
|
||||
{
|
||||
None => {
|
||||
// info!(
|
||||
// "{}",
|
||||
// format!("sent to existing connection: {:?}", connection_descriptor)
|
||||
// .red()
|
||||
// );
|
||||
|
||||
// Update timestamp for this last connection since we just sent to it
|
||||
// Update timestamp for this last connection since we just sent to it
|
||||
node_ref
|
||||
.set_last_connection(connection_descriptor, intf::get_timestamp());
|
||||
|
||||
return Ok(NetworkResult::value(SendDataKind::Existing(
|
||||
connection_descriptor,
|
||||
)));
|
||||
}
|
||||
Some(d) => d,
|
||||
}
|
||||
} else {
|
||||
data
|
||||
};
|
||||
|
||||
// info!("{}", "no existing connection".red());
|
||||
|
||||
// If we don't have last_connection, try to reach out to the peer via its dial info
|
||||
let contact_method = this.get_contact_method(node_ref.clone());
|
||||
log_net!(
|
||||
"send_data via {:?} to dialinfo {:?}",
|
||||
contact_method,
|
||||
node_ref
|
||||
);
|
||||
match contact_method {
|
||||
ContactMethod::OutboundRelay(relay_nr)
|
||||
| ContactMethod::InboundRelay(relay_nr) => {
|
||||
network_result_try!(this.send_data(relay_nr, data).await?);
|
||||
Ok(NetworkResult::value(SendDataKind::Indirect))
|
||||
}
|
||||
ContactMethod::Direct(dial_info) => {
|
||||
let connection_descriptor = network_result_try!(
|
||||
this.net().send_data_to_dial_info(dial_info, data).await?
|
||||
);
|
||||
// If we connected to this node directly, save off the last connection so we can use it again
|
||||
node_ref.set_last_connection(connection_descriptor, intf::get_timestamp());
|
||||
|
||||
return Ok(NetworkResult::value(SendDataKind::Existing(
|
||||
Ok(NetworkResult::value(SendDataKind::Direct(
|
||||
connection_descriptor,
|
||||
)));
|
||||
)))
|
||||
}
|
||||
Some(d) => d,
|
||||
ContactMethod::SignalReverse(relay_nr, target_node_ref) => {
|
||||
let connection_descriptor = network_result_try!(
|
||||
this.do_reverse_connect(relay_nr, target_node_ref, data)
|
||||
.await?
|
||||
);
|
||||
Ok(NetworkResult::value(SendDataKind::Direct(
|
||||
connection_descriptor,
|
||||
)))
|
||||
}
|
||||
ContactMethod::SignalHolePunch(relay_nr, target_node_ref) => {
|
||||
let connection_descriptor = network_result_try!(
|
||||
this.do_hole_punch(relay_nr, target_node_ref, data).await?
|
||||
);
|
||||
Ok(NetworkResult::value(SendDataKind::Direct(
|
||||
connection_descriptor,
|
||||
)))
|
||||
}
|
||||
ContactMethod::Unreachable => Ok(NetworkResult::no_connection_other(
|
||||
"Can't send to this node",
|
||||
)),
|
||||
}
|
||||
} else {
|
||||
data
|
||||
};
|
||||
|
||||
// info!("{}", "no existing connection".red());
|
||||
|
||||
// If we don't have last_connection, try to reach out to the peer via its dial info
|
||||
let contact_method = this.get_contact_method(node_ref.clone());
|
||||
log_net!(
|
||||
"send_data via {:?} to dialinfo {:?}",
|
||||
contact_method,
|
||||
node_ref
|
||||
);
|
||||
match contact_method {
|
||||
ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => {
|
||||
network_result_try!(this.send_data(relay_nr, data).await?);
|
||||
Ok(NetworkResult::value(SendDataKind::Indirect))
|
||||
}
|
||||
ContactMethod::Direct(dial_info) => {
|
||||
let connection_descriptor = network_result_try!(
|
||||
this.net().send_data_to_dial_info(dial_info, data).await?
|
||||
);
|
||||
// If we connected to this node directly, save off the last connection so we can use it again
|
||||
node_ref.set_last_connection(connection_descriptor, intf::get_timestamp());
|
||||
|
||||
Ok(NetworkResult::value(SendDataKind::Direct(
|
||||
connection_descriptor,
|
||||
)))
|
||||
}
|
||||
ContactMethod::SignalReverse(relay_nr, target_node_ref) => {
|
||||
let connection_descriptor = network_result_try!(
|
||||
this.do_reverse_connect(relay_nr, target_node_ref, data)
|
||||
.await?
|
||||
);
|
||||
Ok(NetworkResult::value(SendDataKind::Direct(
|
||||
connection_descriptor,
|
||||
)))
|
||||
}
|
||||
ContactMethod::SignalHolePunch(relay_nr, target_node_ref) => {
|
||||
let connection_descriptor = network_result_try!(
|
||||
this.do_hole_punch(relay_nr, target_node_ref, data).await?
|
||||
);
|
||||
Ok(NetworkResult::value(SendDataKind::Direct(
|
||||
connection_descriptor,
|
||||
)))
|
||||
}
|
||||
ContactMethod::Unreachable => Ok(NetworkResult::no_connection_other(
|
||||
"Can't send to this node",
|
||||
)),
|
||||
}
|
||||
})
|
||||
.instrument(trace_span!("send_data")),
|
||||
)
|
||||
}
|
||||
|
||||
// Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism)
|
||||
@@ -1466,6 +1471,7 @@ impl NetworkManager {
|
||||
// Called when a packet potentially containing an RPC envelope is received by a low-level
|
||||
// network protocol handler. Processes the envelope, authenticates and decrypts the RPC message
|
||||
// and passes it to the RPC handler
|
||||
#[instrument(level = "trace", ret, err, skip(self, data), fields(data.len = data.len()))]
|
||||
async fn on_recv_envelope(
|
||||
&self,
|
||||
data: &[u8],
|
||||
|
||||
Reference in New Issue
Block a user