checkpoint

This commit is contained in:
John Smith
2022-08-08 20:42:27 -04:00
parent 0204af263d
commit 6226845e9f
16 changed files with 345 additions and 219 deletions
+68 -58
View File
@@ -123,11 +123,13 @@ pub(crate) enum ContactMethod {
#[derive(Copy, Clone, Debug)]
pub enum SendDataKind {
LocalDirect,
GlobalDirect,
GlobalIndirect,
Direct(ConnectionDescriptor),
Indirect,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
struct PublicAddressCheckCacheKey(ProtocolType, AddressType);
// The mutable state of the network manager
struct NetworkManagerInner {
routing_table: Option<RoutingTable>,
@@ -136,7 +138,8 @@ struct NetworkManagerInner {
stats: NetworkManagerStats,
client_whitelist: LruCache<DHTKey, ClientWhitelistEntry>,
relay_node: Option<NodeRef>,
public_address_check_cache: LruCache<DHTKey, SocketAddress>,
public_address_check_cache:
BTreeMap<PublicAddressCheckCacheKey, LruCache<DHTKey, SocketAddress>>,
protocol_config: Option<ProtocolConfig>,
public_inbound_dial_info_filter: Option<DialInfoFilter>,
local_inbound_dial_info_filter: Option<DialInfoFilter>,
@@ -172,7 +175,7 @@ impl NetworkManager {
stats: NetworkManagerStats::default(),
client_whitelist: LruCache::new_unbounded(),
relay_node: None,
public_address_check_cache: LruCache::new(8),
public_address_check_cache: BTreeMap::new(),
protocol_config: None,
public_inbound_dial_info_filter: None,
local_inbound_dial_info_filter: None,
@@ -760,9 +763,6 @@ impl NetworkManager {
Some(nr) => nr,
};
// Remove any 'last connection' to this peer to ensure we start a new connection with the reverse connection
peer_nr.clear_last_connection();
// Make a reverse connection to the peer and send the receipt to it
rpc.rpc_call_return_receipt(Destination::Direct(peer_nr), None, receipt)
.await
@@ -786,9 +786,6 @@ impl NetworkManager {
Some(nr) => nr,
};
// Remove any 'last connection' to this peer to ensure we start a new connection with the hole punch
peer_nr.clear_last_connection();
// Get the udp direct dialinfo for the hole punch
let outbound_dif = self
.get_outbound_dial_info_filter(RoutingDomain::PublicInternet)
@@ -897,14 +894,7 @@ impl NetworkManager {
let out = self.build_envelope(envelope_node_id, version, body)?;
// Send the envelope via whatever means necessary
let send_data_kind = network_result_try!(self.send_data(node_ref.clone(), out).await?);
// If we asked to relay from the start, then this is always indirect
Ok(NetworkResult::value(if envelope_node_id != via_node_id {
SendDataKind::GlobalIndirect
} else {
send_data_kind
}))
self.send_data(node_ref.clone(), out).await
}
// Called by the RPC handler when we want to issue an direct receipt
@@ -1094,7 +1084,7 @@ impl NetworkManager {
relay_nr: NodeRef,
target_nr: NodeRef,
data: Vec<u8>,
) -> EyreResult<NetworkResult<()>> {
) -> EyreResult<NetworkResult<ConnectionDescriptor>> {
// Build a return receipt for the signal
let receipt_timeout =
ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms);
@@ -1143,7 +1133,7 @@ impl NetworkManager {
.send_data_to_existing_connection(descriptor, data)
.await?
{
None => Ok(NetworkResult::value(())),
None => Ok(NetworkResult::value(descriptor)),
Some(_) => Ok(NetworkResult::no_connection_other(
"unable to send over reverse connection",
)),
@@ -1161,11 +1151,11 @@ impl NetworkManager {
relay_nr: NodeRef,
target_nr: NodeRef,
data: Vec<u8>,
) -> EyreResult<NetworkResult<()>> {
) -> EyreResult<NetworkResult<ConnectionDescriptor>> {
// Ensure we are filtered down to UDP (the only hole punch protocol supported today)
assert!(target_nr
.filter_ref()
.map(|dif| dif.protocol_set == ProtocolTypeSet::only(ProtocolType::UDP))
.map(|dif| dif.protocol_type_set == ProtocolTypeSet::only(ProtocolType::UDP))
.unwrap_or_default());
// Build a return receipt for the signal
@@ -1233,7 +1223,7 @@ impl NetworkManager {
.send_data_to_existing_connection(descriptor, data)
.await?
{
None => Ok(NetworkResult::value(())),
None => Ok(NetworkResult::value(descriptor)),
Some(_) => Ok(NetworkResult::no_connection_other(
"unable to send over hole punch",
)),
@@ -1260,20 +1250,19 @@ impl NetworkManager {
let this = self.clone();
Box::pin(async move {
// First try to send data to the last socket we've seen this peer on
let data = if let Some(descriptor) = node_ref.last_connection().await {
let data = if let Some(connection_descriptor) = node_ref.last_connection().await {
match this
.net()
.send_data_to_existing_connection(descriptor, data)
.send_data_to_existing_connection(connection_descriptor, data)
.await?
{
None => {
return Ok(
if descriptor.matches_peer_scope(PeerScopeSet::only(PeerScope::Local)) {
NetworkResult::value(SendDataKind::LocalDirect)
} else {
NetworkResult::value(SendDataKind::GlobalDirect)
},
);
// Update timestamp for this last connection since we just sent to it
node_ref.set_last_connection(connection_descriptor, intf::get_timestamp());
return Ok(NetworkResult::value(SendDataKind::Direct(
connection_descriptor,
)));
}
Some(d) => d,
}
@@ -1291,32 +1280,35 @@ impl NetworkManager {
match contact_method {
ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => {
network_result_try!(this.send_data(relay_nr, data).await?);
Ok(NetworkResult::value(SendDataKind::GlobalIndirect))
Ok(NetworkResult::value(SendDataKind::Indirect))
}
ContactMethod::Direct(dial_info) => {
let send_data_kind = if dial_info.is_local() {
SendDataKind::LocalDirect
} else {
SendDataKind::GlobalDirect
};
let connection_descriptor = network_result_try!(
this.net().send_data_to_dial_info(dial_info, data).await?
);
// If we connected to this node directly, save off the last connection so we can use it again
node_ref.set_last_connection(connection_descriptor, intf::get_timestamp());
Ok(NetworkResult::value(send_data_kind))
Ok(NetworkResult::value(SendDataKind::Direct(
connection_descriptor,
)))
}
ContactMethod::SignalReverse(relay_nr, target_node_ref) => {
network_result_try!(
let connection_descriptor = network_result_try!(
this.do_reverse_connect(relay_nr, target_node_ref, data)
.await?
);
Ok(NetworkResult::value(SendDataKind::GlobalDirect))
Ok(NetworkResult::value(SendDataKind::Direct(
connection_descriptor,
)))
}
ContactMethod::SignalHolePunch(relay_nr, target_node_ref) => {
network_result_try!(this.do_hole_punch(relay_nr, target_node_ref, data).await?);
Ok(NetworkResult::value(SendDataKind::GlobalDirect))
let connection_descriptor = network_result_try!(
this.do_hole_punch(relay_nr, target_node_ref, data).await?
);
Ok(NetworkResult::value(SendDataKind::Direct(
connection_descriptor,
)))
}
ContactMethod::Unreachable => Ok(NetworkResult::no_connection_other(
"Can't send to this node",
@@ -1626,6 +1618,7 @@ impl NetworkManager {
pub async fn report_local_socket_address(
&self,
_socket_address: SocketAddress,
_connection_descriptor: ConnectionDescriptor,
_reporting_peer: NodeRef,
) {
// XXX: Nothing here yet.
@@ -1636,16 +1629,24 @@ impl NetworkManager {
// Wait until we have received confirmation from N different peers
pub async fn report_global_socket_address(
&self,
socket_address: SocketAddress,
reporting_peer: NodeRef,
socket_address: SocketAddress, // the socket address as seen by the remote peer
connection_descriptor: ConnectionDescriptor, // the connection descriptor used
reporting_peer: NodeRef, // the peer's noderef reporting the socket address
) {
let key = PublicAddressCheckCacheKey(
connection_descriptor.protocol_type(),
connection_descriptor.address_type(),
);
let (net, routing_table) = {
let mut inner = self.inner.lock();
// Store the reported address
inner
let pacc = inner
.public_address_check_cache
.insert(reporting_peer.node_id(), socket_address);
.entry(key)
.or_insert_with(|| LruCache::new(8));
pacc.insert(reporting_peer.node_id(), socket_address);
let net = inner.components.as_ref().unwrap().net.clone();
let routing_table = inner.routing_table.as_ref().unwrap().clone();
@@ -1660,11 +1661,14 @@ impl NetworkManager {
// Determine if our external address has likely changed
let needs_public_address_detection =
if matches!(network_class, NetworkClass::InboundCapable) {
// Get the dial info filter for this connection so we can check if we have any public dialinfo that may have changed
let dial_info_filter = connection_descriptor.make_dial_info_filter();
// Get current external ip/port from registered global dialinfo
let current_addresses: BTreeSet<SocketAddress> = routing_table
.all_filtered_dial_info_details(
Some(RoutingDomain::PublicInternet),
&DialInfoFilter::all(),
&dial_info_filter,
)
.iter()
.map(|did| did.dial_info.socket_address())
@@ -1672,11 +1676,15 @@ impl NetworkManager {
// If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers
// then we zap the network class and re-detect it
let inner = self.inner.lock();
let mut inner = self.inner.lock();
let mut inconsistencies = 0;
let mut changed = false;
// Iteration goes from most recent to least recent node/address pair
for (_, a) in &inner.public_address_check_cache {
let pacc = inner
.public_address_check_cache
.entry(key)
.or_insert_with(|| LruCache::new(8));
for (_, a) in pacc {
if !current_addresses.contains(a) {
inconsistencies += 1;
if inconsistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT {
@@ -1691,12 +1699,17 @@ impl NetworkManager {
// but if we are starting to see consistent socket address from multiple reporting peers
// then we may be become inbound capable, so zap the network class so we can re-detect it and any public dial info
let inner = self.inner.lock();
let mut inner = self.inner.lock();
let mut consistencies = 0;
let mut consistent = false;
let mut current_address = Option::<SocketAddress>::None;
// Iteration goes from most recent to least recent node/address pair
for (_, a) in &inner.public_address_check_cache {
let pacc = inner
.public_address_check_cache
.entry(key)
.or_insert_with(|| LruCache::new(8));
for (_, a) in pacc {
if let Some(current_address) = current_address {
if current_address == *a {
consistencies += 1;
@@ -1721,11 +1734,8 @@ impl NetworkManager {
inner.public_address_check_cache.clear();
// Reset the network class and dial info so we can re-detect it
//routing_table.clear_dial_info_details(RoutingDomain::PublicInternet);
//net.reset_network_class();
// Do a full network reset since this doesn't take that long and ensures we get the local network stuff correct too
net.restart_network();
routing_table.clear_dial_info_details(RoutingDomain::PublicInternet);
net.reset_network_class();
} else {
warn!("Public address may have changed. Restarting the server may be required.");
}