public address detection work
This commit is contained in:
@@ -41,30 +41,6 @@ impl ConnectionLimits {
|
||||
}
|
||||
}
|
||||
|
||||
// Converts an ip to a ip block by applying a netmask
|
||||
// to the host part of the ip address
|
||||
// ipv4 addresses are treated as single hosts
|
||||
// ipv6 addresses are treated as prefix allocated blocks
|
||||
fn ip_to_ipblock(&self, addr: IpAddr) -> IpAddr {
|
||||
match addr {
|
||||
IpAddr::V4(_) => addr,
|
||||
IpAddr::V6(v6) => {
|
||||
let mut hostlen = 128usize.saturating_sub(self.max_connections_per_ip6_prefix_size);
|
||||
let mut out = v6.octets();
|
||||
for i in (0..16).rev() {
|
||||
if hostlen >= 8 {
|
||||
out[i] = 0xFF;
|
||||
hostlen -= 8;
|
||||
} else {
|
||||
out[i] |= !(0xFFu8 << hostlen);
|
||||
break;
|
||||
}
|
||||
}
|
||||
IpAddr::V6(Ipv6Addr::from(out))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn purge_old_timestamps(&mut self, cur_ts: u64) {
|
||||
// v4
|
||||
{
|
||||
@@ -101,7 +77,7 @@ impl ConnectionLimits {
|
||||
}
|
||||
|
||||
pub fn add(&mut self, addr: IpAddr) -> Result<(), AddressFilterError> {
|
||||
let ipblock = self.ip_to_ipblock(addr);
|
||||
let ipblock = ip_to_ipblock(self.max_connections_per_ip6_prefix_size, addr);
|
||||
let ts = intf::get_timestamp();
|
||||
|
||||
self.purge_old_timestamps(ts);
|
||||
@@ -156,7 +132,7 @@ impl ConnectionLimits {
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, addr: IpAddr) -> Result<(), AddressNotInTableError> {
|
||||
let ipblock = self.ip_to_ipblock(addr);
|
||||
let ipblock = ip_to_ipblock(self.max_connections_per_ip6_prefix_size, addr);
|
||||
|
||||
let ts = intf::get_timestamp();
|
||||
self.purge_old_timestamps(ts);
|
||||
|
||||
@@ -41,9 +41,11 @@ pub const RELAY_MANAGEMENT_INTERVAL_SECS: u32 = 1;
|
||||
pub const MAX_MESSAGE_SIZE: usize = MAX_ENVELOPE_SIZE;
|
||||
pub const IPADDR_TABLE_SIZE: usize = 1024;
|
||||
pub const IPADDR_MAX_INACTIVE_DURATION_US: u64 = 300_000_000u64; // 5 minutes
|
||||
pub const GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT: usize = 3;
|
||||
pub const PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT: usize = 3;
|
||||
pub const PUBLIC_ADDRESS_CHECK_CACHE_SIZE: usize = 8;
|
||||
pub const PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS: u32 = 60;
|
||||
pub const PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US: u64 = 300_000_000u64; // 5 minutes
|
||||
pub const BOOT_MAGIC: &[u8; 4] = b"BOOT";
|
||||
|
||||
pub const BOOTSTRAP_TXT_VERSION: u8 = 0;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -110,15 +112,21 @@ struct ClientWhitelistEntry {
|
||||
last_seen_ts: u64,
|
||||
}
|
||||
|
||||
// Mechanism required to contact another node
|
||||
/// Mechanism required to contact another node
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) enum ContactMethod {
|
||||
Unreachable, // Node is not reachable by any means
|
||||
Direct(DialInfo), // Contact the node directly
|
||||
SignalReverse(NodeRef, NodeRef), // Request via signal the node connect back directly
|
||||
SignalHolePunch(NodeRef, NodeRef), // Request via signal the node negotiate a hole punch
|
||||
InboundRelay(NodeRef), // Must use an inbound relay to reach the node
|
||||
OutboundRelay(NodeRef), // Must use outbound relay to reach the node
|
||||
/// Node is not reachable by any means
|
||||
Unreachable,
|
||||
/// Contact the node directly
|
||||
Direct(DialInfo),
|
||||
/// Request via signal the node connect back directly (relay_nr, target_node_ref)
|
||||
SignalReverse(NodeRef, NodeRef),
|
||||
/// Request via signal the node negotiate a hole punch (relay_nr, target_node_ref)
|
||||
SignalHolePunch(NodeRef, NodeRef),
|
||||
/// Must use an inbound relay to reach the node
|
||||
InboundRelay(NodeRef),
|
||||
/// Must use outbound relay to reach the node
|
||||
OutboundRelay(NodeRef),
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
@@ -140,7 +148,9 @@ struct NetworkManagerInner {
|
||||
client_whitelist: LruCache<DHTKey, ClientWhitelistEntry>,
|
||||
relay_node: Option<NodeRef>,
|
||||
public_address_check_cache:
|
||||
BTreeMap<PublicAddressCheckCacheKey, LruCache<DHTKey, SocketAddress>>,
|
||||
BTreeMap<PublicAddressCheckCacheKey, LruCache<IpAddr, SocketAddress>>,
|
||||
public_address_inconsistencies_table:
|
||||
BTreeMap<PublicAddressCheckCacheKey, HashMap<IpAddr, u64>>,
|
||||
protocol_config: Option<ProtocolConfig>,
|
||||
public_inbound_dial_info_filter: Option<DialInfoFilter>,
|
||||
local_inbound_dial_info_filter: Option<DialInfoFilter>,
|
||||
@@ -155,6 +165,7 @@ struct NetworkManagerUnlockedInner {
|
||||
bootstrap_task: TickTask<EyreReport>,
|
||||
peer_minimum_refresh_task: TickTask<EyreReport>,
|
||||
ping_validator_task: TickTask<EyreReport>,
|
||||
public_address_check_task: TickTask<EyreReport>,
|
||||
node_info_update_single_future: MustJoinSingleFuture<()>,
|
||||
}
|
||||
|
||||
@@ -177,6 +188,7 @@ impl NetworkManager {
|
||||
client_whitelist: LruCache::new_unbounded(),
|
||||
relay_node: None,
|
||||
public_address_check_cache: BTreeMap::new(),
|
||||
public_address_inconsistencies_table: BTreeMap::new(),
|
||||
protocol_config: None,
|
||||
public_inbound_dial_info_filter: None,
|
||||
local_inbound_dial_info_filter: None,
|
||||
@@ -192,6 +204,7 @@ impl NetworkManager {
|
||||
bootstrap_task: TickTask::new(1),
|
||||
peer_minimum_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms),
|
||||
ping_validator_task: TickTask::new(1),
|
||||
public_address_check_task: TickTask::new(PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS),
|
||||
node_info_update_single_future: MustJoinSingleFuture::new(),
|
||||
}
|
||||
}
|
||||
@@ -247,6 +260,15 @@ impl NetworkManager {
|
||||
Box::pin(this2.clone().ping_validator_task_routine(s, l, t))
|
||||
});
|
||||
}
|
||||
// Set public address check task
|
||||
{
|
||||
let this2 = this.clone();
|
||||
this.unlocked_inner
|
||||
.public_address_check_task
|
||||
.set_routine(move |s, l, t| {
|
||||
Box::pin(this2.clone().public_address_check_task_routine(s, l, t))
|
||||
});
|
||||
}
|
||||
this
|
||||
}
|
||||
pub fn config(&self) -> VeilidConfig {
|
||||
@@ -755,6 +777,7 @@ impl NetworkManager {
|
||||
let peer_nr = match routing_table.register_node_with_signed_node_info(
|
||||
peer_info.node_id.key,
|
||||
peer_info.signed_node_info,
|
||||
false,
|
||||
) {
|
||||
None => {
|
||||
return Ok(NetworkResult::invalid_message(
|
||||
@@ -777,6 +800,7 @@ impl NetworkManager {
|
||||
let mut peer_nr = match routing_table.register_node_with_signed_node_info(
|
||||
peer_info.node_id.key,
|
||||
peer_info.signed_node_info,
|
||||
false,
|
||||
) {
|
||||
None => {
|
||||
return Ok(NetworkResult::invalid_message(
|
||||
@@ -1653,23 +1677,27 @@ impl NetworkManager {
|
||||
connection_descriptor.address_type(),
|
||||
);
|
||||
|
||||
let (net, routing_table) = {
|
||||
let (net, routing_table, detect_address_changes) = {
|
||||
let mut inner = self.inner.lock();
|
||||
let c = self.config.get();
|
||||
|
||||
// Get the ip(block) this report is coming from
|
||||
let ip6_prefix_size = c.network.max_connections_per_ip6_prefix_size as usize;
|
||||
let ipblock = ip_to_ipblock(
|
||||
ip6_prefix_size,
|
||||
connection_descriptor.remote_address().to_ip_addr(),
|
||||
);
|
||||
|
||||
// Store the reported address
|
||||
let pacc = inner
|
||||
.public_address_check_cache
|
||||
.entry(key)
|
||||
.or_insert_with(|| LruCache::new(8));
|
||||
pacc.insert(reporting_peer.node_id(), socket_address);
|
||||
.or_insert_with(|| LruCache::new(PUBLIC_ADDRESS_CHECK_CACHE_SIZE));
|
||||
pacc.insert(ipblock, socket_address);
|
||||
|
||||
let net = inner.components.as_ref().unwrap().net.clone();
|
||||
let routing_table = inner.routing_table.as_ref().unwrap().clone();
|
||||
(net, routing_table)
|
||||
};
|
||||
let detect_address_changes = {
|
||||
let c = self.config.get();
|
||||
c.network.detect_address_changes
|
||||
(net, routing_table, c.network.detect_address_changes)
|
||||
};
|
||||
let network_class = net.get_network_class().unwrap_or(NetworkClass::Invalid);
|
||||
|
||||
@@ -1691,31 +1719,47 @@ impl NetworkManager {
|
||||
|
||||
// If we are inbound capable, but start to see inconsistent socket addresses from multiple reporting peers
|
||||
// then we zap the network class and re-detect it
|
||||
let mut inner = self.inner.lock();
|
||||
let mut inconsistencies = 0;
|
||||
let mut changed = false;
|
||||
let inner = &mut *self.inner.lock();
|
||||
let mut inconsistencies = Vec::new();
|
||||
let mut inconsistent = false;
|
||||
// Iteration goes from most recent to least recent node/address pair
|
||||
let pacc = inner
|
||||
.public_address_check_cache
|
||||
.entry(key)
|
||||
.or_insert_with(|| LruCache::new(8));
|
||||
for (_, a) in pacc {
|
||||
if !current_addresses.contains(a) {
|
||||
inconsistencies += 1;
|
||||
if inconsistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT {
|
||||
changed = true;
|
||||
.or_insert_with(|| LruCache::new(PUBLIC_ADDRESS_CHECK_CACHE_SIZE));
|
||||
let pait = inner
|
||||
.public_address_inconsistencies_table
|
||||
.entry(key)
|
||||
.or_insert_with(|| HashMap::new());
|
||||
for (reporting_ip_block, a) in pacc {
|
||||
// If this address is not one of our current addresses (inconsistent)
|
||||
// and we haven't already denylisted the reporting source,
|
||||
if !current_addresses.contains(a) && !pait.contains_key(reporting_ip_block) {
|
||||
// Record the origin of the inconsistency
|
||||
inconsistencies.push(*reporting_ip_block);
|
||||
|
||||
// If we have enough inconsistencies to consider changing our public dial info,
|
||||
// add them to our denylist (throttling) and go ahead and check for new
|
||||
// public dialinfo
|
||||
if inconsistencies.len() >= PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT {
|
||||
let exp_ts =
|
||||
intf::get_timestamp() + PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US;
|
||||
for i in inconsistencies {
|
||||
pait.insert(i, exp_ts);
|
||||
}
|
||||
|
||||
inconsistent = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// // debug code
|
||||
// if changed {
|
||||
// trace!("public_address_check_cache: {:#?}\ncurrent_addresses: {:#?}\ninconsistencies: {}", inner
|
||||
// .public_address_check_cache, current_addresses, inconsistencies);
|
||||
// }
|
||||
|
||||
changed
|
||||
inconsistent
|
||||
} else {
|
||||
// If we are currently outbound only, we don't have any public dial info
|
||||
// but if we are starting to see consistent socket address from multiple reporting peers
|
||||
@@ -1729,13 +1773,13 @@ impl NetworkManager {
|
||||
let pacc = inner
|
||||
.public_address_check_cache
|
||||
.entry(key)
|
||||
.or_insert_with(|| LruCache::new(8));
|
||||
.or_insert_with(|| LruCache::new(PUBLIC_ADDRESS_CHECK_CACHE_SIZE));
|
||||
|
||||
for (_, a) in pacc {
|
||||
if let Some(current_address) = current_address {
|
||||
if current_address == *a {
|
||||
consistencies += 1;
|
||||
if consistencies >= GLOBAL_ADDRESS_CHANGE_DETECTION_COUNT {
|
||||
if consistencies >= PUBLIC_ADDRESS_CHANGE_DETECTION_COUNT {
|
||||
consistent = true;
|
||||
break;
|
||||
}
|
||||
@@ -1811,7 +1855,7 @@ impl NetworkManager {
|
||||
return;
|
||||
}
|
||||
|
||||
// Mark the node as updated
|
||||
// Mark the node as having seen our node info
|
||||
nr.set_seen_our_node_info();
|
||||
});
|
||||
}
|
||||
|
||||
@@ -188,7 +188,7 @@ impl IGDManager {
|
||||
}
|
||||
}
|
||||
let pmk = found?;
|
||||
let pmv = inner.port_maps.remove(&pmk).unwrap();
|
||||
let _pmv = inner.port_maps.remove(&pmk).expect("key found but remove failed");
|
||||
|
||||
// Find gateway
|
||||
let gw = Self::find_gateway(&mut *inner, at)?;
|
||||
|
||||
@@ -213,7 +213,7 @@ impl DiscoveryContext {
|
||||
|
||||
#[instrument(level = "trace", skip(self), ret)]
|
||||
async fn try_port_mapping(&self) -> Option<DialInfo> {
|
||||
let (enable_upnp, enable_natpmp) = {
|
||||
let (enable_upnp, _enable_natpmp) = {
|
||||
let c = self.net.config.get();
|
||||
(c.network.upnp, c.network.natpmp)
|
||||
};
|
||||
|
||||
@@ -113,7 +113,7 @@ impl Network {
|
||||
let addr = match tcp_stream.peer_addr() {
|
||||
Ok(addr) => addr,
|
||||
Err(e) => {
|
||||
log_net!(error "failed to get peer address: {}", e);
|
||||
log_net!(debug "failed to get peer address: {}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -139,7 +139,7 @@ impl Network {
|
||||
{
|
||||
// If we fail to get a packet within the connection initial timeout
|
||||
// then we punt this connection
|
||||
log_net!(warn "connection initial timeout from: {:?}", addr);
|
||||
log_net!("connection initial timeout from: {:?}", addr);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -169,12 +169,12 @@ impl Network {
|
||||
}
|
||||
Ok(None) => {
|
||||
// No protocol handlers matched? drop it.
|
||||
log_net!(warn "no protocol handler for connection from {:?}", addr);
|
||||
log_net!(debug "no protocol handler for connection from {:?}", addr);
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
// Failed to negotiate connection? drop it.
|
||||
log_net!(warn "failed to negotiate connection from {:?}: {}", addr, e);
|
||||
log_net!(debug "failed to negotiate connection from {:?}: {}", addr, e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -69,11 +69,15 @@ impl RawTcpNetworkConnection {
|
||||
|
||||
network_result_try!(stream.read_exact(&mut header).await.into_network_result()?);
|
||||
if header[0] != b'V' || header[1] != b'L' {
|
||||
bail_io_error_other!("received invalid TCP frame header");
|
||||
return Ok(NetworkResult::invalid_message(
|
||||
"received invalid TCP frame header",
|
||||
));
|
||||
}
|
||||
let len = ((header[3] as usize) << 8) | (header[2] as usize);
|
||||
if len > MAX_MESSAGE_SIZE {
|
||||
bail_io_error_other!("received too large TCP frame");
|
||||
return Ok(NetworkResult::invalid_message(
|
||||
"received too large TCP frame",
|
||||
));
|
||||
}
|
||||
|
||||
let mut out: Vec<u8> = vec![0u8; len];
|
||||
|
||||
@@ -188,7 +188,7 @@ impl NetworkManager {
|
||||
let k = pi.node_id.key;
|
||||
// Register the node
|
||||
if let Some(nr) =
|
||||
routing_table.register_node_with_signed_node_info(k, pi.signed_node_info)
|
||||
routing_table.register_node_with_signed_node_info(k, pi.signed_node_info, false)
|
||||
{
|
||||
// Add this our futures to process in parallel
|
||||
let routing_table = routing_table.clone();
|
||||
@@ -288,6 +288,7 @@ impl NetworkManager {
|
||||
dial_info_detail_list: v.dial_info_details, // Dial info is as specified in the bootstrap list
|
||||
relay_peer_info: None, // Bootstraps never require a relay themselves
|
||||
}),
|
||||
true,
|
||||
) {
|
||||
// Add this our futures to process in parallel
|
||||
let routing_table = routing_table.clone();
|
||||
@@ -458,6 +459,7 @@ impl NetworkManager {
|
||||
if let Some(nr) = routing_table.register_node_with_signed_node_info(
|
||||
outbound_relay_peerinfo.node_id.key,
|
||||
outbound_relay_peerinfo.signed_node_info,
|
||||
false,
|
||||
) {
|
||||
info!("Outbound relay node selected: {}", nr);
|
||||
inner.relay_node = Some(nr);
|
||||
@@ -531,4 +533,28 @@ impl NetworkManager {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Clean up the public address check tables, removing entries that have timed out
|
||||
#[instrument(level = "trace", skip(self), err)]
|
||||
pub(super) async fn public_address_check_task_routine(
|
||||
self,
|
||||
stop_token: StopToken,
|
||||
_last_ts: u64,
|
||||
cur_ts: u64,
|
||||
) -> EyreResult<()> {
|
||||
// go through public_address_inconsistencies_table and time out things that have expired
|
||||
let mut inner = self.inner.lock();
|
||||
for (_, pait_v) in &mut inner.public_address_inconsistencies_table {
|
||||
let mut expired = Vec::new();
|
||||
for (addr, exp_ts) in pait_v.iter() {
|
||||
if *exp_ts <= cur_ts {
|
||||
expired.push(*addr);
|
||||
}
|
||||
}
|
||||
for exp in expired {
|
||||
pait_v.remove(&exp);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user