record work
This commit is contained in:
@@ -1,14 +1,10 @@
|
||||
mod keys;
|
||||
mod record;
|
||||
mod record_data;
|
||||
mod record_store;
|
||||
mod record_store_limits;
|
||||
mod tasks;
|
||||
mod types;
|
||||
|
||||
use keys::*;
|
||||
use record::*;
|
||||
use record_data::*;
|
||||
use record_store::*;
|
||||
use record_store_limits::*;
|
||||
|
||||
@@ -28,10 +24,12 @@ const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1;
|
||||
struct StorageManagerInner {
|
||||
/// If we are started up
|
||||
initialized: bool,
|
||||
/// Records that have been 'created' or 'opened' by this node
|
||||
local_record_store: Option<RecordStore>,
|
||||
/// Records that have been pushed to this node for distribution by other nodes
|
||||
remote_record_store: Option<RecordStore>,
|
||||
/// Records that have been 'opened' and are not yet closed
|
||||
opened_records: HashMap<TypedKey, OpenedRecord>,
|
||||
/// Records that have ever been 'created' or 'opened' by this node, things we care about that we must republish to keep alive
|
||||
local_record_store: Option<RecordStore<LocalRecordDetail>>,
|
||||
/// Records that have been pushed to this node for distribution by other nodes, that we make an effort to republish
|
||||
remote_record_store: Option<RecordStore<RemoteRecordDetail>>,
|
||||
/// RPC processor if it is available
|
||||
rpc_processor: Option<RPCProcessor>,
|
||||
/// Background processing task (not part of attachment manager tick tree so it happens when detached too)
|
||||
@@ -75,6 +73,7 @@ impl StorageManager {
|
||||
fn new_inner() -> StorageManagerInner {
|
||||
StorageManagerInner {
|
||||
initialized: false,
|
||||
opened_records: HashMap::new(),
|
||||
local_record_store: None,
|
||||
remote_record_store: None,
|
||||
rpc_processor: None,
|
||||
@@ -201,7 +200,7 @@ impl StorageManager {
|
||||
}
|
||||
|
||||
/// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ]
|
||||
fn get_key(&self, vcrypto: CryptoSystemVersion, record: &Record) -> TypedKey {
|
||||
fn get_key<D>(vcrypto: CryptoSystemVersion, record: &Record<D>) -> TypedKey {
|
||||
let compiled = record.descriptor().schema_data();
|
||||
let mut hash_data = Vec::<u8>::with_capacity(PUBLIC_KEY_LENGTH + 4 + compiled.len());
|
||||
hash_data.extend_from_slice(&vcrypto.kind().0);
|
||||
@@ -211,20 +210,12 @@ impl StorageManager {
|
||||
TypedKey::new(vcrypto.kind(), hash)
|
||||
}
|
||||
|
||||
async fn new_local_record(
|
||||
&self,
|
||||
vcrypto: CryptoSystemVersion,
|
||||
record: Record,
|
||||
) -> Result<TypedKey, VeilidAPIError> {
|
||||
// add value record to record store
|
||||
let mut inner = self.inner.lock().await;
|
||||
async fn lock(&self) -> Result<AsyncMutexGuardArc<StorageManagerInner>, VeilidAPIError> {
|
||||
let inner = asyncmutex_lock_arc!(&self.inner);
|
||||
if !inner.initialized {
|
||||
apibail_generic!("not initialized");
|
||||
}
|
||||
let local_record_store = inner.local_record_store.as_mut().unwrap();
|
||||
let key = self.get_key(vcrypto.clone(), &record);
|
||||
local_record_store.new_record(key, record).await?;
|
||||
Ok(key)
|
||||
Ok(inner)
|
||||
}
|
||||
|
||||
pub async fn create_record(
|
||||
@@ -232,7 +223,9 @@ impl StorageManager {
|
||||
kind: CryptoKind,
|
||||
schema: DHTSchema,
|
||||
safety_selection: SafetySelection,
|
||||
) -> Result<TypedKey, VeilidAPIError> {
|
||||
) -> Result<DHTRecordDescriptor, VeilidAPIError> {
|
||||
let mut inner = self.lock().await?;
|
||||
|
||||
// Get cryptosystem
|
||||
let Some(vcrypto) = self.unlocked_inner.crypto.get(kind) else {
|
||||
apibail_generic!("unsupported cryptosystem");
|
||||
@@ -254,34 +247,113 @@ impl StorageManager {
|
||||
|
||||
// Add new local value record
|
||||
let cur_ts = get_aligned_timestamp();
|
||||
let record = Record::new(
|
||||
cur_ts,
|
||||
signed_value_descriptor,
|
||||
Some(owner.secret),
|
||||
safety_selection,
|
||||
)?;
|
||||
let dht_key = self
|
||||
.new_local_record(vcrypto, record)
|
||||
.await
|
||||
.map_err(VeilidAPIError::internal)?;
|
||||
let local_record_detail = LocalRecordDetail { safety_selection };
|
||||
let record =
|
||||
Record::<LocalRecordDetail>::new(cur_ts, signed_value_descriptor, local_record_detail)?;
|
||||
|
||||
Ok(dht_key)
|
||||
let local_record_store = inner.local_record_store.as_mut().unwrap();
|
||||
let dht_key = Self::get_key(vcrypto.clone(), &record);
|
||||
local_record_store.new_record(dht_key, record).await?;
|
||||
|
||||
// Open the record
|
||||
self.open_record_inner(inner, dht_key, Some(owner), safety_selection)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn open_record_inner(
|
||||
&self,
|
||||
mut inner: AsyncMutexGuardArc<StorageManagerInner>,
|
||||
key: TypedKey,
|
||||
writer: Option<KeyPair>,
|
||||
safety_selection: SafetySelection,
|
||||
) -> Result<DHTRecordDescriptor, VeilidAPIError> {
|
||||
// Get cryptosystem
|
||||
let Some(vcrypto) = self.unlocked_inner.crypto.get(key.kind) else {
|
||||
apibail_generic!("unsupported cryptosystem");
|
||||
};
|
||||
|
||||
// See if we have a local record already or not
|
||||
let cb = |r: &Record<LocalRecordDetail>| {
|
||||
// Process local record
|
||||
(r.owner().clone(), r.schema())
|
||||
};
|
||||
if let Some((owner, schema)) = inner.local_record_store.unwrap().with_record(key, cb) {
|
||||
// Had local record
|
||||
|
||||
// If the writer we chose is also the owner, we have the owner secret
|
||||
// Otherwise this is just another subkey writer
|
||||
let owner_secret = if let Some(writer) = writer {
|
||||
if writer.key == owner {
|
||||
Some(writer.secret)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Write open record
|
||||
inner.opened_records.insert(key, OpenedRecord { writer });
|
||||
|
||||
// Make DHT Record Descriptor to return
|
||||
let descriptor = DHTRecordDescriptor {
|
||||
key,
|
||||
owner,
|
||||
owner_secret,
|
||||
schema,
|
||||
};
|
||||
Ok(descriptor)
|
||||
} else {
|
||||
// No record yet
|
||||
|
||||
// Make DHT Record Descriptor to return
|
||||
let descriptor = DHTRecordDescriptor {
|
||||
key,
|
||||
owner,
|
||||
owner_secret,
|
||||
schema,
|
||||
};
|
||||
Ok(descriptor)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn open_record(
|
||||
&self,
|
||||
key: TypedKey,
|
||||
secret: Option<SecretKey>,
|
||||
writer: Option<KeyPair>,
|
||||
safety_selection: SafetySelection,
|
||||
) -> Result<DHTRecordDescriptor, VeilidAPIError> {
|
||||
unimplemented!();
|
||||
let inner = self.lock().await?;
|
||||
self.open_record_inner(inner, key, writer, safety_selection)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn close_record_inner(
|
||||
&self,
|
||||
mut inner: AsyncMutexGuardArc<StorageManagerInner>,
|
||||
key: TypedKey,
|
||||
) -> Result<(), VeilidAPIError> {
|
||||
let Some(opened_record) = inner.opened_records.remove(&key) else {
|
||||
apibail_generic!("record not open");
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn close_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> {
|
||||
unimplemented!();
|
||||
let inner = self.lock().await?;
|
||||
self.close_record_inner(inner, key).await
|
||||
}
|
||||
|
||||
pub async fn delete_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> {
|
||||
let inner = self.lock().await?;
|
||||
|
||||
// Ensure the record is closed
|
||||
if inner.opened_records.contains_key(&key) {
|
||||
self.close_record_inner(inner, key).await?;
|
||||
}
|
||||
|
||||
// Remove
|
||||
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
@@ -291,6 +363,7 @@ impl StorageManager {
|
||||
subkey: ValueSubkey,
|
||||
force_refresh: bool,
|
||||
) -> Result<Option<ValueData>, VeilidAPIError> {
|
||||
let inner = self.lock().await?;
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
@@ -300,6 +373,7 @@ impl StorageManager {
|
||||
subkey: ValueSubkey,
|
||||
data: Vec<u8>,
|
||||
) -> Result<Option<ValueData>, VeilidAPIError> {
|
||||
let inner = self.lock().await?;
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
@@ -310,6 +384,7 @@ impl StorageManager {
|
||||
expiration: Timestamp,
|
||||
count: u32,
|
||||
) -> Result<Timestamp, VeilidAPIError> {
|
||||
let inner = self.lock().await?;
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
@@ -318,6 +393,7 @@ impl StorageManager {
|
||||
key: TypedKey,
|
||||
subkeys: &[ValueSubkeyRange],
|
||||
) -> Result<bool, VeilidAPIError> {
|
||||
let inner = self.lock().await?;
|
||||
unimplemented!();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user