diff --git a/Cargo.lock b/Cargo.lock index 8ab4ed3b..bc19f650 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1849,6 +1849,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" + [[package]] name = "keccak-hash" version = "0.8.0" @@ -3898,6 +3904,7 @@ dependencies = [ "jni", "jni-sys", "js-sys", + "json", "keyring-manager", "keyvaluedb-sqlite", "keyvaluedb-web", diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index 0bbd5b80..fbd5b38a 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -21,7 +21,6 @@ log = "^0" cfg-if = "^1" anyhow = "^1" thiserror = "^1" - hex = "^0" generic-array = "^0" secrecy = "^0" @@ -33,6 +32,7 @@ parking_lot = "^0" lazy_static = "^1" directories = "^4" once_cell = "^1" +json = "^0" ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] } x25519-dalek = { package = "x25519-dalek-ng", version = "^1", default_features = false, features = ["u64_backend"] } diff --git a/veilid-core/src/api_logger.rs b/veilid-core/src/api_logger.rs index 5bc896d5..42baeeb0 100644 --- a/veilid-core/src/api_logger.rs +++ b/veilid-core/src/api_logger.rs @@ -8,7 +8,7 @@ use once_cell::sync::OnceCell; struct ApiLoggerInner { level: LevelFilter, filter_ignore: Cow<'static, [Cow<'static, str>]>, - _join_handle: JoinHandle<()>, + join_handle: Option>, tx: async_channel::Sender<(VeilidLogLevel, String)>, } @@ -22,7 +22,7 @@ static API_LOGGER: OnceCell = OnceCell::new(); impl ApiLogger { fn new_inner(level: LevelFilter, update_callback: UpdateCallback) -> ApiLoggerInner { let (tx, rx) = async_channel::unbounded::<(VeilidLogLevel, String)>(); - let _join_handle: JoinHandle<()> = spawn(async move { + let join_handle: Option> = Some(spawn(async move { while let Ok(v) = rx.recv().await { (update_callback)(VeilidUpdate::Log { log_level: v.0, @@ -30,16 +30,16 @@ impl ApiLogger { }) .await; } - }); + })); ApiLoggerInner { level, filter_ignore: Default::default(), - _join_handle, + join_handle, tx, } } - pub fn init(log_level: LevelFilter, update_callback: UpdateCallback) { + pub async fn init(log_level: LevelFilter, update_callback: UpdateCallback) { set_max_level(log_level); let api_logger = API_LOGGER.get_or_init(|| { let api_logger = ApiLogger { @@ -48,15 +48,28 @@ impl ApiLogger { set_boxed_logger(Box::new(api_logger.clone())).expect("failed to set api logger"); api_logger }); - - let mut inner = api_logger.inner.lock(); - *inner = Some(Self::new_inner(log_level, update_callback)); + let apilogger_inner = Some(Self::new_inner(log_level, update_callback)); + *api_logger.inner.lock() = apilogger_inner; } - pub fn terminate() { + pub async fn terminate() { if let Some(api_logger) = API_LOGGER.get() { - let mut inner = api_logger.inner.lock(); - *inner = None; + let mut join_handle = None; + { + let mut inner = api_logger.inner.lock(); + + // Terminate channel + if let Some(inner) = (*inner).as_mut() { + inner.tx.close(); + join_handle = inner.join_handle.take(); + } + *inner = None; + } + if let Some(jh) = join_handle { + jh.await; + } + + // Clear everything and we're done set_max_level(LevelFilter::Off); } } diff --git a/veilid-core/src/attachment_manager.rs b/veilid-core/src/attachment_manager.rs index 3357500c..39b1a9a0 100644 --- a/veilid-core/src/attachment_manager.rs +++ b/veilid-core/src/attachment_manager.rs @@ -263,6 +263,7 @@ impl AttachmentManager { &self, state_change_callback: StateChangeCallback, ) -> Result<(), String> { + trace!("init"); let network_manager = { let inner = self.inner.lock(); inner @@ -311,84 +312,39 @@ impl AttachmentManager { } } - async fn process_input(&self, input: &AttachmentInput) -> bool { + async fn process_input(&self, input: &AttachmentInput) -> Result<(), String> { let attachment_machine = self.inner.lock().attachment_machine.clone(); let output = attachment_machine.consume(input).await; match output { - Err(_) => { - error!("invalid input for state machine: {:?}", input); - false - } + Err(e) => Err(format!( + "invalid input '{:?}' for state machine in state '{:?}': {:?}", + input, + attachment_machine.state(), + e + )), Ok(v) => { if let Some(o) = v { self.handle_output(&o).await; } - true + Ok(()) } } } - pub async fn request_attach(&self) { - if !self.is_detached() { - trace!("attach request ignored"); - return; - } - if self.process_input(&AttachmentInput::AttachRequested).await { - trace!("attach requested"); - } else { - error!("attach request failed"); - } + pub async fn request_attach(&self) -> Result<(), String> { + self.process_input(&AttachmentInput::AttachRequested) + .await + .map_err(|e| format!("Attach request failed: {}", e)) } - pub async fn request_detach(&self) { - if !self.is_attached() { - trace!("detach request ignored"); - return; - } - if self.process_input(&AttachmentInput::DetachRequested).await { - trace!("detach requested"); - } else { - error!("detach request failed"); - } + pub async fn request_detach(&self) -> Result<(), String> { + self.process_input(&AttachmentInput::DetachRequested) + .await + .map_err(|e| format!("Attach request failed: {}", e)) } pub fn get_state(&self) -> AttachmentState { let attachment_machine = self.inner.lock().attachment_machine.clone(); attachment_machine.state() } - - // pub async fn wait_for_state(&self, state: AttachmentState, timeout_ms: Option) -> bool { - // let start_time = intf::get_timestamp(); - - // loop { - // let (current_state, eventual) = self - // .inner - // .lock() - // .attachment_machine - // .state_eventual_instance(); - // if current_state == state { - // break; - // } - // if let Some(timeout_ms) = timeout_ms { - // let timeout_time = start_time + (timeout_ms as u64 * 1000); - // let cur_time = intf::get_timestamp(); - // if timeout_time > cur_time { - // let timeout_dur_ms = ((timeout_time - cur_time) / 1000) as u32; - - // if match intf::timeout(timeout_dur_ms, eventual).await { - // Ok(v) => v, - // Err(_) => return false, - // } == state - // { - // return true; - // } - // } else { - // return false; - // } - // } else if eventual.await == state { - // break; - // } - // } - // true - // } } diff --git a/veilid-core/src/callback_state_machine.rs b/veilid-core/src/callback_state_machine.rs index f6d04505..50c07e59 100644 --- a/veilid-core/src/callback_state_machine.rs +++ b/veilid-core/src/callback_state_machine.rs @@ -69,10 +69,10 @@ where // self.inner.lock().callback = None; // } - pub fn state_eventual_instance(&self) -> (T::State, EventualValueCloneFuture) { - let inner = self.inner.lock(); - (inner.state, inner.eventual.instance()) - } + // pub fn state_eventual_instance(&self) -> (T::State, EventualValueCloneFuture) { + // let inner = self.inner.lock(); + // (inner.state, inner.eventual.instance()) + // } pub async fn consume(&self, input: &T::Input) -> Result, ()> { let current_state = self.inner.lock().state; diff --git a/veilid-core/src/core_context.rs b/veilid-core/src/core_context.rs index a330ce37..f84936c2 100644 --- a/veilid-core/src/core_context.rs +++ b/veilid-core/src/core_context.rs @@ -14,14 +14,175 @@ cfg_if! { } } +struct ServicesContext { + pub config: VeilidConfig, + pub update_callback: UpdateCallback, + + pub protected_store: Option, + pub table_store: Option, + pub block_store: Option, + pub crypto: Option, + pub attachment_manager: Option, +} + +impl ServicesContext { + pub fn new_empty(config: VeilidConfig, update_callback: UpdateCallback) -> Self { + Self { + config, + update_callback, + protected_store: None, + table_store: None, + block_store: None, + crypto: None, + attachment_manager: None, + } + } + + pub fn new_full( + config: VeilidConfig, + update_callback: UpdateCallback, + protected_store: ProtectedStore, + table_store: TableStore, + block_store: BlockStore, + crypto: Crypto, + attachment_manager: AttachmentManager, + ) -> Self { + Self { + config, + update_callback, + protected_store: Some(protected_store), + table_store: Some(table_store), + block_store: Some(block_store), + crypto: Some(crypto), + attachment_manager: Some(attachment_manager), + } + } + + pub async fn startup(&mut self) -> Result<(), VeilidAPIError> { + let api_log_level: VeilidConfigLogLevel = self.config.get().api_log_level; + if api_log_level != VeilidConfigLogLevel::Off { + ApiLogger::init( + api_log_level.to_level_filter(), + self.update_callback.clone(), + ) + .await; + for ig in crate::DEFAULT_LOG_IGNORE_LIST { + ApiLogger::add_filter_ignore_str(ig); + } + + info!("Veilid API logging initialized"); + } + + trace!("startup starting"); + + // Set up protected store + trace!("init protected store"); + let protected_store = ProtectedStore::new(self.config.clone()); + if let Err(e) = protected_store.init().await { + self.shutdown().await; + return Err(VeilidAPIError::Internal { message: e }); + } + self.protected_store = Some(protected_store.clone()); + + // Init node id from config now that protected store is set up + if let Err(e) = self.config.init_node_id(protected_store.clone()).await { + self.shutdown().await; + return Err(VeilidAPIError::Internal { message: e }); + } + + // Set up tablestore + trace!("init table store"); + let table_store = TableStore::new(self.config.clone()); + if let Err(e) = table_store.init().await { + self.shutdown().await; + return Err(VeilidAPIError::Internal { message: e }); + } + self.table_store = Some(table_store.clone()); + + // Set up crypto + trace!("init crypto"); + let crypto = Crypto::new(self.config.clone(), table_store.clone()); + if let Err(e) = crypto.init().await { + self.shutdown().await; + return Err(VeilidAPIError::Internal { message: e }); + } + self.crypto = Some(crypto.clone()); + + // Set up block store + trace!("init block store"); + let block_store = BlockStore::new(self.config.clone()); + if let Err(e) = block_store.init().await { + self.shutdown().await; + return Err(VeilidAPIError::Internal { message: e }); + } + self.block_store = Some(block_store.clone()); + + // Set up attachment manager + trace!("init attachment manager"); + let update_callback_move = self.update_callback.clone(); + let attachment_manager = AttachmentManager::new(self.config.clone(), table_store, crypto); + if let Err(e) = attachment_manager + .init(Arc::new( + move |_old_state: AttachmentState, new_state: AttachmentState| { + update_callback_move(VeilidUpdate::Attachment { state: new_state }) + }, + )) + .await + { + self.shutdown().await; + return Err(VeilidAPIError::Internal { message: e }); + } + self.attachment_manager = Some(attachment_manager); + + trace!("startup complete"); + Ok(()) + } + + pub async fn shutdown(&mut self) { + trace!("shutdown starting"); + + if let Some(attachment_manager) = &mut self.attachment_manager { + trace!("terminate attachment manager"); + attachment_manager.terminate().await; + } + if let Some(block_store) = &mut self.block_store { + trace!("terminate block store"); + block_store.terminate().await; + } + if let Some(crypto) = &mut self.crypto { + trace!("terminate crypto"); + crypto.terminate().await; + } + if let Some(table_store) = &mut self.table_store { + trace!("terminate table store"); + table_store.terminate().await; + } + if let Some(protected_store) = &mut self.protected_store { + trace!("terminate protected store"); + protected_store.terminate().await; + } + + trace!("shutdown complete"); + + // api logger terminate is idempotent + ApiLogger::terminate().await; + + // send final shutdown update + (self.update_callback)(VeilidUpdate::Shutdown).await; + } +} + +///////////////////////////////////////////////////////////////////////////// +/// pub struct VeilidCoreContext { pub config: VeilidConfig, + pub update_callback: UpdateCallback, + // Services pub protected_store: ProtectedStore, pub table_store: TableStore, pub block_store: BlockStore, pub crypto: Crypto, pub attachment_manager: AttachmentManager, - pub update_callback: UpdateCallback, } impl VeilidCoreContext { @@ -30,9 +191,9 @@ impl VeilidCoreContext { config_callback: ConfigCallback, ) -> Result { // Set up config from callback - trace!("VeilidCoreContext::new_with_config_callback init config"); + trace!("setup config with callback"); let mut config = VeilidConfig::new(); - if let Err(e) = config.init(config_callback).await { + if let Err(e) = config.setup(config_callback) { return Err(VeilidAPIError::Internal { message: e }); } @@ -44,12 +205,11 @@ impl VeilidCoreContext { config_json: String, ) -> Result { // Set up config from callback - trace!("VeilidCoreContext::new_with_config_json init config"); + trace!("setup config with json"); let mut config = VeilidConfig::new(); - if let Err(e) = config.init_from_json(config_json).await { + if let Err(e) = config.setup_from_json(config_json) { return Err(VeilidAPIError::Internal { message: e }); } - Self::new_common(update_callback, config).await } @@ -67,114 +227,31 @@ impl VeilidCoreContext { } } - // Start up api logging - let api_log_level: VeilidConfigLogLevel = config.get().api_log_level; - if api_log_level != VeilidConfigLogLevel::Off { - ApiLogger::init(api_log_level.to_level_filter(), update_callback.clone()); - for ig in crate::DEFAULT_LOG_IGNORE_LIST { - ApiLogger::add_filter_ignore_str(ig); - } - info!("Veilid API logging initialized"); - } - - // Set up protected store - trace!("VeilidCoreContext::new init protected store"); - let protected_store = ProtectedStore::new(config.clone()); - if let Err(e) = protected_store.init().await { - config.terminate().await; - ApiLogger::terminate(); - return Err(VeilidAPIError::Internal { message: e }); - } - - // Init node id from config now that protected store is set up - if let Err(e) = config.init_node_id(protected_store.clone()).await { - protected_store.terminate().await; - config.terminate().await; - ApiLogger::terminate(); - return Err(VeilidAPIError::Internal { message: e }); - } - - // Set up tablestore - trace!("VeilidCoreContext::new init table store"); - let table_store = TableStore::new(config.clone()); - if let Err(e) = table_store.init().await { - protected_store.terminate().await; - config.terminate().await; - ApiLogger::terminate(); - return Err(VeilidAPIError::Internal { message: e }); - } - - // Set up crypto - trace!("VeilidCoreContext::new init crypto"); - let crypto = Crypto::new(config.clone(), table_store.clone()); - if let Err(e) = crypto.init().await { - table_store.terminate().await; - protected_store.terminate().await; - config.terminate().await; - ApiLogger::terminate(); - return Err(VeilidAPIError::Internal { message: e }); - } - - // Set up block store - trace!("VeilidCoreContext::new init block store"); - let block_store = BlockStore::new(config.clone()); - if let Err(e) = block_store.init().await { - crypto.terminate().await; - table_store.terminate().await; - protected_store.terminate().await; - config.terminate().await; - ApiLogger::terminate(); - return Err(VeilidAPIError::Internal { message: e }); - } - - // Set up attachment manager - trace!("VeilidCoreContext::new init attachment manager"); - let update_callback_move = update_callback.clone(); - let attachment_manager = - AttachmentManager::new(config.clone(), table_store.clone(), crypto.clone()); - if let Err(e) = attachment_manager - .init(Arc::new( - move |_old_state: AttachmentState, new_state: AttachmentState| { - update_callback_move(VeilidUpdate::Attachment { state: new_state }) - }, - )) - .await - { - block_store.terminate().await; - crypto.terminate().await; - table_store.terminate().await; - protected_store.terminate().await; - config.terminate().await; - ApiLogger::terminate(); - return Err(VeilidAPIError::Internal { message: e }); - } + let mut sc = ServicesContext::new_empty(config.clone(), update_callback); + sc.startup().await?; Ok(VeilidCoreContext { - config, - protected_store, - table_store, - block_store, - crypto, - attachment_manager, - update_callback, + update_callback: sc.update_callback, + config: sc.config, + protected_store: sc.protected_store.unwrap(), + table_store: sc.table_store.unwrap(), + block_store: sc.block_store.unwrap(), + crypto: sc.crypto.unwrap(), + attachment_manager: sc.attachment_manager.unwrap(), }) } async fn shutdown(self) { - trace!("VeilidCoreContext::terminate_core_context starting"); - - self.attachment_manager.terminate().await; - self.block_store.terminate().await; - self.crypto.terminate().await; - self.table_store.terminate().await; - self.protected_store.terminate().await; - self.config.terminate().await; - - // send final shutdown update - (self.update_callback)(VeilidUpdate::Shutdown).await; - - trace!("VeilidCoreContext::shutdown complete"); - ApiLogger::terminate(); + let mut sc = ServicesContext::new_full( + self.config.clone(), + self.update_callback.clone(), + self.protected_store, + self.table_store, + self.block_store, + self.crypto, + self.attachment_manager, + ); + sc.shutdown().await; } } @@ -225,7 +302,7 @@ pub async fn api_startup_json( Ok(veilid_api) } -pub async fn api_shutdown(context: VeilidCoreContext) { +pub(crate) async fn api_shutdown(context: VeilidCoreContext) { let mut initialized_lock = INITIALIZED.lock().await; context.shutdown().await; *initialized_lock = false; diff --git a/veilid-core/src/dht/crypto.rs b/veilid-core/src/dht/crypto.rs index ba71dbf8..a85d9dd5 100644 --- a/veilid-core/src/dht/crypto.rs +++ b/veilid-core/src/dht/crypto.rs @@ -145,7 +145,6 @@ impl Crypto { let db = table_store.open("crypto_caches", 1).await?; db.store(0, b"dh_cache", &cache_bytes).await?; - Ok(()) } diff --git a/veilid-core/src/intf/native/network/mod.rs b/veilid-core/src/intf/native/network/mod.rs index 7d011a2d..97c3aa5b 100644 --- a/veilid-core/src/intf/native/network/mod.rs +++ b/veilid-core/src/intf/native/network/mod.rs @@ -52,10 +52,12 @@ struct NetworkInner { wss_port: u16, interfaces: NetworkInterfaces, // udp + bound_first_udp: BTreeMap, inbound_udp_protocol_handlers: BTreeMap, outbound_udpv4_protocol_handler: Option, outbound_udpv6_protocol_handler: Option, //tcp + bound_first_tcp: BTreeMap, tls_acceptor: Option, listener_states: BTreeMap>>, } @@ -91,9 +93,11 @@ impl Network { ws_port: 0u16, wss_port: 0u16, interfaces: NetworkInterfaces::new(), + bound_first_udp: BTreeMap::new(), inbound_udp_protocol_handlers: BTreeMap::new(), outbound_udpv4_protocol_handler: None, outbound_udpv6_protocol_handler: None, + bound_first_tcp: BTreeMap::new(), tls_acceptor: None, listener_states: BTreeMap::new(), } @@ -420,6 +424,10 @@ impl Network { if protocol_config.tcp_listen { self.start_tcp_listeners().await?; } + // release caches of available listener ports + // this releases the 'first bound' ports we use to guarantee + // that we have ports available to us + self.free_bound_first_ports(); info!("network started"); self.inner.lock().network_started = true; diff --git a/veilid-core/src/intf/native/network/protocol/mod.rs b/veilid-core/src/intf/native/network/protocol/mod.rs index f69be868..7099eb4e 100644 --- a/veilid-core/src/intf/native/network/protocol/mod.rs +++ b/veilid-core/src/intf/native/network/protocol/mod.rs @@ -89,15 +89,17 @@ impl ProtocolNetworkConnection { pub fn new_unbound_shared_udp_socket(domain: Domain) -> Result { let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)) .map_err(|e| format!("Couldn't create UDP socket: {}", e))?; - - if let Err(e) = socket.set_reuse_address(true) { - log_net!(error "Couldn't set reuse address: {}", e); + if domain == Domain::IPV6 { + socket + .set_only_v6(true) + .map_err(|e| format!("Couldn't set IPV6_V6ONLY: {}", e))?; } + socket + .set_reuse_address(true) + .map_err(|e| format!("Couldn't set reuse address: {}", e))?; cfg_if! { if #[cfg(unix)] { - if let Err(e) = socket.set_reuse_port(true) { - log_net!(error "Couldn't set reuse port: {}", e); - } + socket.set_reuse_port(true).map_err(|e| format!("Couldn't set reuse port: {}", e))?; } } Ok(socket) @@ -116,6 +118,36 @@ pub fn new_bound_shared_udp_socket(local_address: SocketAddr) -> Result Result { + let domain = Domain::for_address(local_address); + let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)) + .map_err(|e| format!("Couldn't create UDP socket: {}", e))?; + if domain == Domain::IPV6 { + socket + .set_only_v6(true) + .map_err(|e| format!("Couldn't set IPV6_V6ONLY: {}", e))?; + } + // Bind the socket -first- before turning on 'reuse address' this way it will + // fail if the port is already taken + let socket2_addr = socket2::SockAddr::from(local_address); + socket + .bind(&socket2_addr) + .map_err(|e| format!("failed to bind UDP socket: {}", e))?; + + // Set 'reuse address' so future binds to this port will succeed + socket + .set_reuse_address(true) + .map_err(|e| format!("Couldn't set reuse address: {}", e))?; + cfg_if! { + if #[cfg(unix)] { + socket.set_reuse_port(true).map_err(|e| format!("Couldn't set reuse port: {}", e))?; + } + } + log_net!("created shared udp socket on {:?}", &local_address); + + Ok(socket) +} + pub fn new_unbound_shared_tcp_socket(domain: Domain) -> Result { let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)) .map_err(map_to_string) @@ -126,14 +158,17 @@ pub fn new_unbound_shared_tcp_socket(domain: Domain) -> Result Result Result { + let domain = Domain::for_address(local_address); + + let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)) + .map_err(map_to_string) + .map_err(logthru_net!("failed to create TCP socket"))?; + if let Err(e) = socket.set_linger(None) { + log_net!(error "Couldn't set TCP linger: {}", e); + } + if let Err(e) = socket.set_nodelay(true) { + log_net!(error "Couldn't set TCP nodelay: {}", e); + } + if domain == Domain::IPV6 { + socket + .set_only_v6(true) + .map_err(|e| format!("Couldn't set IPV6_V6ONLY: {}", e))?; + } + // Bind the socket -first- before turning on 'reuse address' this way it will + // fail if the port is already taken + + let socket2_addr = socket2::SockAddr::from(local_address); + socket + .bind(&socket2_addr) + .map_err(|e| format!("failed to bind TCP socket: {}", e))?; + + // Set 'reuse address' so future binds to this port will succeed + socket + .set_reuse_address(true) + .map_err(|e| format!("Couldn't set reuse address: {}", e))?; + cfg_if! { + if #[cfg(unix)] { + socket.set_reuse_port(true).map_err(|e| format!("Couldn't set reuse port: {}", e))?; + } + } + Ok(socket) +} diff --git a/veilid-core/src/intf/native/network/protocol/udp.rs b/veilid-core/src/intf/native/network/protocol/udp.rs index efaddfc0..cfdd0786 100644 --- a/veilid-core/src/intf/native/network/protocol/udp.rs +++ b/veilid-core/src/intf/native/network/protocol/udp.rs @@ -29,7 +29,6 @@ impl RawUdpProtocolHandler { remote_addr ); - // Process envelope let peer_addr = PeerAddress::new( SocketAddress::from_socket_addr(remote_addr), ProtocolType::UDP, diff --git a/veilid-core/src/intf/native/network/start_protocols.rs b/veilid-core/src/intf/native/network/start_protocols.rs index 3ad2648d..8eee6e50 100644 --- a/veilid-core/src/intf/native/network/start_protocols.rs +++ b/veilid-core/src/intf/native/network/start_protocols.rs @@ -1,11 +1,148 @@ use super::*; impl Network { - pub(super) async fn start_udp_listeners(&self) -> Result<(), String> { - // First, create outbound sockets and we'll listen on them too - self.create_udp_outbound_sockets().await?; + ///////////////////////////////////////////////////// + // Support for binding first on ports to ensure nobody binds ahead of us + // or two copies of the app don't accidentally collide. This is tricky + // because we use 'reuseaddr/port' and we can accidentally bind in front of ourselves :P - // Now create udp inbound sockets for whatever interfaces we're listening on + fn bind_first_udp_port(&self, udp_port: u16) -> bool { + let mut inner = self.inner.lock(); + if inner.bound_first_udp.contains_key(&udp_port) { + return true; + } + // If the address is specified, only use the specified port and fail otherwise + let mut bound_first_socket_v4 = None; + let mut bound_first_socket_v6 = None; + if let Ok(bfs4) = + new_bound_first_udp_socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), udp_port)) + { + if let Ok(bfs6) = new_bound_first_udp_socket(SocketAddr::new( + IpAddr::V6(Ipv6Addr::UNSPECIFIED), + udp_port, + )) { + bound_first_socket_v4 = Some(bfs4); + bound_first_socket_v6 = Some(bfs6); + } + } + if let (Some(bfs4), Some(bfs6)) = (bound_first_socket_v4, bound_first_socket_v6) { + inner.bound_first_udp.insert(udp_port, (bfs4, bfs6)); + true + } else { + false + } + } + + fn bind_first_tcp_port(&self, tcp_port: u16) -> bool { + let mut inner = self.inner.lock(); + if inner.bound_first_tcp.contains_key(&tcp_port) { + return true; + } + // If the address is specified, only use the specified port and fail otherwise + let mut bound_first_socket_v4 = None; + let mut bound_first_socket_v6 = None; + if let Ok(bfs4) = + new_bound_first_tcp_socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), tcp_port)) + { + if let Ok(bfs6) = new_bound_first_tcp_socket(SocketAddr::new( + IpAddr::V6(Ipv6Addr::UNSPECIFIED), + tcp_port, + )) { + bound_first_socket_v4 = Some(bfs4); + bound_first_socket_v6 = Some(bfs6); + } + } + if let (Some(bfs4), Some(bfs6)) = (bound_first_socket_v4, bound_first_socket_v6) { + inner.bound_first_tcp.insert(tcp_port, (bfs4, bfs6)); + true + } else { + false + } + } + + pub(super) fn free_bound_first_ports(&self) { + let mut inner = self.inner.lock(); + inner.bound_first_udp.clear(); + inner.bound_first_tcp.clear(); + } + + ///////////////////////////////////////////////////// + + fn find_available_udp_port(&self) -> Result { + // If the address is empty, iterate ports until we find one we can use. + let mut udp_port = 5150u16; + loop { + if self.bind_first_udp_port(udp_port) { + break; + } + if udp_port == 65535 { + return Err("Could not find free udp port to listen on".to_owned()); + } + udp_port += 1; + } + Ok(udp_port) + } + + fn find_available_tcp_port(&self) -> Result { + // If the address is empty, iterate ports until we find one we can use. + let mut tcp_port = 5150u16; + loop { + if self.bind_first_tcp_port(tcp_port) { + break; + } + if tcp_port == 65535 { + return Err("Could not find free tcp port to listen on".to_owned()); + } + tcp_port += 1; + } + Ok(tcp_port) + } + + async fn allocate_udp_port(&self, listen_address: String) -> Result { + if listen_address.is_empty() { + // If listen address is empty, find us a port iteratively + self.find_available_udp_port() + } else if let Some(sa) = listen_address + .to_socket_addrs() + .await + .map_err(|e| format!("Unable to resolve address: {}\n{}", listen_address, e))? + .next() + { + // If the address is specified, only use the specified port and fail otherwise + if self.bind_first_udp_port(sa.port()) { + Ok(sa.port()) + } else { + Err("Could not find free udp port to listen on".to_owned()) + } + } else { + Err(format!("No valid listen address: {}", listen_address)) + } + } + + async fn allocate_tcp_port(&self, listen_address: String) -> Result { + if listen_address.is_empty() { + // If listen address is empty, find us a port iteratively + self.find_available_tcp_port() + } else if let Some(sa) = listen_address + .to_socket_addrs() + .await + .map_err(|e| format!("Unable to resolve address: {}\n{}", listen_address, e))? + .next() + { + // If the address is specified, only use the specified port and fail otherwise + if self.bind_first_tcp_port(sa.port()) { + Ok(sa.port()) + } else { + Err("Could not find free tcp port to listen on".to_owned()) + } + } else { + Err(format!("No valid listen address: {}", listen_address)) + } + } + + ///////////////////////////////////////////////////// + + pub(super) async fn start_udp_listeners(&self) -> Result<(), String> { let routing_table = self.routing_table(); let (listen_address, public_address) = { let c = self.config.get(); @@ -14,15 +151,27 @@ impl Network { c.network.protocol.udp.public_address.clone(), ) }; + + // Pick out UDP port we're going to use everywhere + // Keep sockets around until the end of this function + // to keep anyone else from binding in front of us + let udp_port = self.allocate_udp_port(listen_address.clone()).await?; + + // Save the bound udp port for use later on + self.inner.lock().udp_port = udp_port; + + // First, create outbound sockets + // (unlike tcp where we create sockets for every connection) + // and we'll add protocol handlers for them too + self.create_udp_outbound_sockets().await?; + + // Now create udp inbound sockets for whatever interfaces we're listening on info!("UDP: starting listener at {:?}", listen_address); let dial_infos = self .create_udp_inbound_sockets(listen_address.clone()) .await?; let mut static_public = false; for di in &dial_infos { - // Pick out UDP port for outbound connections (they will all be the same) - self.inner.lock().udp_port = di.port(); - // Register local dial info only here if we specify a public address if public_address.is_none() && di.is_global() { // Register global dial info if no public address is specified @@ -73,6 +222,15 @@ impl Network { c.network.protocol.ws.path.clone(), ) }; + + // Pick out TCP port we're going to use everywhere + // Keep sockets around until the end of this function + // to keep anyone else from binding in front of us + let ws_port = self.allocate_tcp_port(listen_address.clone()).await?; + + // Save the bound ws port for use later on + self.inner.lock().ws_port = ws_port; + trace!("WS: starting listener at {:?}", listen_address); let socket_addresses = self .start_tcp_listener( @@ -85,9 +243,6 @@ impl Network { let mut static_public = false; for socket_address in socket_addresses { - // Pick out WS port for outbound connections (they will all be the same) - self.inner.lock().ws_port = socket_address.port(); - if url.is_none() && socket_address.address().is_global() { // Build global dial info request url let global_url = format!("ws://{}/{}", socket_address, path); @@ -155,8 +310,17 @@ impl Network { c.network.protocol.wss.url.clone(), ) }; + + // Pick out TCP port we're going to use everywhere + // Keep sockets around until the end of this function + // to keep anyone else from binding in front of us + let wss_port = self.allocate_tcp_port(listen_address.clone()).await?; + + // Save the bound wss port for use later on + self.inner.lock().wss_port = wss_port; + trace!("WSS: starting listener at {}", listen_address); - let socket_addresses = self + let _socket_addresses = self .start_tcp_listener( listen_address.clone(), true, @@ -169,11 +333,6 @@ impl Network { // If the hostname is specified, it is the public dialinfo via the URL. If no hostname // is specified, then TLS won't validate, so no local dialinfo is possible. // This is not the case with unencrypted websockets, which can be specified solely by an IP address - // - if let Some(socket_address) = socket_addresses.first() { - // Pick out WSS port for outbound connections (they will all be the same) - self.inner.lock().wss_port = socket_address.port(); - } // Add static public dialinfo if it's configured if let Some(url) = url.as_ref() { @@ -217,6 +376,15 @@ impl Network { c.network.protocol.tcp.public_address.clone(), ) }; + + // Pick out TCP port we're going to use everywhere + // Keep sockets around until the end of this function + // to keep anyone else from binding in front of us + let tcp_port = self.allocate_tcp_port(listen_address.clone()).await?; + + // Save the bound tcp port for use later on + self.inner.lock().tcp_port = tcp_port; + trace!("TCP: starting listener at {}", &listen_address); let socket_addresses = self .start_tcp_listener( @@ -229,9 +397,6 @@ impl Network { let mut static_public = false; for socket_address in socket_addresses { - // Pick out TCP port for outbound connections (they will all be the same) - self.inner.lock().tcp_port = socket_address.port(); - let di = DialInfo::tcp(socket_address); // Register local dial info only here if we specify a public address diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 76696a6f..ad8b131a 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -289,6 +289,23 @@ impl RoutingTable { *self.inner.lock() = Self::new_inner(self.network_manager()); } + // Attempt to empty the routing table + // should only be performed when there are no node_refs (detached) + pub fn purge(&self) { + let mut inner = self.inner.lock(); + log_rtab!( + "Starting routing table purge. Table currently has {} nodes", + inner.bucket_entry_count + ); + for bucket in &mut inner.buckets { + bucket.kick(0); + } + log_rtab!( + "Routing table purge complete. Routing table now has {} nodes", + inner.bucket_entry_count + ); + } + // Attempt to settle buckets and remove entries down to the desired number // which may not be possible due extant NodeRefs fn kick_bucket(inner: &mut RoutingTableInner, idx: usize) { diff --git a/veilid-core/src/tests/common/test_veilid_config.rs b/veilid-core/src/tests/common/test_veilid_config.rs index 9d569dc7..ac2e770a 100644 --- a/veilid-core/src/tests/common/test_veilid_config.rs +++ b/veilid-core/src/tests/common/test_veilid_config.rs @@ -195,7 +195,7 @@ fn config_callback(key: String) -> ConfigCallbackReturn { "network.connection_initial_timeout_ms" => Ok(Box::new(2_000u32)), "network.node_id" => Ok(Box::new(dht::key::DHTKey::default())), "network.node_id_secret" => Ok(Box::new(dht::key::DHTKeySecret::default())), - "network.bootstrap" => Ok(Box::new(vec![String::from("asdf"), String::from("qwer")])), + "network.bootstrap" => Ok(Box::new(Vec::::new())), "network.rpc.concurrency" => Ok(Box::new(2u32)), "network.rpc.queue_size" => Ok(Box::new(128u32)), "network.rpc.max_timestamp_behind_ms" => Ok(Box::new(Some(10_000u32))), @@ -265,7 +265,7 @@ fn config_callback(key: String) -> ConfigCallbackReturn { pub async fn test_config() { let mut vc = VeilidConfig::new(); - match vc.init(Arc::new(config_callback)).await { + match vc.setup(Arc::new(config_callback)) { Ok(()) => (), Err(e) => { error!("Error: {}", e); @@ -299,10 +299,7 @@ pub async fn test_config() { assert_eq!(inner.network.connection_initial_timeout_ms, 2_000u32); assert!(!inner.network.node_id.valid); assert!(!inner.network.node_id_secret.valid); - assert_eq!( - inner.network.bootstrap, - vec![String::from("asdf"), String::from("qwer")] - ); + assert_eq!(inner.network.bootstrap, Vec::::new()); assert_eq!(inner.network.rpc.concurrency, 2u32); assert_eq!(inner.network.rpc.queue_size, 128u32); assert_eq!(inner.network.rpc.timeout_ms, 10_000u32); diff --git a/veilid-core/src/tests/common/test_veilid_core.rs b/veilid-core/src/tests/common/test_veilid_core.rs index 49d97fa0..e84828a0 100644 --- a/veilid-core/src/tests/common/test_veilid_core.rs +++ b/veilid-core/src/tests/common/test_veilid_core.rs @@ -39,7 +39,7 @@ pub async fn test_attach_detach() { let api = api_startup(update_callback, config_callback) .await .expect("startup failed"); - api.detach().await.unwrap(); + assert!(api.detach().await.is_err()); api.shutdown().await; } diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 01f90fee..13448c8a 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -63,11 +63,12 @@ fn get_debug_argument_at Option>( } impl VeilidAPI { - async fn debug_buckets(&self, debug_args: &[String]) -> Result { + async fn debug_buckets(&self, args: String) -> Result { + let args: Vec = args.split_whitespace().map(|s| s.to_owned()).collect(); let mut min_state = BucketEntryState::Unreliable; - if debug_args.len() == 1 { + if args.len() == 1 { min_state = get_debug_argument( - &debug_args[0], + &args[0], "debug_buckets", "min_state", get_bucket_entry_state, @@ -79,26 +80,28 @@ impl VeilidAPI { Ok(routing_table.debug_info_buckets(min_state)) } - async fn debug_dialinfo(&self, _debug_args: &[String]) -> Result { + async fn debug_dialinfo(&self, _args: String) -> Result { // Dump routing table dialinfo let rpc = self.rpc_processor()?; let routing_table = rpc.routing_table(); Ok(routing_table.debug_info_dialinfo()) } - async fn debug_entries(&self, debug_args: &[String]) -> Result { + async fn debug_entries(&self, args: String) -> Result { + let args: Vec = args.split_whitespace().map(|s| s.to_owned()).collect(); + let mut min_state = BucketEntryState::Unreliable; let mut limit = 20; - for arg in debug_args { - if let Some(ms) = get_bucket_entry_state(arg) { + for arg in args { + if let Some(ms) = get_bucket_entry_state(&arg) { min_state = ms; - } else if let Some(lim) = get_number(arg) { + } else if let Some(lim) = get_number(&arg) { limit = lim; } else { return Err(VeilidAPIError::InvalidArgument { context: "debug_entries".to_owned(), argument: "unknown".to_owned(), - value: arg.clone(), + value: arg, }); } } @@ -109,8 +112,10 @@ impl VeilidAPI { Ok(routing_table.debug_info_entries(limit, min_state)) } - async fn debug_entry(&self, debug_args: &[String]) -> Result { - let node_id = get_debug_argument_at(debug_args, 0, "debug_entry", "node_id", get_dht_key)?; + async fn debug_entry(&self, args: String) -> Result { + let args: Vec = args.split_whitespace().map(|s| s.to_owned()).collect(); + + let node_id = get_debug_argument_at(&args, 0, "debug_entry", "node_id", get_dht_key)?; // Dump routing table entry let rpc = self.rpc_processor()?; @@ -118,41 +123,149 @@ impl VeilidAPI { Ok(routing_table.debug_info_entry(node_id)) } - async fn debug_nodeinfo(&self, _debug_args: &[String]) -> Result { + async fn debug_nodeinfo(&self, _args: String) -> Result { // Dump routing table entry let rpc = self.rpc_processor()?; let routing_table = rpc.routing_table(); Ok(routing_table.debug_info_nodeinfo()) } - pub async fn debug(&self, what: String) -> Result { - trace!("VeilidCore::debug"); - let debug_args: Vec = what - .split_ascii_whitespace() - .map(|s| s.to_owned()) - .collect(); - if debug_args.is_empty() { - return Ok(r#">>> Debug commands: - buckets [dead|reliable] - dialinfo - entries [dead|reliable] [limit] - entry [node_id] - nodeinfo -"# - .to_owned()); + async fn debug_config(&self, args: String) -> Result { + let config = self.config()?; + let args = args.trim_start(); + if args.is_empty() { + return config + .get_key_json("") + .map_err(|e| VeilidAPIError::Internal { message: e }); } + let (arg, rest) = args.split_once(' ').unwrap_or((args, "")); + let rest = rest.trim_start().to_owned(); + + // Must be detached + if matches!( + self.get_state().await?.attachment, + AttachmentState::Detached | AttachmentState::Detaching + ) { + return Err(VeilidAPIError::Internal { + message: "Must be detached to change config".to_owned(), + }); + } + + // One argument is 'config get' + if rest.is_empty() { + return config + .get_key_json(arg) + .map_err(|e| VeilidAPIError::Internal { message: e }); + } + config + .set_key_json(arg, &rest) + .map_err(|e| VeilidAPIError::Internal { message: e })?; + Ok("Config value set".to_owned()) + } + + async fn debug_purge(&self, args: String) -> Result { + let args: Vec = args.split_whitespace().map(|s| s.to_owned()).collect(); + if !args.is_empty() { + if args[0] == "buckets" { + // Must be detached + if matches!( + self.get_state().await?.attachment, + AttachmentState::Detached | AttachmentState::Detaching + ) { + return Err(VeilidAPIError::Internal { + message: "Must be detached to purge".to_owned(), + }); + } + self.network_manager()?.routing_table().purge(); + Ok("Buckets purged".to_owned()) + } else { + Err(VeilidAPIError::InvalidArgument { + context: "debug_purge".to_owned(), + argument: "parameter".to_owned(), + value: args[0].clone(), + }) + } + } else { + Err(VeilidAPIError::MissingArgument { + context: "debug_purge".to_owned(), + argument: "parameter".to_owned(), + }) + } + } + + async fn debug_attach(&self, _args: String) -> Result { + if !matches!( + self.get_state().await?.attachment, + AttachmentState::Detached + ) { + return Err(VeilidAPIError::Internal { + message: "Not detached".to_owned(), + }); + }; + + self.attach().await?; + + Ok("Attached".to_owned()) + } + + async fn debug_detach(&self, _args: String) -> Result { + if matches!( + self.get_state().await?.attachment, + AttachmentState::Detaching + ) { + return Err(VeilidAPIError::Internal { + message: "Not attached".to_owned(), + }); + }; + + self.detach().await?; + + Ok("Detached".to_owned()) + } + + pub async fn debug_help(&self, _args: String) -> Result { + Ok(r#">>> Debug commands: + buckets [dead|reliable] + dialinfo + entries [dead|reliable] [limit] + entry [node_id] + nodeinfo + config [key [new value]] + purge buckets + attach + detach + "# + .to_owned()) + } + + pub async fn debug(&self, args: String) -> Result { + let args = args.trim_start(); + if args.is_empty() { + // No arguments runs help command + return self.debug_help("".to_owned()).await; + } + let (arg, rest) = args.split_once(' ').unwrap_or((args, "")); + let rest = rest.trim_start().to_owned(); + let mut out = String::new(); - let arg = &debug_args[0]; if arg == "buckets" { - out += self.debug_buckets(&debug_args[1..]).await?.as_str(); + out += self.debug_buckets(rest).await?.as_str(); } else if arg == "dialinfo" { - out += self.debug_dialinfo(&debug_args[1..]).await?.as_str(); + out += self.debug_dialinfo(rest).await?.as_str(); } else if arg == "entries" { - out += self.debug_entries(&debug_args[1..]).await?.as_str(); + out += self.debug_entries(rest).await?.as_str(); } else if arg == "entry" { - out += self.debug_entry(&debug_args[1..]).await?.as_str(); + out += self.debug_entry(rest).await?.as_str(); } else if arg == "nodeinfo" { - out += self.debug_nodeinfo(&debug_args[1..]).await?.as_str(); + out += self.debug_nodeinfo(rest).await?.as_str(); + } else if arg == "purge" { + out += self.debug_purge(rest).await?.as_str(); + } else if arg == "attach" { + out += self.debug_attach(rest).await?.as_str(); + } else if arg == "detach" { + out += self.debug_detach(rest).await?.as_str(); + } else if arg == "config" { + out += self.debug_config(rest).await?.as_str(); } else { out += ">>> Unknown command\n"; } diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 6092bb1f..45b623af 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -1194,7 +1194,6 @@ impl VeilidAPI { // get a full copy of the current state pub async fn get_state(&self) -> Result { - trace!("VeilidCore::get_state"); let attachment_manager = self.attachment_manager()?; Ok(VeilidState { attachment: attachment_manager.get_state(), @@ -1203,18 +1202,20 @@ impl VeilidAPI { // connect to the network pub async fn attach(&self) -> Result<(), VeilidAPIError> { - trace!("VeilidCore::attach"); let attachment_manager = self.attachment_manager()?; - attachment_manager.request_attach().await; - Ok(()) + attachment_manager + .request_attach() + .await + .map_err(|e| VeilidAPIError::Internal { message: e }) } // disconnect from the network pub async fn detach(&self) -> Result<(), VeilidAPIError> { - trace!("VeilidCore::detach"); let attachment_manager = self.attachment_manager()?; - attachment_manager.request_detach().await; - Ok(()) + attachment_manager + .request_detach() + .await + .map_err(|e| VeilidAPIError::Internal { message: e }) } // Change api logging level if it is enabled diff --git a/veilid-core/src/veilid_config.rs b/veilid-core/src/veilid_config.rs index 98131514..e50a3dc0 100644 --- a/veilid-core/src/veilid_config.rs +++ b/veilid-core/src/veilid_config.rs @@ -235,19 +235,19 @@ impl VeilidConfig { } } - pub async fn init_from_json(&mut self, config: String) -> Result<(), String> { + pub fn setup_from_json(&mut self, config: String) -> Result<(), String> { { let mut inner = self.inner.write(); *inner = serde_json::from_str(&config).map_err(map_to_string)?; } // Validate settings - self.validate().await?; + self.validate()?; Ok(()) } - pub async fn init(&mut self, cb: ConfigCallback) -> Result<(), String> { + pub fn setup(&mut self, cb: ConfigCallback) -> Result<(), String> { macro_rules! get_config { ($key:expr) => { let keyname = &stringify!($key)[6..]; @@ -258,7 +258,6 @@ impl VeilidConfig { })?; }; } - { let mut inner = self.inner.write(); get_config!(inner.program_name); @@ -345,20 +344,78 @@ impl VeilidConfig { get_config!(inner.network.leases.max_client_relay_leases); } // Validate settings - self.validate().await?; + self.validate()?; Ok(()) } - pub async fn terminate(&self) { - // - } - pub fn get(&self) -> RwLockReadGuard { self.inner.read() } - async fn validate(&self) -> Result<(), String> { + pub fn get_mut(&self) -> RwLockWriteGuard { + self.inner.write() + } + + pub fn get_key_json(&self, key: &str) -> Result { + let c = self.get(); + // Split key into path parts + let keypath: Vec<&str> = key.split('.').collect(); + + // Generate json from whole config + let jc = serde_json::to_string(&*c).map_err(map_to_string)?; + let jvc = json::parse(&jc).map_err(map_to_string)?; + + // Find requested subkey + let mut out = &jvc; + for k in keypath { + if !jvc.has_key(k) { + return Err(format!("invalid subkey '{}' in key '{}'", k, key)); + } + out = &jvc[k]; + } + Ok(out.to_string()) + } + pub fn set_key_json(&self, key: &str, value: &str) -> Result<(), String> { + let mut c = self.get_mut(); + + // Split key into path parts + let keypath: Vec<&str> = key.split('.').collect(); + + // Convert value into jsonvalue + let newval = json::parse(value).map_err(map_to_string)?; + + // Generate json from whole config + let jc = serde_json::to_string(&*c).map_err(map_to_string)?; + let mut jvc = json::parse(&jc).map_err(map_to_string)?; + + // Find requested subkey + let newconfigstring = if let Some((objkeyname, objkeypath)) = keypath.split_last() { + // Replace subkey + let mut out = &mut jvc; + for k in objkeypath { + if !jvc.has_key(*k) { + return Err(format!("invalid subkey '{}' in key '{}'", *k, key)); + } + out = &mut jvc[*k]; + } + if !out.has_key(objkeyname) { + return Err(format!("invalid subkey '{}' in key '{}'", objkeyname, key)); + } + out[*objkeyname] = newval; + jvc.to_string() + } else { + newval.to_string() + }; + // Generate and validate new config + let mut newconfig = VeilidConfig::new(); + newconfig.setup_from_json(newconfigstring)?; + // Replace whole config + *c = newconfig.get().clone(); + Ok(()) + } + + fn validate(&self) -> Result<(), String> { let inner = self.inner.read(); if inner.program_name.is_empty() { diff --git a/veilid-flutter/example/lib/config.dart b/veilid-flutter/example/lib/config.dart index 40c2c3f8..b320ab2e 100644 --- a/veilid-flutter/example/lib/config.dart +++ b/veilid-flutter/example/lib/config.dart @@ -88,21 +88,21 @@ Future getDefaultVeilidConfig() async { udp: VeilidConfigUDP( enabled: !kIsWeb, socketPoolSize: 0, - listenAddress: "[::]:5150", + listenAddress: "", publicAddress: null, ), tcp: VeilidConfigTCP( connect: !kIsWeb, listen: !kIsWeb, maxConnections: 32, - listenAddress: "[::]:5150", + listenAddress: "", publicAddress: null, ), ws: VeilidConfigWS( connect: true, listen: !kIsWeb, maxConnections: 16, - listenAddress: "[::]:5150", + listenAddress: "", path: "ws", url: null, ), @@ -110,7 +110,7 @@ Future getDefaultVeilidConfig() async { connect: true, listen: false, maxConnections: 16, - listenAddress: "[::]:5150", + listenAddress: "", path: "ws", url: null, ), diff --git a/veilid-flutter/example/lib/main.dart b/veilid-flutter/example/lib/main.dart index fad65daf..f8e7bd89 100644 --- a/veilid-flutter/example/lib/main.dart +++ b/veilid-flutter/example/lib/main.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:typed_data'; import 'package:flutter/material.dart'; import 'package:flutter/services.dart'; @@ -24,6 +25,7 @@ LogOptions getLogOptions(LogLevel? level) { } void setRootLogLevel(LogLevel? level) { + print("setRootLogLevel: $level"); Loggy('').level = getLogOptions(level); } @@ -62,6 +64,7 @@ class MyApp extends StatefulWidget { class _MyAppState extends State with UiLoggy { String _veilidVersion = 'Unknown'; Stream? _updateStream; + Future? _updateProcessor; @override void initState() { @@ -95,6 +98,39 @@ class _MyAppState extends State with UiLoggy { }); } + Future processUpdateLog(VeilidUpdateLog update) async { + switch (update.logLevel) { + case VeilidLogLevel.error: + loggy.error(update.message); + break; + case VeilidLogLevel.warn: + loggy.warning(update.message); + break; + case VeilidLogLevel.info: + loggy.info(update.message); + break; + case VeilidLogLevel.debug: + loggy.debug(update.message); + break; + case VeilidLogLevel.trace: + loggy.trace(update.message); + break; + } + } + + Future processUpdates() async { + var stream = _updateStream; + if (stream != null) { + await for (final update in stream) { + if (update is VeilidUpdateLog) { + await processUpdateLog(update); + } else { + loggy.trace("Update: " + update.toString()); + } + } + } + } + @override Widget build(BuildContext context) { final ButtonStyle buttonStyle = @@ -116,16 +152,32 @@ class _MyAppState extends State with UiLoggy { child: Row(children: [ ElevatedButton( style: buttonStyle, - onPressed: () async { - //var await Veilid.instance.startupVeilidCore(await getDefaultVeilidConfig()) - // setState(() { - // }; - }, + onPressed: _updateStream != null + ? null + : () async { + var updateStream = Veilid.instance.startupVeilidCore( + await getDefaultVeilidConfig()); + setState(() { + _updateStream = updateStream; + _updateProcessor = processUpdates(); + }); + }, child: const Text('Startup'), ), ElevatedButton( style: buttonStyle, - onPressed: () {}, + onPressed: _updateStream == null + ? null + : () async { + await Veilid.instance.shutdownVeilidCore(); + if (_updateProcessor != null) { + await _updateProcessor; + } + setState(() { + _updateProcessor = null; + _updateStream = null; + }); + }, child: const Text('Shutdown'), ), ])), diff --git a/veilid-flutter/lib/veilid_ffi.dart b/veilid-flutter/lib/veilid_ffi.dart index a36bb74c..0e4c2acc 100644 --- a/veilid-flutter/lib/veilid_ffi.dart +++ b/veilid-flutter/lib/veilid_ffi.dart @@ -191,39 +191,47 @@ Future processFutureVoid(Future future) { } Stream processStreamJson( - T Function(Map) jsonConstructor, Stream stream) { - return stream.map((value) { - final list = value as List; - switch (list[0] as int) { - case messageErr: - { - throw VeilidAPIExceptionInternal("Internal API Error: ${list[1]}"); - } - case messageOkJson: - { - if (list[1] == null) { - throw VeilidAPIExceptionInternal("Null MESSAGE_OK_JSON value"); + T Function(Map) jsonConstructor, ReceivePort port) async* { + try { + await for (var value in port) { + final list = value as List; + switch (list[0] as int) { + case messageStreamItemJson: + { + if (list[1] == null) { + throw VeilidAPIExceptionInternal( + "Null MESSAGE_STREAM_ITEM_JSON value"); + } + var ret = jsonDecode(list[1] as String); + yield jsonConstructor(ret); + break; } - var ret = jsonDecode(list[1] as String); - return jsonConstructor(ret); - } - case messageErrJson: - { - throw VeilidAPIException.fromJson(jsonDecode(list[1])); - } - default: - { - throw VeilidAPIExceptionInternal( - "Unexpected async return message type: ${list[0]}"); - } + case messageStreamAbort: + { + port.close(); + throw VeilidAPIExceptionInternal("Internal API Error: ${list[1]}"); + } + case messageStreamAbortJson: + { + port.close(); + throw VeilidAPIException.fromJson(jsonDecode(list[1])); + } + case messageStreamClose: + { + port.close(); + break; + } + default: + { + throw VeilidAPIExceptionInternal( + "Unexpected async return message type: ${list[0]}"); + } + } } - }).handleError((e) { + } catch (e) { // Wrap all other errors in VeilidAPIExceptionInternal throw VeilidAPIExceptionInternal(e.toString()); - }, test: (e) { - // Pass errors that are already VeilidAPIException through without wrapping - return e is! VeilidAPIException; - }); + } } // FFI implementation of high level Veilid API @@ -287,7 +295,7 @@ class VeilidFFI implements Veilid { final recvPort = ReceivePort("get_veilid_state"); final sendPort = recvPort.sendPort; _getVeilidState(sendPort.nativePort); - return processFutureJson(VeilidState.fromJson, recvPort.single); + return processFutureJson(VeilidState.fromJson, recvPort.first); } @override @@ -297,7 +305,7 @@ class VeilidFFI implements Veilid { final sendPort = recvPort.sendPort; _changeApiLogLevel(sendPort.nativePort, nativeLogLevel); malloc.free(nativeLogLevel); - return processFutureVoid(recvPort.single); + return processFutureVoid(recvPort.first); } @override @@ -305,7 +313,7 @@ class VeilidFFI implements Veilid { final recvPort = ReceivePort("shutdown_veilid_core"); final sendPort = recvPort.sendPort; _shutdownVeilidCore(sendPort.nativePort); - return processFutureVoid(recvPort.single); + return processFutureVoid(recvPort.first); } @override @@ -314,7 +322,7 @@ class VeilidFFI implements Veilid { final recvPort = ReceivePort("debug"); final sendPort = recvPort.sendPort; _debug(sendPort.nativePort, nativeCommand); - return processFuturePlain(recvPort.single); + return processFuturePlain(recvPort.first); } @override diff --git a/veilid-flutter/rust/src/dart_isolate_wrapper.rs b/veilid-flutter/rust/src/dart_isolate_wrapper.rs index 3d3c1ec5..51277c94 100644 --- a/veilid-flutter/rust/src/dart_isolate_wrapper.rs +++ b/veilid-flutter/rust/src/dart_isolate_wrapper.rs @@ -51,36 +51,36 @@ impl DartIsolateWrapper { }); } - pub fn result(&self, result: Result) -> bool { + pub fn result(self, result: Result) -> bool { match result { Ok(v) => self.ok(v), Err(e) => self.err_json(e), } } - pub fn result_json(&self, result: Result) -> bool { + pub fn result_json(self, result: Result) -> bool { match result { Ok(v) => self.ok_json(v), Err(e) => self.err_json(e), } } - pub fn ok(&self, value: T) -> bool { + pub fn ok(self, value: T) -> bool { self.isolate .post(vec![MESSAGE_OK.into_dart(), value.into_dart()]) } - pub fn ok_json(&self, value: T) -> bool { + pub fn ok_json(self, value: T) -> bool { self.isolate.post(vec![ MESSAGE_OK_JSON.into_dart(), serialize_json(value).into_dart(), ]) } - // pub fn err(&self, error: E) -> bool { + // pub fn err(self, error: E) -> bool { // self.isolate // .post(vec![MESSAGE_ERR.into_dart(), error.into_dart()]) // } - pub fn err_json(&self, error: E) -> bool { + pub fn err_json(self, error: E) -> bool { self.isolate.post(vec![ MESSAGE_ERR_JSON.into_dart(), serialize_json(error).into_dart(), @@ -88,21 +88,35 @@ impl DartIsolateWrapper { } } +struct DartIsolateStreamInner { + pub isolate: Option, +} + +impl Drop for DartIsolateStreamInner { + fn drop(&mut self) { + if let Some(isolate) = self.isolate { + isolate.post(vec![MESSAGE_STREAM_CLOSE.into_dart()]); + } + } +} + #[derive(Clone)] pub struct DartIsolateStream { - isolate: Arc>>, + inner: Arc>, } impl DartIsolateStream { pub fn new(port: i64) -> Self { DartIsolateStream { - isolate: Arc::new(Mutex::new(Some(Isolate::new(port)))), + inner: Arc::new(Mutex::new(DartIsolateStreamInner { + isolate: Some(Isolate::new(port)), + })), } } // pub fn item(&self, value: T) -> bool { - // let isolate = self.isolate.lock(); - // if let Some(isolate) = &*isolate { + // let mut inner = self.inner.lock(); + // if let Some(isolate) = inner.isolate.take() { // isolate.post(vec![MESSAGE_STREAM_ITEM.into_dart(), value.into_dart()]) // } else { // false @@ -110,8 +124,8 @@ impl DartIsolateStream { // } pub fn item_json(&self, value: T) -> bool { - let isolate = self.isolate.lock(); - if let Some(isolate) = &*isolate { + let inner = self.inner.lock(); + if let Some(isolate) = &inner.isolate { isolate.post(vec![ MESSAGE_STREAM_ITEM_JSON.into_dart(), serialize_json(value).into_dart(), @@ -122,8 +136,8 @@ impl DartIsolateStream { } // pub fn abort(self, error: E) -> bool { - // let mut isolate = self.isolate.lock(); - // if let Some(isolate) = isolate.take() { + // let mut inner = self.inner.lock(); + // if let Some(isolate) = inner.isolate.take() { // isolate.post(vec![MESSAGE_STREAM_ABORT.into_dart(), error.into_dart()]) // } else { // false @@ -131,8 +145,8 @@ impl DartIsolateStream { // } pub fn abort_json(self, error: E) -> bool { - let mut isolate = self.isolate.lock(); - if let Some(isolate) = isolate.take() { + let mut inner = self.inner.lock(); + if let Some(isolate) = inner.isolate.take() { isolate.post(vec![ MESSAGE_STREAM_ABORT_JSON.into_dart(), serialize_json(error).into_dart(), @@ -143,20 +157,11 @@ impl DartIsolateStream { } pub fn close(self) -> bool { - let mut isolate = self.isolate.lock(); - if let Some(isolate) = isolate.take() { + let mut inner = self.inner.lock(); + if let Some(isolate) = inner.isolate.take() { isolate.post(vec![MESSAGE_STREAM_CLOSE.into_dart()]) } else { false } } } - -impl Drop for DartIsolateStream { - fn drop(&mut self) { - let mut isolate = self.isolate.lock(); - if let Some(isolate) = isolate.take() { - isolate.post(vec![MESSAGE_STREAM_CLOSE.into_dart()]); - } - } -}