remove rkyv

This commit is contained in:
Christien Rioux
2023-07-15 16:18:13 -04:00
parent 4078c00098
commit 80cb23c0c6
86 changed files with 943 additions and 2442 deletions
+6 -9
View File
@@ -1,6 +1,5 @@
use super::*;
use core::sync::atomic::Ordering;
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
/// Routing Table Bucket
/// Stores map of public keys to entries, which may be in multiple routing tables per crypto kind
@@ -16,15 +15,13 @@ pub struct Bucket {
pub(super) type EntriesIter<'a> =
alloc::collections::btree_map::Iter<'a, PublicKey, Arc<BucketEntry>>;
#[derive(Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
#[derive(Debug, Serialize, Deserialize)]
struct SerializedBucketEntryData {
key: PublicKey,
value: u32, // index into serialized entries list
}
#[derive(Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
#[derive(Debug, Serialize, Deserialize)]
struct SerializedBucketData {
entries: Vec<SerializedBucketEntryData>,
}
@@ -50,7 +47,7 @@ impl Bucket {
data: Vec<u8>,
all_entries: &[Arc<BucketEntry>],
) -> EyreResult<()> {
let bucket_data: SerializedBucketData = from_rkyv(data)?;
let bucket_data: SerializedBucketData = deserialize_json_bytes(&data)?;
for e in bucket_data.entries {
self.entries
@@ -64,7 +61,7 @@ impl Bucket {
&self,
all_entries: &mut Vec<Arc<BucketEntry>>,
entry_map: &mut HashMap<*const BucketEntry, u32>,
) -> EyreResult<Vec<u8>> {
) -> Vec<u8> {
let mut entries = Vec::new();
for (k, v) in &self.entries {
let entry_index = entry_map.entry(Arc::as_ptr(v)).or_insert_with(|| {
@@ -78,8 +75,8 @@ impl Bucket {
});
}
let bucket_data = SerializedBucketData { entries };
let out = to_rkyv(&bucket_data)?;
Ok(out)
let out = serialize_json_bytes(&bucket_data);
out
}
/// Create a new entry with a node_id of this crypto kind and return it
+8 -11
View File
@@ -39,8 +39,7 @@ pub enum BucketEntryState {
pub struct LastConnectionKey(ProtocolType, AddressType);
/// Bucket entry information specific to the LocalNetwork RoutingDomain
#[derive(Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
#[derive(Debug, Serialize, Deserialize)]
pub struct BucketEntryPublicInternet {
/// The PublicInternet node info
signed_node_info: Option<Box<SignedNodeInfo>>,
@@ -51,8 +50,7 @@ pub struct BucketEntryPublicInternet {
}
/// Bucket entry information specific to the LocalNetwork RoutingDomain
#[derive(Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
#[derive(Debug, Serialize, Deserialize)]
pub struct BucketEntryLocalNetwork {
/// The LocalNetwork node info
signed_node_info: Option<Box<SignedNodeInfo>>,
@@ -63,8 +61,7 @@ pub struct BucketEntryLocalNetwork {
}
/// The data associated with each bucket entry
#[derive(Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
#[derive(Debug, Serialize, Deserialize)]
pub struct BucketEntryInner {
/// The node ids matching this bucket entry, with the cryptography versions supported by this node as the 'kind' field
validated_node_ids: TypedKeyGroup,
@@ -79,7 +76,7 @@ pub struct BucketEntryInner {
/// unreachable may now be reachable with the same SignedNodeInfo/DialInfo
updated_since_last_network_change: bool,
/// The last connection descriptors used to contact this node, per protocol type
#[with(Skip)]
#[serde(skip)]
last_connections: BTreeMap<LastConnectionKey, (ConnectionDescriptor, Timestamp)>,
/// The node info for this entry on the publicinternet routing domain
public_internet: BucketEntryPublicInternet,
@@ -88,18 +85,18 @@ pub struct BucketEntryInner {
/// Statistics gathered for the peer
peer_stats: PeerStats,
/// The accounting for the latency statistics
#[with(Skip)]
#[serde(skip)]
latency_stats_accounting: LatencyStatsAccounting,
/// The accounting for the transfer statistics
#[with(Skip)]
#[serde(skip)]
transfer_stats_accounting: TransferStatsAccounting,
/// Tracking identifier for NodeRef debugging
#[cfg(feature = "tracking")]
#[with(Skip)]
#[serde(skip)]
next_track_id: usize,
/// Backtraces for NodeRef debugging
#[cfg(feature = "tracking")]
#[with(Skip)]
#[serde(skip)]
node_ref_tracks: HashMap<usize, backtrace::Backtrace>,
}
+23 -18
View File
@@ -48,6 +48,12 @@ pub const PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS: u32 = 1;
// We should ping them with some frequency and 30 seconds is typical timeout
pub const CONNECTIONLESS_TIMEOUT_SECS: u32 = 29;
// Table store keys
const ALL_ENTRY_BYTES: &[u8] = b"all_entry_bytes";
const ROUTING_TABLE: &str = "routing_table";
const SERIALIZED_BUCKET_MAP: &[u8] = b"serialized_bucket_map";
const CACHE_VALIDITY_KEY: &[u8] = b"cache_validity_key";
pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>;
pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>;
#[derive(Clone, Debug)]
@@ -295,7 +301,7 @@ impl RoutingTable {
}
/// Serialize the routing table.
fn serialized_buckets(&self) -> EyreResult<(SerializedBucketMap, SerializedBuckets)> {
fn serialized_buckets(&self) -> (SerializedBucketMap, SerializedBuckets) {
// Since entries are shared by multiple buckets per cryptokind
// we need to get the list of all unique entries when serializing
let mut all_entries: Vec<Arc<BucketEntry>> = Vec::new();
@@ -309,7 +315,7 @@ impl RoutingTable {
let buckets = inner.buckets.get(&ck).unwrap();
let mut serialized_buckets = Vec::new();
for bucket in buckets.iter() {
serialized_buckets.push(bucket.save_bucket(&mut all_entries, &mut entry_map)?)
serialized_buckets.push(bucket.save_bucket(&mut all_entries, &mut entry_map))
}
serialized_bucket_map.insert(ck, serialized_buckets);
}
@@ -319,25 +325,25 @@ impl RoutingTable {
let mut all_entry_bytes = Vec::with_capacity(all_entries.len());
for entry in all_entries {
// Serialize entry
let entry_bytes = entry.with_inner(|e| to_rkyv(e))?;
let entry_bytes = entry.with_inner(|e| serialize_json_bytes(e));
all_entry_bytes.push(entry_bytes);
}
Ok((serialized_bucket_map, all_entry_bytes))
(serialized_bucket_map, all_entry_bytes)
}
/// Write the serialized routing table to the table store.
async fn save_buckets(&self) -> EyreResult<()> {
let (serialized_bucket_map, all_entry_bytes) = self.serialized_buckets()?;
let (serialized_bucket_map, all_entry_bytes) = self.serialized_buckets();
let table_store = self.unlocked_inner.network_manager().table_store();
let tdb = table_store.open("routing_table", 1).await?;
let tdb = table_store.open(ROUTING_TABLE, 1).await?;
let dbx = tdb.transact();
if let Err(e) = dbx.store_rkyv(0, b"serialized_bucket_map", &serialized_bucket_map) {
if let Err(e) = dbx.store_json(0, SERIALIZED_BUCKET_MAP, &serialized_bucket_map) {
dbx.rollback();
return Err(e.into());
}
if let Err(e) = dbx.store_rkyv(0, b"all_entry_bytes", &all_entry_bytes) {
if let Err(e) = dbx.store_json(0, ALL_ENTRY_BYTES, &all_entry_bytes) {
dbx.rollback();
return Err(e.into());
}
@@ -362,9 +368,9 @@ impl RoutingTable {
// Deserialize bucket map and all entries from the table store
let table_store = self.unlocked_inner.network_manager().table_store();
let db = table_store.open("routing_table", 1).await?;
let db = table_store.open(ROUTING_TABLE, 1).await?;
let caches_valid = match db.load(0, b"cache_validity_key").await? {
let caches_valid = match db.load(0, CACHE_VALIDITY_KEY).await? {
Some(v) => v == cache_validity_key,
None => false,
};
@@ -372,19 +378,18 @@ impl RoutingTable {
// Caches not valid, start over
log_rtab!(debug "cache validity key changed, emptying routing table");
drop(db);
table_store.delete("routing_table").await?;
let db = table_store.open("routing_table", 1).await?;
db.store(0, b"cache_validity_key", &cache_validity_key)
.await?;
table_store.delete(ROUTING_TABLE).await?;
let db = table_store.open(ROUTING_TABLE, 1).await?;
db.store(0, CACHE_VALIDITY_KEY, &cache_validity_key).await?;
return Ok(());
}
// Caches valid, load saved routing table
let Some(serialized_bucket_map): Option<SerializedBucketMap> = db.load_rkyv(0, b"serialized_bucket_map").await? else {
let Some(serialized_bucket_map): Option<SerializedBucketMap> = db.load_json(0, SERIALIZED_BUCKET_MAP).await? else {
log_rtab!(debug "no bucket map in saved routing table");
return Ok(());
};
let Some(all_entry_bytes): Option<SerializedBuckets> = db.load_rkyv(0, b"all_entry_bytes").await? else {
let Some(all_entry_bytes): Option<SerializedBuckets> = db.load_json(0, ALL_ENTRY_BYTES).await? else {
log_rtab!(debug "no all_entry_bytes in saved routing table");
return Ok(());
};
@@ -405,8 +410,8 @@ impl RoutingTable {
) -> EyreResult<()> {
let mut all_entries: Vec<Arc<BucketEntry>> = Vec::with_capacity(all_entry_bytes.len());
for entry_bytes in all_entry_bytes {
let entryinner =
from_rkyv(entry_bytes).wrap_err("failed to deserialize bucket entry")?;
let entryinner = deserialize_json_bytes(&entry_bytes)
.wrap_err("failed to deserialize bucket entry")?;
let entry = Arc::new(BucketEntry::new_with_inner(entryinner));
// Keep strong reference in table
@@ -1,7 +1,6 @@
use super::*;
#[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C, align(8)), derive(CheckBytes))]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RouteSpecDetail {
/// Crypto kind
pub crypto_kind: CryptoKind,
@@ -11,20 +10,18 @@ pub struct RouteSpecDetail {
pub hops: Vec<PublicKey>,
}
#[derive(Clone, Debug, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C, align(8)), derive(CheckBytes))]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RouteSetSpecDetail {
/// Route set per crypto kind
route_set: BTreeMap<PublicKey, RouteSpecDetail>,
/// Route noderefs
#[with(Skip)]
#[serde(skip)]
hop_node_refs: Vec<NodeRef>,
/// Published private route, do not reuse for ephemeral routes
/// Not serialized because all routes should be re-published when restarting
#[with(Skip)]
#[serde(skip)]
published: bool,
/// Directions this route is guaranteed to work in
#[with(RkyvEnumSet)]
directions: DirectionSet,
/// Stability preference (prefer reliable nodes over faster)
stability: Stability,
@@ -1,8 +1,7 @@
use super::*;
/// The core representation of the RouteSpecStore that can be serialized
#[derive(Debug, Clone, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C, align(8)), derive(CheckBytes))]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RouteSpecStoreContent {
/// All of the route sets we have allocated so far indexed by key
id_by_key: HashMap<PublicKey, RouteId>,
@@ -23,7 +22,7 @@ impl RouteSpecStoreContent {
let table_store = routing_table.network_manager().table_store();
let rsstdb = table_store.open("RouteSpecStore", 1).await?;
let mut content: RouteSpecStoreContent =
rsstdb.load_rkyv(0, b"content").await?.unwrap_or_default();
rsstdb.load_json(0, b"content").await?.unwrap_or_default();
// Look up all route hop noderefs since we can't serialize those
let mut dead_ids = Vec::new();
@@ -63,7 +62,7 @@ impl RouteSpecStoreContent {
// This skips #[with(Skip)] saving the secret keys, we save them in the protected store instead
let table_store = routing_table.network_manager().table_store();
let rsstdb = table_store.open("RouteSpecStore", 1).await?;
rsstdb.store_rkyv(0, b"content", self).await?;
rsstdb.store_json(0, b"content", self).await?;
Ok(())
}
@@ -96,9 +95,9 @@ impl RouteSpecStoreContent {
pub fn get_id_by_key(&self, key: &PublicKey) -> Option<RouteId> {
self.id_by_key.get(key).cloned()
}
pub fn iter_ids(&self) -> std::collections::hash_map::Keys<RouteId, RouteSetSpecDetail> {
self.details.keys()
}
// pub fn iter_ids(&self) -> std::collections::hash_map::Keys<RouteId, RouteSetSpecDetail> {
// self.details.keys()
// }
pub fn iter_details(&self) -> std::collections::hash_map::Iter<RouteId, RouteSetSpecDetail> {
self.details.iter()
}
@@ -1,34 +1,33 @@
use super::*;
#[derive(Clone, Debug, Default, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct RouteStats {
/// Consecutive failed to send count
#[with(Skip)]
#[serde(skip)]
pub failed_to_send: u32,
/// Questions lost
#[with(Skip)]
#[serde(skip)]
pub questions_lost: u32,
/// Timestamp of when the route was created
pub created_ts: Timestamp,
/// Timestamp of when the route was last checked for validity
#[with(Skip)]
#[serde(skip)]
pub last_tested_ts: Option<Timestamp>,
/// Timestamp of when the route was last sent to
#[with(Skip)]
#[serde(skip)]
pub last_sent_ts: Option<Timestamp>,
/// Timestamp of when the route was last received over
#[with(Skip)]
#[serde(skip)]
pub last_received_ts: Option<Timestamp>,
/// Transfers up and down
pub transfer_stats_down_up: TransferStatsDownUp,
/// Latency stats
pub latency_stats: LatencyStats,
/// Accounting mechanism for this route's RPC latency
#[with(Skip)]
#[serde(skip)]
latency_stats_accounting: LatencyStatsAccounting,
/// Accounting mechanism for the bandwidth across this route
#[with(Skip)]
#[serde(skip)]
transfer_stats_accounting: TransferStatsAccounting,
}
@@ -35,7 +35,7 @@ pub async fn test_routingtable_buckets_round_trip() {
// Add lots of routes to `original` here to exercise all various types.
let (serialized_bucket_map, all_entry_bytes) = original.serialized_buckets().unwrap();
let (serialized_bucket_map, all_entry_bytes) = original.serialized_buckets();
copy.populate_routing_table(
&mut copy.inner.write(),
@@ -1,21 +1,7 @@
use super::*;
// Keep member order appropriate for sorting < preference
#[derive(
Debug,
Clone,
PartialEq,
PartialOrd,
Ord,
Eq,
Hash,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
#[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
pub struct DialInfoDetail {
pub class: DialInfoClass,
pub dial_info: DialInfo,
@@ -1,22 +1,12 @@
#![allow(non_snake_case)]
use super::*;
#[allow(clippy::derive_hash_xor_eq)]
#[derive(
Debug,
PartialOrd,
Ord,
Hash,
EnumSetType,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
#[derive(Debug, PartialOrd, Ord, Hash, EnumSetType, Serialize, Deserialize)]
#[enumset(repr = "u8")]
#[archive_attr(repr(u8), derive(CheckBytes))]
pub enum Direction {
Inbound,
Outbound,
Inbound = 0,
Outbound = 1,
}
pub type DirectionSet = EnumSet<Direction>;
@@ -12,61 +12,10 @@ pub const CAP_APPMESSAGE: Capability = FourCC(*b"APPM");
#[cfg(feature = "unstable-blockstore")]
pub const CAP_BLOCKSTORE: Capability = FourCC(*b"BLOC");
cfg_if! {
if #[cfg(all(feature = "unstable-blockstore", feature="unstable-tunnels"))] {
const PUBLIC_INTERNET_CAPABILITIES_LEN: usize = 8;
} else if #[cfg(any(feature = "unstable-blockstore", feature="unstable-tunnels"))] {
const PUBLIC_INTERNET_CAPABILITIES_LEN: usize = 7;
} else {
const PUBLIC_INTERNET_CAPABILITIES_LEN: usize = 6;
}
}
pub const PUBLIC_INTERNET_CAPABILITIES: [Capability; PUBLIC_INTERNET_CAPABILITIES_LEN] = [
CAP_ROUTE,
#[cfg(feature = "unstable-tunnels")]
CAP_TUNNEL,
CAP_SIGNAL,
CAP_RELAY,
CAP_VALIDATE_DIAL_INFO,
CAP_DHT,
CAP_APPMESSAGE,
#[cfg(feature = "unstable-blockstore")]
CAP_BLOCKSTORE,
];
#[cfg(feature = "unstable-blockstore")]
const LOCAL_NETWORK_CAPABILITIES_LEN: usize = 4;
#[cfg(not(feature = "unstable-blockstore"))]
const LOCAL_NETWORK_CAPABILITIES_LEN: usize = 3;
pub const LOCAL_NETWORK_CAPABILITIES: [Capability; LOCAL_NETWORK_CAPABILITIES_LEN] = [
CAP_RELAY,
CAP_DHT,
CAP_APPMESSAGE,
#[cfg(feature = "unstable-blockstore")]
CAP_BLOCKSTORE,
];
pub const MAX_CAPABILITIES: usize = 64;
#[derive(
Clone,
Default,
PartialEq,
Eq,
Debug,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
#[derive(Clone, Default, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct NodeInfo {
network_class: NetworkClass,
#[with(RkyvEnumSet)]
outbound_protocols: ProtocolTypeSet,
#[with(RkyvEnumSet)]
address_types: AddressTypeSet,
envelope_support: Vec<u8>,
crypto_support: Vec<CryptoKind>,
@@ -2,8 +2,7 @@ use super::*;
/// Non-nodeinfo status for each node is returned by the StatusA call
#[derive(Clone, Debug, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(C), derive(CheckBytes))]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NodeStatus {
// Reserved for expansion
}
@@ -1,9 +1,6 @@
use super::*;
#[derive(
Clone, Debug, Serialize, Deserialize, PartialEq, Eq, RkyvArchive, RkyvSerialize, RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct PeerInfo {
node_ids: TypedKeyGroup,
signed_node_info: SignedNodeInfo,
@@ -4,20 +4,8 @@ use super::*;
// Routing domain here is listed in order of preference, keep in order
#[allow(clippy::derive_hash_xor_eq)]
#[derive(
Debug,
Ord,
PartialOrd,
Hash,
EnumSetType,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
#[derive(Debug, Ord, PartialOrd, Hash, EnumSetType, Serialize, Deserialize)]
#[enumset(repr = "u8")]
#[archive_attr(repr(u8), derive(CheckBytes))]
pub enum RoutingDomain {
LocalNetwork = 0,
PublicInternet = 1,
@@ -1,10 +1,7 @@
use super::*;
/// Signed NodeInfo that can be passed around amongst peers and verifiable
#[derive(
Clone, Debug, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SignedDirectNodeInfo {
node_info: NodeInfo,
timestamp: Timestamp,
@@ -1,9 +1,6 @@
use super::*;
#[derive(
Clone, Debug, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
)]
#[archive_attr(repr(u8), derive(CheckBytes))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum SignedNodeInfo {
Direct(SignedDirectNodeInfo),
Relayed(SignedRelayedNodeInfo),
@@ -1,10 +1,7 @@
use super::*;
/// Signed NodeInfo with a relay that can be passed around amongst peers and verifiable
#[derive(
Clone, Debug, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SignedRelayedNodeInfo {
node_info: NodeInfo,
relay_ids: TypedKeyGroup,