From eed79ce7210203e8b306e17d663315fd024a83a0 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sun, 3 Jul 2022 23:20:30 -0400 Subject: [PATCH] checkpoint --- veilid-core/proto/veilid.capnp | 111 +++++---- veilid-core/src/rpc_processor/coders/mod.rs | 2 + .../rpc_processor/coders/operations/answer.rs | 136 +++++++++++ .../rpc_processor/coders/operations/mod.rs | 16 +- .../coders/operations/operation.rs | 168 +++++++------- .../operations/operation_cancel_tunnel.rs | 62 +++++ .../operations/operation_complete_tunnel.rs | 89 ++++++++ .../coders/operations/operation_detail.rs | 214 ------------------ .../operations/operation_start_tunnel.rs | 83 +++++++ .../coders/operations/question.rs | 148 ++++++++++++ .../coders/operations/respond_to.rs | 19 +- .../coders/operations/statement.rs | 109 +++++++++ .../src/rpc_processor/coders/tunnel.rs | 106 +++++++++ veilid-core/src/rpc_processor/debug.rs | 70 ------ veilid-core/src/rpc_processor/mod.rs | 202 ++++++----------- veilid-core/src/veilid_api/mod.rs | 16 +- 16 files changed, 997 insertions(+), 554 deletions(-) create mode 100644 veilid-core/src/rpc_processor/coders/operations/answer.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs delete mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_detail.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/question.rs create mode 100644 veilid-core/src/rpc_processor/coders/operations/statement.rs create mode 100644 veilid-core/src/rpc_processor/coders/tunnel.rs diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 00de7bb5..91593498 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -356,7 +356,7 @@ enum TunnelError { struct TunnelEndpoint { mode @0 :TunnelEndpointMode; # what kind of endpoint this is - peerInfo @1 :PeerInfo; # node id and dialinfo + description @1 :Text; # endpoint description (TODO) } struct FullTunnel { @@ -400,7 +400,7 @@ struct OperationCompleteTunnelA { } struct OperationCancelTunnelQ { - tunnel @0 :TunnelID; # the tunnel id to cancel + id @0 :TunnelID; # the tunnel id to cancel } struct OperationCancelTunnelA { @@ -410,49 +410,74 @@ struct OperationCancelTunnelA { } } +# Things that want an answer +struct Question { + respondTo :union { + sender @0 :Void; # sender without node info + senderWithInfo @1 :SignedNodeInfo; # some envelope-sender signed node info to be used for reply + privateRoute @2 :PrivateRoute; # embedded private route to be used for reply + } + detail :union { + # Direct operations + statusQ @3 :OperationStatusQ; + findNodeQ @4 :OperationFindNodeQ; + + # Routable operations + getValueQ @5 :OperationGetValueQ; + setValueQ @6 :OperationSetValueQ; + watchValueQ @7 :OperationWatchValueQ; + supplyBlockQ @8 :OperationSupplyBlockQ; + findBlockQ @9 :OperationFindBlockQ; + + # Tunnel operations + startTunnelQ @10 :OperationStartTunnelQ; + completeTunnelQ @11 :OperationCompleteTunnelQ; + cancelTunnelQ @12 :OperationCancelTunnelQ; + } +} + +# Things that don't want an answer +struct Statement { + detail :union { + # Direct operations + validateDialInfo @0 :OperationValidateDialInfo; + route @1 :OperationRoute; + nodeInfoUpdate @2 :OperationNodeInfoUpdate; + + # Routable operations + valueChanged @3 :OperationValueChanged; + signal @4 :OperationSignal; + returnReceipt @5 :OperationReturnReceipt; + } +} + +# Things that are answers +struct Answer { + detail :union { + # Direct operations + statusA @0 :OperationStatusA; + findNodeA @1 :OperationFindNodeA; + + # Routable operations + getValueA @2 :OperationGetValueA; + setValueA @3 :OperationSetValueA; + watchValueA @4 :OperationWatchValueA; + supplyBlockA @5 :OperationSupplyBlockA; + findBlockA @6 :OperationFindBlockA; + + # Tunnel operations + startTunnelA @7 :OperationStartTunnelA; + completeTunnelA @8 :OperationCompleteTunnelA; + cancelTunnelA @9 :OperationCancelTunnelA; + } +} + struct Operation { opId @0 :UInt64; # Random RPC ID. Must be random to foil reply forgery attacks. - respondTo :union { - none @1 :Void; # no response is desired - sender @2 :Void; # sender without node info - senderWithInfo @3 :SignedNodeInfo; # some envelope-sender signed node info to be used for reply - privateRoute @4 :PrivateRoute; # embedded private route to be used for reply - } - - detail :union { - # Direct operations - statusQ @5 :OperationStatusQ; - statusA @6 :OperationStatusA; - validateDialInfo @7 :OperationValidateDialInfo; - findNodeQ @8 :OperationFindNodeQ; - findNodeA @9 :OperationFindNodeA; - route @10 :OperationRoute; - nodeInfoUpdate @11 :OperationNodeInfoUpdate; - - # Routable operations - getValueQ @12 :OperationGetValueQ; - getValueA @13 :OperationGetValueA; - setValueQ @14 :OperationSetValueQ; - setValueA @15 :OperationSetValueA; - watchValueQ @16 :OperationWatchValueQ; - watchValueA @17 :OperationWatchValueA; - valueChanged @18 :OperationValueChanged; - - supplyBlockQ @19 :OperationSupplyBlockQ; - supplyBlockA @20 :OperationSupplyBlockA; - findBlockQ @21 :OperationFindBlockQ; - findBlockA @22 :OperationFindBlockA; - - signal @23 :OperationSignal; - returnReceipt @24 :OperationReturnReceipt; - - # Tunnel operations - startTunnelQ @25 :OperationStartTunnelQ; - startTunnelA @26 :OperationStartTunnelA; - completeTunnelQ @27 :OperationCompleteTunnelQ; - completeTunnelA @28 :OperationCompleteTunnelA; - cancelTunnelQ @29 :OperationCancelTunnelQ; - cancelTunnelA @30 :OperationCancelTunnelA; + kind :union { + question @1 :Question; + statement @2 :Statement; + answer @3 :Answer; } } diff --git a/veilid-core/src/rpc_processor/coders/mod.rs b/veilid-core/src/rpc_processor/coders/mod.rs index 09d7dbf2..72590763 100644 --- a/veilid-core/src/rpc_processor/coders/mod.rs +++ b/veilid-core/src/rpc_processor/coders/mod.rs @@ -18,6 +18,7 @@ mod signal_info; mod signature; mod signed_node_info; mod socket_address; +mod tunnel; mod value_data; mod value_key; @@ -41,6 +42,7 @@ pub use signal_info::*; pub use signature::*; pub use signed_node_info::*; pub use socket_address::*; +pub use tunnel::*; pub use value_data::*; pub use value_key::*; diff --git a/veilid-core/src/rpc_processor/coders/operations/answer.rs b/veilid-core/src/rpc_processor/coders/operations/answer.rs new file mode 100644 index 00000000..c0b305ed --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/answer.rs @@ -0,0 +1,136 @@ +use super::*; +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCAnswer { + detail: RPCAnswerDetail, +} + +impl RPCAnswer { + pub fn new(detail: RPCAnswerDetail) -> Self { + Self { detail } + } + pub fn detail(&self) -> &RPCAnswerDetail { + &self.detail + } + pub fn desc(&self) -> &'static str { + self.detail.desc() + } + pub fn decode(reader: &veilid_capnp::answer::Reader) -> Result { + let d_reader = reader.get_detail(); + let detail = RPCAnswerDetail::decode(&d_reader)?; + Ok(RPCAnswer { detail }) + } + pub fn encode(&self, builder: &mut veilid_capnp::answer::Builder) -> Result<(), RPCError> { + self.detail.encode(&mut builder.init_detail())?; + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub enum RPCAnswerDetail { + StatusA(RPCOperationStatusA), + FindNodeA(RPCOperationFindNodeA), + GetValueA(RPCOperationGetValueA), + SetValueA(RPCOperationSetValueA), + WatchValueA(RPCOperationWatchValueA), + SupplyBlockA(RPCOperationSupplyBlockA), + FindBlockA(RPCOperationFindBlockA), + StartTunnelA(RPCOperationStartTunnelA), + CompleteTunnelA(RPCOperationCompleteTunnelA), + CancelTunnelA(RPCOperationCancelTunnelA), +} + +impl RPCAnswerDetail { + pub fn desc(&self) -> &'static str { + match self { + RPCAnswerDetail::StatusA(_) => "StatusA", + RPCAnswerDetail::FindNodeA(_) => "FindNodeA", + RPCAnswerDetail::GetValueA(_) => "GetValueA", + RPCAnswerDetail::SetValueA(_) => "SetValueA", + RPCAnswerDetail::WatchValueA(_) => "WatchValueA", + RPCAnswerDetail::SupplyBlockA(_) => "SupplyBlockA", + RPCAnswerDetail::FindBlockA(_) => "FindBlockA", + RPCAnswerDetail::StartTunnelA(_) => "StartTunnelA", + RPCAnswerDetail::CompleteTunnelA(_) => "CompleteTunnelA", + RPCAnswerDetail::CancelTunnelA(_) => "CancelTunnelA", + } + } + + pub fn decode( + reader: &veilid_capnp::answer::detail::Reader, + ) -> Result { + let which_reader = reader.which().map_err(map_error_capnp_notinschema!())?; + let out = match which_reader { + veilid_capnp::answer::detail::StatusA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationStatusA::decode(&op_reader)?; + RPCAnswerDetail::StatusA(out) + } + veilid_capnp::answer::detail::FindNodeA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationFindNodeA::decode(&op_reader)?; + RPCAnswerDetail::FindNodeA(out) + } + veilid_capnp::answer::detail::GetValueA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationGetValueA::decode(&op_reader)?; + RPCAnswerDetail::GetValueA(out) + } + veilid_capnp::answer::detail::SetValueA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationSetValueA::decode(&op_reader)?; + RPCAnswerDetail::SetValueA(out) + } + veilid_capnp::answer::detail::WatchValueA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationWatchValueA::decode(&op_reader)?; + RPCAnswerDetail::WatchValueA(out) + } + veilid_capnp::answer::detail::SupplyBlockA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationSupplyBlockA::decode(&op_reader)?; + RPCAnswerDetail::SupplyBlockA(out) + } + veilid_capnp::answer::detail::FindBlockA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationFindBlockA::decode(&op_reader)?; + RPCAnswerDetail::FindBlockA(out) + } + veilid_capnp::answer::detail::StartTunnelA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationStartTunnelA::decode(&op_reader)?; + RPCAnswerDetail::StartTunnelA(out) + } + veilid_capnp::answer::detail::CompleteTunnelA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationCompleteTunnelA::decode(&op_reader)?; + RPCAnswerDetail::CompleteTunnelA(out) + } + veilid_capnp::answer::detail::CancelTunnelA(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationCancelTunnelA::decode(&op_reader)?; + RPCAnswerDetail::CancelTunnelA(out) + } + }; + Ok(out) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::answer::detail::Builder, + ) -> Result<(), RPCError> { + match self { + RPCAnswerDetail::StatusA(d) => d.encode(&mut builder.init_status_a()), + RPCAnswerDetail::FindNodeA(d) => d.encode(&mut builder.init_find_node_a()), + RPCAnswerDetail::GetValueA(d) => d.encode(&mut builder.init_get_value_a()), + RPCAnswerDetail::SetValueA(d) => d.encode(&mut builder.init_set_value_a()), + RPCAnswerDetail::WatchValueA(d) => d.encode(&mut builder.init_watch_value_a()), + RPCAnswerDetail::SupplyBlockA(d) => d.encode(&mut builder.init_supply_block_a()), + RPCAnswerDetail::FindBlockA(d) => d.encode(&mut builder.init_find_block_a()), + RPCAnswerDetail::StartTunnelA(d) => d.encode(&mut builder.init_start_tunnel_a()), + RPCAnswerDetail::CompleteTunnelA(d) => d.encode(&mut builder.init_complete_tunnel_a()), + RPCAnswerDetail::CancelTunnelA(d) => d.encode(&mut builder.init_cancel_tunnel_a()), + } + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/mod.rs b/veilid-core/src/rpc_processor/coders/operations/mod.rs index 148bc82d..8426ff1b 100644 --- a/veilid-core/src/rpc_processor/coders/operations/mod.rs +++ b/veilid-core/src/rpc_processor/coders/operations/mod.rs @@ -1,5 +1,7 @@ +mod answer; mod operation; -mod operation_detail; +mod operation_cancel_tunnel; +mod operation_complete_tunnel; mod operation_find_block; mod operation_find_node; mod operation_get_value; @@ -8,16 +10,20 @@ mod operation_return_receipt; mod operation_route; mod operation_set_value; mod operation_signal; +mod operation_start_tunnel; mod operation_status; mod operation_supply_block; mod operation_validate_dial_info; mod operation_value_changed; mod operation_watch_value; - +mod question; mod respond_to; +mod statement; +pub use answer::*; pub use operation::*; -pub use operation_detail::*; +pub use operation_cancel_tunnel::*; +pub use operation_complete_tunnel::*; pub use operation_find_block::*; pub use operation_find_node::*; pub use operation_get_value::*; @@ -26,10 +32,12 @@ pub use operation_return_receipt::*; pub use operation_route::*; pub use operation_set_value::*; pub use operation_signal::*; +pub use operation_start_tunnel::*; pub use operation_status::*; pub use operation_supply_block::*; pub use operation_validate_dial_info::*; pub use operation_value_changed::*; pub use operation_watch_value::*; - +pub use question::*; pub use respond_to::*; +pub use statement::*; diff --git a/veilid-core/src/rpc_processor/coders/operations/operation.rs b/veilid-core/src/rpc_processor/coders/operations/operation.rs index b7f59dca..1d0d0cf6 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation.rs @@ -2,97 +2,113 @@ use crate::*; use rpc_processor::*; #[derive(Debug, Clone)] -struct RPCOperation { +pub enum RPCOperationKind { + Question(RPCQuestion), + Statement(RPCStatement), + Answer(RPCAnswer), +} + +impl RPCOperationKind { + pub fn desc(&self) -> &'static str { + match self { + RPCOperationKind::Question(q) => q.desc(), + RPCOperationKind::Statement(s) => s.desc(), + RPCOperationKind::Answer(a) => a.desc(), + } + } + + pub fn decode( + kind_reader: &veilid_capnp::operation::kind::Reader, + sender_node_id: &DHTKey, + ) -> Result { + let which_reader = kind_reader + .which() + .map_err(map_error_capnp_notinschema!())?; + let out = match which_reader { + veilid_capnp::operation::kind::Which::Question(r) => { + let q_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCQuestion::decode(&q_reader, sender_node_id)?; + RPCOperationKind::Question(out) + } + veilid_capnp::operation::kind::Which::Statement(r) => { + let q_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCStatement::decode(&q_reader, sender_node_id)?; + RPCOperationKind::Statement(out) + } + veilid_capnp::operation::kind::Which::Answer(r) => { + let q_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCAnswer::decode(&q_reader)?; + RPCOperationKind::Answer(out) + } + }; + + Ok(out) + } + + pub fn encode( + &self, + builder: &mut veilid_capnp::operation::kind::Builder, + ) -> Result<(), RPCError> { + match self { + RPCOperationKind::Question(k) => k.encode(&mut builder.init_question()), + RPCOperationKind::Statement(k) => k.encode(&mut builder.init_statement()), + RPCOperationKind::Answer(k) => k.encode(&mut builder.init_answer()), + }; + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub struct RPCOperation { op_id: u64, - // index: u32, - // is_q: bool, - // wants_answer: bool, - respond_to: RespondTo, - detail: RPCOperationDetail, + kind: RPCOperationKind, } impl RPCOperation { + pub fn new_question(question: RPCQuestion) -> Self { + Self { + op_id: intf::get_random_u64(), + kind: RPCOperationKind::Question(question), + } + } + pub fn new_statement(statement: RPCStatement) -> Self { + Self { + op_id: intf::get_random_u64(), + kind: RPCOperationKind::Statement(statement), + } + } + + pub fn new_answer(request: &RPCOperation, answer: RPCAnswer) -> Self { + Self { + op_id: request.op_id, + kind: RPCOperationKind::Answer(answer), + } + } + + pub fn op_id(&self) -> u64 { + self.op_id + } + + pub fn kind(&self) -> &RPCOperationKind { + &self.kind + } + pub fn decode( operation_reader: &veilid_capnp::operation::Reader, sender_node_id: &DHTKey, ) -> Result { let op_id = operation_reader.get_op_id(); - let respond_to_reader = operation_reader.get_respond_to(); - let respond_to = RespondTo::decode(&respond_to_reader, sender_node_id)?; + let kind_reader = operation_reader.get_kind(); + let kind = RPCOperationKind::decode(&kind_reader, sender_node_id)?; - let detail_reader = operation_reader.get_detail(); - let detail = RPCOperationDetail::decode(&detail_reader, sender_node_id)?; - - Ok(RPCOperation { - op_id, - respond_to, - detail, - }) + Ok(RPCOperation { op_id, kind }) } pub fn encode(&self, builder: &mut veilid_capnp::operation::Builder) -> Result<(), RPCError> { builder.set_op_id(self.op_id); - let rt_builder = builder.init_respond_to(); - self.respond_to.encode(&mut rt_builder)?; - let d_builder = builder.init_detail(); - self.detail.encode(&mut d_builder)?; + let k_builder = builder.init_kind(); + self.kind.encode(&mut k_builder)?; Ok(()) } } - -// let out = match which_reader { -// veilid_capnp::operation::detail::StatusQ(_) => Self { name: "StatusQ", op_id, index: 0, is_q: true, wants_answer: true, respond_to }, -// veilid_capnp::operation::detail::StatusA(_) => Self { name: "StatusA", op_id, index: 1, is_q: false, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::ValidateDialInfo(_) => Self { name: "ValidateDialInfo", op_id, index: 2, is_q: true, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::FindNodeQ(_) => Self { name: "FindNodeQ", op_id, index: 3, is_q: true, wants_answer: true, respond_to }, -// veilid_capnp::operation::detail::FindNodeA(_) => Self { name: "FindNodeA", op_id, index: 4, is_q: false, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::Route(_) => Self { name: "Route", op_id, index: 5, is_q: true, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::NodeInfoUpdate(_) => Self { name: "NodeInfoUpdate", op_id, index: 6, is_q: true, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::GetValueQ(_) => Self { name: "GetValueQ", op_id, index: 7, is_q: true, wants_answer: true, respond_to }, -// veilid_capnp::operation::detail::GetValueA(_) => Self { name: "GetValueA", op_id, index: 8, is_q: false, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::SetValueQ(_) => Self { name: "SetValueQ", op_id, index: 9, is_q: true, wants_answer: true, respond_to }, -// veilid_capnp::operation::detail::SetValueA(_) => Self { name: "SetValueA", op_id, index: 10, is_q: false, wants_answer: false, respond_to}, -// veilid_capnp::operation::detail::WatchValueQ(_) => Self { name: "WatchValueQ", op_id, index: 11, is_q: true, wants_answer: true, respond_to}, -// veilid_capnp::operation::detail::WatchValueA(_) => Self { name: "WatchValueA", op_id, index: 12, is_q: false, wants_answer: false, respond_to}, -// veilid_capnp::operation::detail::ValueChanged(_) => Self { name: "ValueChanged", op_id, index: 13, is_q: true, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::SupplyBlockQ(_) => Self { name: "SupplyBlockQ", op_id, index: 14, is_q: true, wants_answer: true, respond_to }, -// veilid_capnp::operation::detail::SupplyBlockA(_) => Self { name: "SupplyBlockA", op_id, index: 15, is_q: false, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::FindBlockQ(_) => Self { name: "FindBlockQ", op_id, index: 16, is_q: true, wants_answer: true, respond_to}, -// veilid_capnp::operation::detail::FindBlockA(_) =>Self { name: "FindBlockA", op_id, index: 17, is_q: false, wants_answer: false, respond_to}, -// veilid_capnp::operation::detail::Signal(_) => Self { name: "Signal", op_id, index: 18, is_q: true, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::ReturnReceipt(_) => Self { name: "ReturnReceipt", op_id, index: 19, is_q: true, wants_answer: false, respond_to}, -// veilid_capnp::operation::detail::StartTunnelQ(_) => Self { name: "StartTunnelQ", op_id, index: 20, is_q: true, wants_answer: true, respond_to }, -// veilid_capnp::operation::detail::StartTunnelA(_) => Self { name: "StartTunnelA", op_id, index: 21, is_q: false, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::CompleteTunnelQ(_) =>Self { name: "CompleteTunnelQ", op_id, index: 22, is_q: true, wants_answer: true, respond_to}, -// veilid_capnp::operation::detail::CompleteTunnelA(_) => Self { name: "CompleteTunnelA", op_id, index: 23, is_q: false, wants_answer: false, respond_to}, -// veilid_capnp::operation::detail::CancelTunnelQ(_) => Self { name: "CancelTunnelQ", op_id, index: 24, is_q: true, wants_answer: true, respond_to}, -// veilid_capnp::operation::detail::CancelTunnelA(_) => Self { name: "CancelTunnelA", op_id, index: 25, is_q: false, wants_answer: false, respond_to}, -// }; - -// veilid_capnp::operation::detail::StatusQ(_) => Self { name: "StatusQ", op_id, index: 0, is_q: true, wants_answer: true, respond_to }, -// veilid_capnp::operation::detail::StatusA(_) => Self { name: "StatusA", op_id, index: 1, is_q: false, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::ValidateDialInfo(_) => Self { name: "ValidateDialInfo", op_id, index: 2, is_q: true, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::FindNodeQ(_) => Self { name: "FindNodeQ", op_id, index: 3, is_q: true, wants_answer: true, respond_to }, -// veilid_capnp::operation::detail::FindNodeA(_) => Self { name: "FindNodeA", op_id, index: 4, is_q: false, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::Route(_) => Self { name: "Route", op_id, index: 5, is_q: true, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::NodeInfoUpdate(_) => Self { name: "NodeInfoUpdate", op_id, index: 6, is_q: true, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::GetValueQ(_) => Self { name: "GetValueQ", op_id, index: 7, is_q: true, wants_answer: true, respond_to }, -// veilid_capnp::operation::detail::GetValueA(_) => Self { name: "GetValueA", op_id, index: 8, is_q: false, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::SetValueQ(_) => Self { name: "SetValueQ", op_id, index: 9, is_q: true, wants_answer: true, respond_to }, -// veilid_capnp::operation::detail::SetValueA(_) => Self { name: "SetValueA", op_id, index: 10, is_q: false, wants_answer: false, respond_to}, -// veilid_capnp::operation::detail::WatchValueQ(_) => Self { name: "WatchValueQ", op_id, index: 11, is_q: true, wants_answer: true, respond_to}, -// veilid_capnp::operation::detail::WatchValueA(_) => Self { name: "WatchValueA", op_id, index: 12, is_q: false, wants_answer: false, respond_to}, -// veilid_capnp::operation::detail::ValueChanged(_) => Self { name: "ValueChanged", op_id, index: 13, is_q: true, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::SupplyBlockQ(_) => Self { name: "SupplyBlockQ", op_id, index: 14, is_q: true, wants_answer: true, respond_to }, -// veilid_capnp::operation::detail::SupplyBlockA(_) => Self { name: "SupplyBlockA", op_id, index: 15, is_q: false, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::FindBlockQ(_) => Self { name: "FindBlockQ", op_id, index: 16, is_q: true, wants_answer: true, respond_to}, -// veilid_capnp::operation::detail::FindBlockA(_) =>Self { name: "FindBlockA", op_id, index: 17, is_q: false, wants_answer: false, respond_to}, -// veilid_capnp::operation::detail::Signal(_) => Self { name: "Signal", op_id, index: 18, is_q: true, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::ReturnReceipt(_) => Self { name: "ReturnReceipt", op_id, index: 19, is_q: true, wants_answer: false, respond_to}, -// veilid_capnp::operation::detail::StartTunnelQ(_) => Self { name: "StartTunnelQ", op_id, index: 20, is_q: true, wants_answer: true, respond_to }, -// veilid_capnp::operation::detail::StartTunnelA(_) => Self { name: "StartTunnelA", op_id, index: 21, is_q: false, wants_answer: false, respond_to }, -// veilid_capnp::operation::detail::CompleteTunnelQ(_) =>Self { name: "CompleteTunnelQ", op_id, index: 22, is_q: true, wants_answer: true, respond_to}, -// veilid_capnp::operation::detail::CompleteTunnelA(_) => Self { name: "CompleteTunnelA", op_id, index: 23, is_q: false, wants_answer: false, respond_to}, -// veilid_capnp::operation::detail::CancelTunnelQ(_) => Self { name: "CancelTunnelQ", op_id, index: 24, is_q: true, wants_answer: true, respond_to}, -// veilid_capnp::operation::detail::CancelTunnelA(_) => Self { name: "CancelTunnelA", op_id, index: 25, is_q: false, wants_answer: false, respond_to}, diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs b/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs new file mode 100644 index 00000000..becd3fea --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_cancel_tunnel.rs @@ -0,0 +1,62 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationCancelTunnelQ { + id: TunnelId, +} + +impl RPCOperationCancelTunnelQ { + pub fn decode( + reader: &veilid_capnp::operation_cancel_tunnel_q::Reader, + ) -> Result { + let id = reader.get_id(); + + Ok(RPCOperationCancelTunnelQ { id }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_cancel_tunnel_q::Builder, + ) -> Result<(), RPCError> { + builder.set_id(self.id); + + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub enum RPCOperationCancelTunnelA { + Tunnel(TunnelId), + Error(TunnelError), +} + +impl RPCOperationCancelTunnelA { + pub fn decode( + reader: &veilid_capnp::operation_cancel_tunnel_a::Reader, + ) -> Result { + match reader.which().map_err(map_error_capnp_notinschema!())? { + veilid_capnp::operation_cancel_tunnel_a::Which::Tunnel(r) => { + Ok(RPCOperationCancelTunnelA::Tunnel(r)) + } + veilid_capnp::operation_cancel_tunnel_a::Which::Error(r) => { + let tunnel_error = decode_tunnel_error(r.map_err(map_error_capnp_notinschema!())?); + Ok(RPCOperationCancelTunnelA::Error(tunnel_error)) + } + } + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_cancel_tunnel_a::Builder, + ) -> Result<(), RPCError> { + match self { + RPCOperationCancelTunnelA::Tunnel(p) => { + builder.set_tunnel(*p); + } + RPCOperationCancelTunnelA::Error(e) => { + builder.set_error(encode_tunnel_error(*e)); + } + } + + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs b/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs new file mode 100644 index 00000000..bc11b474 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_complete_tunnel.rs @@ -0,0 +1,89 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationCompleteTunnelQ { + id: TunnelId, + local_mode: TunnelMode, + depth: u8, + endpoint: TunnelEndpoint, +} + +impl RPCOperationCompleteTunnelQ { + pub fn decode( + reader: &veilid_capnp::operation_complete_tunnel_q::Reader, + ) -> Result { + let id = reader.get_id(); + let local_mode = match reader + .get_local_mode() + .map_err(map_error_capnp_notinschema!())? + { + veilid_capnp::TunnelEndpointMode::Raw => TunnelMode::Raw, + veilid_capnp::TunnelEndpointMode::Turn => TunnelMode::Turn, + }; + let depth = reader.get_depth(); + let te_reader = reader.get_endpoint().map_err(map_error_capnp_error!())?; + let endpoint = decode_tunnel_endpoint(&te_reader)?; + + Ok(RPCOperationCompleteTunnelQ { + id, + local_mode, + depth, + endpoint, + }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_complete_tunnel_q::Builder, + ) -> Result<(), RPCError> { + builder.set_id(self.id); + builder.set_local_mode(match self.local_mode { + TunnelMode::Raw => veilid_capnp::TunnelEndpointMode::Raw, + TunnelMode::Turn => veilid_capnp::TunnelEndpointMode::Turn, + }); + builder.set_depth(self.depth); + let te_builder = builder.init_endpoint(); + encode_tunnel_endpoint(&self.endpoint, &mut te_builder)?; + + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub enum RPCOperationCompleteTunnelA { + Tunnel(FullTunnel), + Error(TunnelError), +} + +impl RPCOperationCompleteTunnelA { + pub fn decode( + reader: &veilid_capnp::operation_complete_tunnel_a::Reader, + ) -> Result { + match reader.which().map_err(map_error_capnp_notinschema!())? { + veilid_capnp::operation_complete_tunnel_a::Which::Tunnel(r) => { + let ft_reader = r.map_err(map_error_capnp_error!())?; + let full_tunnel = decode_full_tunnel(&ft_reader)?; + Ok(RPCOperationCompleteTunnelA::Tunnel(full_tunnel)) + } + veilid_capnp::operation_complete_tunnel_a::Which::Error(r) => { + let tunnel_error = decode_tunnel_error(r.map_err(map_error_capnp_notinschema!())?); + Ok(RPCOperationCompleteTunnelA::Error(tunnel_error)) + } + } + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_complete_tunnel_a::Builder, + ) -> Result<(), RPCError> { + match self { + RPCOperationCompleteTunnelA::Tunnel(p) => { + encode_full_tunnel(p, &mut builder.init_tunnel())?; + } + RPCOperationCompleteTunnelA::Error(e) => { + builder.set_error(encode_tunnel_error(*e)); + } + } + + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_detail.rs b/veilid-core/src/rpc_processor/coders/operations/operation_detail.rs deleted file mode 100644 index 55671bfc..00000000 --- a/veilid-core/src/rpc_processor/coders/operations/operation_detail.rs +++ /dev/null @@ -1,214 +0,0 @@ -use super::*; -use crate::*; -use rpc_processor::*; - -#[derive(Debug, Clone)] -pub enum RPCOperationDetail { - StatusQ(RPCOperationStatusQ), - StatusA(RPCOperationStatusA), - ValidateDialInfo(RPCOperationValidateDialInfo), - FindNodeQ(RPCOperationFindNodeQ), - FindNodeA(RPCOperationFindNodeA), - Route(RPCOperationRoute), - NodeInfoUpdate(RPCOperationNodeInfoUpdate), - GetValueQ(RPCOperationGetValueQ), - GetValueA(RPCOperationGetValueA), - SetValueQ(RPCOperationSetValueQ), - SetValueA(RPCOperationSetValueA), - WatchValueQ(RPCOperationWatchValueQ), - WatchValueA(RPCOperationWatchValueA), - ValueChanged(RPCOperationValueChanged), - SupplyBlockQ(RPCOperationSupplyBlockQ), - SupplyBlockA(RPCOperationSupplyBlockA), - FindBlockQ(RPCOperationFindBlockQ), - FindBlockA(RPCOperationFindBlockA), - Signal(RPCOperationSignal), - ReturnReceipt(RPCOperationReturnReceipt), - StartTunnelQ(RPCOperationStartTunnelQ), - StartTunnelA(RPCOperationStartTunnelA), - CompleteTunnelQ(RPCOperationCompleteTunnelQ), - CompleteTunnelA(RPCOperationCompleteTunnelA), - CancelTunnelQ(CancelTunnelQ), - CancelTunnelA(CancelTunnelA), -} - -impl RPCOperationDetail { - pub fn decode( - reader: &veilid_capnp::operation::detail::Reader, - sender_node_id: &DHTKey, - ) -> Result { - let which_reader = reader.which().map_err(map_error_capnp_notinschema!())?; - let out = match which_reader { - veilid_capnp::operation::detail::StatusQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationStatusQ::decode(&op_reader)?; - RPCOperationDetail::StatusQ(out) - } - veilid_capnp::operation::detail::StatusA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationStatusA::decode(&op_reader)?; - RPCOperationDetail::StatusA(out) - } - veilid_capnp::operation::detail::ValidateDialInfo(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationValidateDialInfo::decode(&op_reader)?; - RPCOperationDetail::ValidateDialInfo(out) - } - veilid_capnp::operation::detail::FindNodeQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationFindNodeQ::decode(&op_reader)?; - RPCOperationDetail::FindNodeQ(out) - } - veilid_capnp::operation::detail::FindNodeA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationFindNodeA::decode(&op_reader)?; - RPCOperationDetail::FindNodeA(out) - } - veilid_capnp::operation::detail::Route(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationRoute::decode(&op_reader)?; - RPCOperationDetail::Route(out) - } - veilid_capnp::operation::detail::NodeInfoUpdate(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationNodeInfoUpdate::decode(&op_reader, sender_node_id)?; - RPCOperationDetail::NodeInfoUpdate(out) - } - veilid_capnp::operation::detail::GetValueQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationGetValueQ::decode(&op_reader)?; - RPCOperationDetail::GetValueQ(out) - } - veilid_capnp::operation::detail::GetValueA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationGetValueA::decode(&op_reader)?; - RPCOperationDetail::GetValueA(out) - } - veilid_capnp::operation::detail::SetValueQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationSetValueQ::decode(&op_reader)?; - RPCOperationDetail::SetValueQ(out) - } - veilid_capnp::operation::detail::SetValueA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationSetValueA::decode(&op_reader)?; - RPCOperationDetail::SetValueA(out) - } - veilid_capnp::operation::detail::WatchValueQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationWatchValueQ::decode(&op_reader)?; - RPCOperationDetail::WatchValueQ(out) - } - veilid_capnp::operation::detail::WatchValueA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationWatchValueA::decode(&op_reader)?; - RPCOperationDetail::WatchValueA(out) - } - veilid_capnp::operation::detail::ValueChanged(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationValueChanged::decode(&op_reader)?; - RPCOperationDetail::ValueChanged(out) - } - veilid_capnp::operation::detail::SupplyBlockQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationSupplyBlockQ::decode(&op_reader)?; - RPCOperationDetail::SupplyBlockQ(out) - } - veilid_capnp::operation::detail::SupplyBlockA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationSupplyBlockA::decode(&op_reader)?; - RPCOperationDetail::SupplyBlockA(out) - } - veilid_capnp::operation::detail::FindBlockQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationFindBlockQ::decode(&op_reader)?; - RPCOperationDetail::FindBlockQ(out) - } - veilid_capnp::operation::detail::FindBlockA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationFindBlockA::decode(&op_reader)?; - RPCOperationDetail::FindBlockA(out) - } - veilid_capnp::operation::detail::Signal(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationSignal::decode(&op_reader)?; - RPCOperationDetail::Signal(out) - } - veilid_capnp::operation::detail::ReturnReceipt(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationReturnReceipt::decode(&op_reader)?; - RPCOperationDetail::ReturnReceipt(out) - } - veilid_capnp::operation::detail::StartTunnelQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationStartTunnelQ::decode(&op_reader)?; - RPCOperationDetail::StartTunnelQ(out) - } - veilid_capnp::operation::detail::StartTunnelA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationStartTunnelA::decode(&op_reader)?; - RPCOperationDetail::StartTunnelA(out) - } - veilid_capnp::operation::detail::CompleteTunnelQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationCompleteTunnelQ::decode(&op_reader)?; - RPCOperationDetail::CompleteTunnelQ(out) - } - veilid_capnp::operation::detail::CompleteTunnelA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationCompleteTunnelA::decode(&op_reader)?; - RPCOperationDetail::CompleteTunnelA(out) - } - veilid_capnp::operation::detail::CancelTunnelQ(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationCancelTunnelQ::decode(&op_reader)?; - RPCOperationDetail::CancelTunnelQ(out) - } - veilid_capnp::operation::detail::CancelTunnelA(r) => { - let op_reader = r.map_err(map_error_capnp_notinschema!())?; - let out = RPCOperationCancelTunnelA::decode(&op_reader)?; - RPCOperationDetail::CancelTunnelA(out) - } - }; - Ok(out) - } - pub fn encode( - &self, - builder: &mut veilid_capnp::operation::detail::Builder, - ) -> Result<(), RPCError> { - match self { - RPCOperationDetail::StatusQ(d) => d.encode(&mut builder.init_status_q()), - RPCOperationDetail::StatusA(d) => d.encode(&mut builder.init_status_a()), - RPCOperationDetail::ValidateDialInfo(d) => { - d.encode(&mut builder.init_validate_dial_info()) - } - RPCOperationDetail::FindNodeQ(d) => d.encode(&mut builder.init_find_node_q()), - RPCOperationDetail::FindNodeA(d) => d.encode(&mut builder.init_find_node_a()), - RPCOperationDetail::Route(d) => d.encode(&mut builder.init_route()), - RPCOperationDetail::NodeInfoUpdate(d) => d.encode(&mut builder.init_node_info_update()), - RPCOperationDetail::GetValueQ(d) => d.encode(&mut builder.init_get_value_q()), - RPCOperationDetail::GetValueA(d) => d.encode(&mut builder.init_get_value_a()), - RPCOperationDetail::SetValueQ(d) => d.encode(&mut builder.init_set_value_q()), - RPCOperationDetail::SetValueA(d) => d.encode(&mut builder.init_set_value_a()), - RPCOperationDetail::WatchValueQ(d) => d.encode(&mut builder.init_watch_value_q()), - RPCOperationDetail::WatchValueA(d) => d.encode(&mut builder.init_watch_value_a()), - RPCOperationDetail::ValueChanged(d) => d.encode(&mut builder.init_value_changed()), - RPCOperationDetail::SupplyBlockQ(d) => d.encode(&mut builder.init_supply_block_q()), - RPCOperationDetail::SupplyBlockA(d) => d.encode(&mut builder.init_supply_block_a()), - RPCOperationDetail::FindBlockQ(d) => d.encode(&mut builder.init_find_block_q()), - RPCOperationDetail::FindBlockA(d) => d.encode(&mut builder.init_find_block_a()), - RPCOperationDetail::Signal(d) => d.encode(&mut builder.init_signal()), - RPCOperationDetail::ReturnReceipt(d) => d.encode(&mut builder.init_return_receipt()), - RPCOperationDetail::StartTunnelQ(d) => d.encode(&mut builder.init_start_tunnel_q()), - RPCOperationDetail::StartTunnelA(d) => d.encode(&mut builder.init_start_tunnel_a()), - RPCOperationDetail::CompleteTunnelQ(d) => { - d.encode(&mut builder.init_complete_tunnel_q()) - } - RPCOperationDetail::CompleteTunnelA(d) => { - d.encode(&mut builder.init_complete_tunnel_a()) - } - RPCOperationDetail::CancelTunnelQ(d) => d.encode(&mut builder.init_cancel_tunnel_q()), - RPCOperationDetail::CancelTunnelA(d) => d.encode(&mut builder.init_cancel_tunnel_a()), - } - } -} diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs b/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs new file mode 100644 index 00000000..2600775a --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/operation_start_tunnel.rs @@ -0,0 +1,83 @@ +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCOperationStartTunnelQ { + id: TunnelId, + local_mode: TunnelMode, + depth: u8, +} + +impl RPCOperationStartTunnelQ { + pub fn decode( + reader: &veilid_capnp::operation_start_tunnel_q::Reader, + ) -> Result { + let id = reader.get_id(); + let local_mode = match reader + .get_local_mode() + .map_err(map_error_capnp_notinschema!())? + { + veilid_capnp::TunnelEndpointMode::Raw => TunnelMode::Raw, + veilid_capnp::TunnelEndpointMode::Turn => TunnelMode::Turn, + }; + let depth = reader.get_depth(); + + Ok(RPCOperationStartTunnelQ { + id, + local_mode, + depth, + }) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_start_tunnel_q::Builder, + ) -> Result<(), RPCError> { + builder.set_id(self.id); + builder.set_local_mode(match self.local_mode { + TunnelMode::Raw => veilid_capnp::TunnelEndpointMode::Raw, + TunnelMode::Turn => veilid_capnp::TunnelEndpointMode::Turn, + }); + builder.set_depth(self.depth); + + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub enum RPCOperationStartTunnelA { + Partial(PartialTunnel), + Error(TunnelError), +} + +impl RPCOperationStartTunnelA { + pub fn decode( + reader: &veilid_capnp::operation_start_tunnel_a::Reader, + ) -> Result { + match reader.which().map_err(map_error_capnp_notinschema!())? { + veilid_capnp::operation_start_tunnel_a::Which::Partial(r) => { + let pt_reader = r.map_err(map_error_capnp_error!())?; + let partial_tunnel = decode_partial_tunnel(&pt_reader)?; + Ok(RPCOperationStartTunnelA::Partial(partial_tunnel)) + } + veilid_capnp::operation_start_tunnel_a::Which::Error(r) => { + let tunnel_error = decode_tunnel_error(r.map_err(map_error_capnp_notinschema!())?); + Ok(RPCOperationStartTunnelA::Error(tunnel_error)) + } + } + } + pub fn encode( + &self, + builder: &mut veilid_capnp::operation_start_tunnel_a::Builder, + ) -> Result<(), RPCError> { + match self { + RPCOperationStartTunnelA::Partial(p) => { + encode_partial_tunnel(p, &mut builder.init_partial())?; + } + RPCOperationStartTunnelA::Error(e) => { + builder.set_error(encode_tunnel_error(*e)); + } + } + + Ok(()) + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/question.rs b/veilid-core/src/rpc_processor/coders/operations/question.rs new file mode 100644 index 00000000..570b3677 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/question.rs @@ -0,0 +1,148 @@ +use super::*; +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCQuestion { + respond_to: RespondTo, + detail: RPCQuestionDetail, +} + +impl RPCQuestion { + pub fn new(respond_to: RespondTo, detail: RPCQuestionDetail) -> Self { + Self { respond_to, detail } + } + pub fn respond_to(&self) -> &RespondTo { + &self.respond_to + } + pub fn detail(&self) -> &RPCQuestionDetail { + &self.detail + } + pub fn desc(&self) -> &'static str { + self.detail.desc() + } + pub fn decode( + reader: &veilid_capnp::question::Reader, + sender_node_id: &DHTKey, + ) -> Result { + let rt_reader = reader.get_respond_to(); + let respond_to = RespondTo::decode(&rt_reader, sender_node_id)?; + let d_reader = reader.get_detail(); + let detail = RPCQuestionDetail::decode(&d_reader)?; + Ok(RPCQuestion { respond_to, detail }) + } + pub fn encode(&self, builder: &mut veilid_capnp::question::Builder) -> Result<(), RPCError> { + self.respond_to.encode(&mut builder.init_respond_to())?; + self.detail.encode(&mut builder.init_detail())?; + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub enum RPCQuestionDetail { + StatusQ(RPCOperationStatusQ), + FindNodeQ(RPCOperationFindNodeQ), + GetValueQ(RPCOperationGetValueQ), + SetValueQ(RPCOperationSetValueQ), + WatchValueQ(RPCOperationWatchValueQ), + SupplyBlockQ(RPCOperationSupplyBlockQ), + FindBlockQ(RPCOperationFindBlockQ), + StartTunnelQ(RPCOperationStartTunnelQ), + CompleteTunnelQ(RPCOperationCompleteTunnelQ), + CancelTunnelQ(RPCOperationCancelTunnelQ), +} + +impl RPCQuestionDetail { + pub fn desc(&self) -> &'static str { + match self { + RPCQuestionDetail::StatusQ(_) => "StatusQ", + RPCQuestionDetail::FindNodeQ(_) => "FindNodeQ", + RPCQuestionDetail::GetValueQ(_) => "GetValueQ", + RPCQuestionDetail::SetValueQ(_) => "SetValueQ", + RPCQuestionDetail::WatchValueQ(_) => "WatchValueQ", + RPCQuestionDetail::SupplyBlockQ(_) => "SupplyBlockQ", + RPCQuestionDetail::FindBlockQ(_) => "FindBlockQ", + RPCQuestionDetail::StartTunnelQ(_) => "StartTunnelQ", + RPCQuestionDetail::CompleteTunnelQ(_) => "CompleteTunnelQ", + RPCQuestionDetail::CancelTunnelQ(_) => "CancelTunnelQ", + } + } + + pub fn decode( + reader: &veilid_capnp::question::detail::Reader, + ) -> Result { + let which_reader = reader.which().map_err(map_error_capnp_notinschema!())?; + let out = match which_reader { + veilid_capnp::question::detail::StatusQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationStatusQ::decode(&op_reader)?; + RPCQuestionDetail::StatusQ(out) + } + veilid_capnp::question::detail::FindNodeQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationFindNodeQ::decode(&op_reader)?; + RPCQuestionDetail::FindNodeQ(out) + } + veilid_capnp::question::detail::GetValueQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationGetValueQ::decode(&op_reader)?; + RPCQuestionDetail::GetValueQ(out) + } + veilid_capnp::question::detail::SetValueQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationSetValueQ::decode(&op_reader)?; + RPCQuestionDetail::SetValueQ(out) + } + veilid_capnp::question::detail::WatchValueQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationWatchValueQ::decode(&op_reader)?; + RPCQuestionDetail::WatchValueQ(out) + } + veilid_capnp::question::detail::SupplyBlockQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationSupplyBlockQ::decode(&op_reader)?; + RPCQuestionDetail::SupplyBlockQ(out) + } + veilid_capnp::question::detail::FindBlockQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationFindBlockQ::decode(&op_reader)?; + RPCQuestionDetail::FindBlockQ(out) + } + veilid_capnp::question::detail::StartTunnelQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationStartTunnelQ::decode(&op_reader)?; + RPCQuestionDetail::StartTunnelQ(out) + } + veilid_capnp::question::detail::CompleteTunnelQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationCompleteTunnelQ::decode(&op_reader)?; + RPCQuestionDetail::CompleteTunnelQ(out) + } + veilid_capnp::question::detail::CancelTunnelQ(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationCancelTunnelQ::decode(&op_reader)?; + RPCQuestionDetail::CancelTunnelQ(out) + } + }; + Ok(out) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::question::detail::Builder, + ) -> Result<(), RPCError> { + match self { + RPCQuestionDetail::StatusQ(d) => d.encode(&mut builder.init_status_q()), + RPCQuestionDetail::FindNodeQ(d) => d.encode(&mut builder.init_find_node_q()), + RPCQuestionDetail::GetValueQ(d) => d.encode(&mut builder.init_get_value_q()), + RPCQuestionDetail::SetValueQ(d) => d.encode(&mut builder.init_set_value_q()), + RPCQuestionDetail::WatchValueQ(d) => d.encode(&mut builder.init_watch_value_q()), + RPCQuestionDetail::SupplyBlockQ(d) => d.encode(&mut builder.init_supply_block_q()), + RPCQuestionDetail::FindBlockQ(d) => d.encode(&mut builder.init_find_block_q()), + RPCQuestionDetail::StartTunnelQ(d) => d.encode(&mut builder.init_start_tunnel_q()), + RPCQuestionDetail::CompleteTunnelQ(d) => { + d.encode(&mut builder.init_complete_tunnel_q()) + } + RPCQuestionDetail::CancelTunnelQ(d) => d.encode(&mut builder.init_cancel_tunnel_q()), + } + } +} diff --git a/veilid-core/src/rpc_processor/coders/operations/respond_to.rs b/veilid-core/src/rpc_processor/coders/operations/respond_to.rs index 308487a6..1a80b4b4 100644 --- a/veilid-core/src/rpc_processor/coders/operations/respond_to.rs +++ b/veilid-core/src/rpc_processor/coders/operations/respond_to.rs @@ -3,7 +3,6 @@ use rpc_processor::*; #[derive(Debug, Clone)] pub enum RespondTo { - None, Sender(Option), PrivateRoute(PrivateRoute), } @@ -11,12 +10,9 @@ pub enum RespondTo { impl RespondTo { pub fn encode( &self, - builder: &mut veilid_capnp::operation::respond_to::Builder, + builder: &mut veilid_capnp::question::respond_to::Builder, ) -> Result<(), RPCError> { match self { - Self::None => { - builder.set_none(()); - } Self::Sender(Some(sni)) => { let mut sni_builder = builder.reborrow().init_sender_with_info(); encode_signed_node_info(sni, &mut sni_builder)?; @@ -33,27 +29,26 @@ impl RespondTo { } pub fn decode( - reader: &veilid_capnp::operation::respond_to::Reader, + reader: &veilid_capnp::question::respond_to::Reader, sender_node_id: &DHTKey, ) -> Result { let respond_to = match reader.which().map_err(map_error_capnp_notinschema!())? { - veilid_capnp::operation::respond_to::None(_) => RespondTo::None, - veilid_capnp::operation::respond_to::Sender(_) => RespondTo::Sender(None), - veilid_capnp::operation::respond_to::SenderWithInfo(Ok(sender_ni_reader)) => { + veilid_capnp::question::respond_to::Sender(_) => RespondTo::Sender(None), + veilid_capnp::question::respond_to::SenderWithInfo(Ok(sender_ni_reader)) => { let sni = decode_signed_node_info(&sender_ni_reader, sender_node_id, true)?; RespondTo::Sender(Some(sni)) } - veilid_capnp::operation::respond_to::SenderWithInfo(Err(e)) => { + veilid_capnp::question::respond_to::SenderWithInfo(Err(e)) => { return Err(rpc_error_protocol(format!( "invalid signed node info: {}", e ))) } - veilid_capnp::operation::respond_to::PrivateRoute(Ok(pr_reader)) => { + veilid_capnp::question::respond_to::PrivateRoute(Ok(pr_reader)) => { let pr = decode_private_route(&pr_reader)?; RespondTo::PrivateRoute(pr) } - veilid_capnp::operation::respond_to::PrivateRoute(Err(e)) => { + veilid_capnp::question::respond_to::PrivateRoute(Err(e)) => { return Err(rpc_error_protocol(format!("invalid private route: {}", e))); } }; diff --git a/veilid-core/src/rpc_processor/coders/operations/statement.rs b/veilid-core/src/rpc_processor/coders/operations/statement.rs new file mode 100644 index 00000000..c5947702 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/operations/statement.rs @@ -0,0 +1,109 @@ +use super::*; +use crate::*; +use rpc_processor::*; + +#[derive(Debug, Clone)] +pub struct RPCStatement { + detail: RPCStatementDetail, +} + +impl RPCStatement { + pub fn new(detail: RPCStatementDetail) -> Self { + Self { detail } + } + pub fn detail(&self) -> &RPCStatementDetail { + &self.detail + } + pub fn desc(&self) -> &'static str { + self.detail.desc() + } + pub fn decode( + reader: &veilid_capnp::statement::Reader, + sender_node_id: &DHTKey, + ) -> Result { + let d_reader = reader.get_detail(); + let detail = RPCStatementDetail::decode(&d_reader, sender_node_id)?; + Ok(RPCStatement { detail }) + } + pub fn encode(&self, builder: &mut veilid_capnp::statement::Builder) -> Result<(), RPCError> { + self.detail.encode(&mut builder.init_detail())?; + Ok(()) + } +} + +#[derive(Debug, Clone)] +pub enum RPCStatementDetail { + ValidateDialInfo(RPCOperationValidateDialInfo), + Route(RPCOperationRoute), + NodeInfoUpdate(RPCOperationNodeInfoUpdate), + ValueChanged(RPCOperationValueChanged), + Signal(RPCOperationSignal), + ReturnReceipt(RPCOperationReturnReceipt), +} + +impl RPCStatementDetail { + pub fn desc(&self) -> &'static str { + match self { + RPCStatementDetail::ValidateDialInfo(_) => "ValidateDialInfo", + RPCStatementDetail::Route(_) => "Route", + RPCStatementDetail::NodeInfoUpdate(_) => "NodeInfoUpdate", + RPCStatementDetail::ValueChanged(_) => "ValueChanged", + RPCStatementDetail::Signal(_) => "Signal", + RPCStatementDetail::ReturnReceipt(_) => "ReturnReceipt", + } + } + pub fn decode( + reader: &veilid_capnp::statement::detail::Reader, + sender_node_id: &DHTKey, + ) -> Result { + let which_reader = reader.which().map_err(map_error_capnp_notinschema!())?; + let out = match which_reader { + veilid_capnp::statement::detail::ValidateDialInfo(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationValidateDialInfo::decode(&op_reader)?; + RPCStatementDetail::ValidateDialInfo(out) + } + veilid_capnp::statement::detail::Route(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationRoute::decode(&op_reader)?; + RPCStatementDetail::Route(out) + } + veilid_capnp::statement::detail::NodeInfoUpdate(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationNodeInfoUpdate::decode(&op_reader, sender_node_id)?; + RPCStatementDetail::NodeInfoUpdate(out) + } + veilid_capnp::statement::detail::ValueChanged(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationValueChanged::decode(&op_reader)?; + RPCStatementDetail::ValueChanged(out) + } + veilid_capnp::statement::detail::Signal(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationSignal::decode(&op_reader)?; + RPCStatementDetail::Signal(out) + } + veilid_capnp::statement::detail::ReturnReceipt(r) => { + let op_reader = r.map_err(map_error_capnp_notinschema!())?; + let out = RPCOperationReturnReceipt::decode(&op_reader)?; + RPCStatementDetail::ReturnReceipt(out) + } + }; + Ok(out) + } + pub fn encode( + &self, + builder: &mut veilid_capnp::statement::detail::Builder, + ) -> Result<(), RPCError> { + match self { + RPCStatementDetail::ValidateDialInfo(d) => { + d.encode(&mut builder.init_validate_dial_info()) + } + RPCStatementDetail::Route(d) => d.encode(&mut builder.init_route()), + RPCStatementDetail::NodeInfoUpdate(d) => d.encode(&mut builder.init_node_info_update()), + RPCStatementDetail::ValueChanged(d) => d.encode(&mut builder.init_value_changed()), + RPCStatementDetail::Signal(d) => d.encode(&mut builder.init_signal()), + RPCStatementDetail::ReturnReceipt(d) => d.encode(&mut builder.init_return_receipt()), + } + } +} diff --git a/veilid-core/src/rpc_processor/coders/tunnel.rs b/veilid-core/src/rpc_processor/coders/tunnel.rs new file mode 100644 index 00000000..2f413fa7 --- /dev/null +++ b/veilid-core/src/rpc_processor/coders/tunnel.rs @@ -0,0 +1,106 @@ +use crate::*; +use rpc_processor::*; + +pub fn encode_tunnel_mode(tunnel_mode: TunnelMode) -> veilid_capnp::TunnelEndpointMode { + match tunnel_mode { + TunnelMode::Raw => veilid_capnp::TunnelEndpointMode::Raw, + TunnelMode::Turn => veilid_capnp::TunnelEndpointMode::Turn, + } +} + +pub fn decode_tunnel_mode(tunnel_endpoint_mode: veilid_capnp::TunnelEndpointMode) -> TunnelMode { + match tunnel_endpoint_mode { + veilid_capnp::TunnelEndpointMode::Raw => TunnelMode::Raw, + veilid_capnp::TunnelEndpointMode::Turn => TunnelMode::Turn, + } +} + +pub fn encode_tunnel_error(tunnel_error: TunnelError) -> veilid_capnp::TunnelError { + match tunnel_error { + TunnelError::BadId => veilid_capnp::TunnelError::BadId, + TunnelError::NoEndpoint => veilid_capnp::TunnelError::NoEndpoint, + TunnelError::RejectedMode => veilid_capnp::TunnelError::RejectedMode, + TunnelError::NoCapacity => veilid_capnp::TunnelError::NoCapacity, + } +} + +pub fn decode_tunnel_error(tunnel_error: veilid_capnp::TunnelError) -> TunnelError { + match tunnel_error { + veilid_capnp::TunnelError::BadId => TunnelError::BadId, + veilid_capnp::TunnelError::NoEndpoint => TunnelError::NoEndpoint, + veilid_capnp::TunnelError::RejectedMode => TunnelError::RejectedMode, + veilid_capnp::TunnelError::NoCapacity => TunnelError::NoCapacity, + } +} + +pub fn encode_tunnel_endpoint( + tunnel_endpoint: &TunnelEndpoint, + builder: &mut veilid_capnp::tunnel_endpoint::Builder, +) -> Result<(), RPCError> { + builder.set_mode(encode_tunnel_mode(tunnel_endpoint.mode)); + builder.set_description(&tunnel_endpoint.description); + + Ok(()) +} + +pub fn decode_tunnel_endpoint( + reader: &veilid_capnp::tunnel_endpoint::Reader, +) -> Result { + let mode = decode_tunnel_mode(reader.get_mode().map_err(map_error_capnp_notinschema!())?); + let description = reader.get_description(); + + Ok(TunnelEndpoint { mode, description }) +} + +pub fn encode_full_tunnel( + full_tunnel: &FullTunnel, + builder: &mut veilid_capnp::full_tunnel::Builder, +) -> Result<(), RPCError> { + builder.set_id(full_tunnel.id); + builder.set_timeout(full_tunnel.timeout); + let l_builder = builder.init_local(); + encode_tunnel_endpoint(&full_tunnel.local, &mut l_builder)?; + let r_builder = builder.init_remote(); + encode_tunnel_endpoint(&full_tunnel.remote, &mut r_builder)?; + Ok(()) +} + +pub fn decode_full_tunnel( + reader: &veilid_capnp::full_tunnel::Reader, +) -> Result { + let id = reader.get_id(); + let timeout = reader.get_timeout(); + let l_reader = reader.get_local().map_err(map_error_capnp_error!())?; + let local = decode_tunnel_endpoint(&l_reader).map_err(map_error_capnp_error!())?; + let r_reader = reader.get_remote().map_err(map_error_capnp_error!())?; + let remote = decode_tunnel_endpoint(&r_reader).map_err(map_error_capnp_error!())?; + + Ok(FullTunnel { + id, + timeout, + local, + remote, + }) +} + +pub fn encode_partial_tunnel( + partial_tunnel: &PartialTunnel, + builder: &mut veilid_capnp::partial_tunnel::Builder, +) -> Result<(), RPCError> { + builder.set_id(partial_tunnel.id); + builder.set_timeout(partial_tunnel.timeout); + let l_builder = builder.init_local(); + encode_tunnel_endpoint(&partial_tunnel.local, &mut l_builder)?; + Ok(()) +} + +pub fn decode_partial_tunnel( + reader: &veilid_capnp::partial_tunnel::Reader, +) -> Result { + let id = reader.get_id(); + let timeout = reader.get_timeout(); + let l_reader = reader.get_local().map_err(map_error_capnp_error!())?; + let local = decode_tunnel_endpoint(&l_reader).map_err(map_error_capnp_error!())?; + + Ok(PartialTunnel { id, timeout, local }) +} diff --git a/veilid-core/src/rpc_processor/debug.rs b/veilid-core/src/rpc_processor/debug.rs index fb433950..36610f63 100644 --- a/veilid-core/src/rpc_processor/debug.rs +++ b/veilid-core/src/rpc_processor/debug.rs @@ -111,74 +111,4 @@ impl RPCProcessor { Self::get_rpc_operation_detail_debug_info(&detail) ) } - - struct RpcOperationDetailInfo { - name: &'static str, - index: u32, - is_q: bool, - } - - pub(super) fn get_rpc_operation_detail_info( - detail: &veilid_capnp::operation::detail::WhichReader, - ) -> String { - match detail { - veilid_capnp::operation::detail::StatusQ(_) => "StatusQ".to_owned(), - veilid_capnp::operation::detail::StatusA(_) => "StatusA".to_owned(), - veilid_capnp::operation::detail::ValidateDialInfo(_) => "ValidateDialInfo".to_owned(), - veilid_capnp::operation::detail::FindNodeQ(_) => "FindNodeQ".to_owned(), - veilid_capnp::operation::detail::FindNodeA(_) => "FindNodeA".to_owned(), - veilid_capnp::operation::detail::Route(_) => "Route".to_owned(), - veilid_capnp::operation::detail::NodeInfoUpdate(_) => "NodeInfoUpdate".to_owned(), - veilid_capnp::operation::detail::GetValueQ(_) => "GetValueQ".to_owned(), - veilid_capnp::operation::detail::GetValueA(_) => "GetValueA".to_owned(), - veilid_capnp::operation::detail::SetValueQ(_) => "SetValueQ".to_owned(), - veilid_capnp::operation::detail::SetValueA(_) => "SetValueA".to_owned(), - veilid_capnp::operation::detail::WatchValueQ(_) => "WatchValueQ".to_owned(), - veilid_capnp::operation::detail::WatchValueA(_) => "WatchValueA".to_owned(), - veilid_capnp::operation::detail::ValueChanged(_) => "ValueChanged".to_owned(), - veilid_capnp::operation::detail::SupplyBlockQ(_) => "SupplyBlockQ".to_owned(), - veilid_capnp::operation::detail::SupplyBlockA(_) => "SupplyBlockA".to_owned(), - veilid_capnp::operation::detail::FindBlockQ(_) => "FindBlockQ".to_owned(), - veilid_capnp::operation::detail::FindBlockA(_) => "FindBlockA".to_owned(), - veilid_capnp::operation::detail::Signal(_) => "Signal".to_owned(), - veilid_capnp::operation::detail::ReturnReceipt(_) => "ReturnReceipt".to_owned(), - veilid_capnp::operation::detail::StartTunnelQ(_) => "StartTunnelQ".to_owned(), - veilid_capnp::operation::detail::StartTunnelA(_) => "StartTunnelA".to_owned(), - veilid_capnp::operation::detail::CompleteTunnelQ(_) => "CompleteTunnelQ".to_owned(), - veilid_capnp::operation::detail::CompleteTunnelA(_) => "CompleteTunnelA".to_owned(), - veilid_capnp::operation::detail::CancelTunnelQ(_) => "CancelTunnelQ".to_owned(), - veilid_capnp::operation::detail::CancelTunnelA(_) => "CancelTunnelA".to_owned(), - } - } - - pub(super) fn get_rpc_operation_detail_d( - let (which, is_q) = match which_reader - { - veilid_capnp::operation::detail::StatusQ(_) => (0u32, true), - veilid_capnp::operation::detail::StatusA(_) => (1u32, false), - veilid_capnp::operation::detail::ValidateDialInfo(_) => (2u32, true), - veilid_capnp::operation::detail::FindNodeQ(_) => (3u32, true), - veilid_capnp::operation::detail::FindNodeA(_) => (4u32, false), - veilid_capnp::operation::detail::Route(_) => (5u32, true), - veilid_capnp::operation::detail::NodeInfoUpdate(_) => (6u32, true), - veilid_capnp::operation::detail::GetValueQ(_) => (7u32, true), - veilid_capnp::operation::detail::GetValueA(_) => (8u32, false), - veilid_capnp::operation::detail::SetValueQ(_) => (9u32, true), - veilid_capnp::operation::detail::SetValueA(_) => (10u32, false), - veilid_capnp::operation::detail::WatchValueQ(_) => (11u32, true), - veilid_capnp::operation::detail::WatchValueA(_) => (12u32, false), - veilid_capnp::operation::detail::ValueChanged(_) => (13u32, true), - veilid_capnp::operation::detail::SupplyBlockQ(_) => (14u32, true), - veilid_capnp::operation::detail::SupplyBlockA(_) => (15u32, false), - veilid_capnp::operation::detail::FindBlockQ(_) => (16u32, true), - veilid_capnp::operation::detail::FindBlockA(_) => (17u32, false), - veilid_capnp::operation::detail::Signal(_) => (18u32, true), - veilid_capnp::operation::detail::ReturnReceipt(_) => (19u32, true), - veilid_capnp::operation::detail::StartTunnelQ(_) => (20u32, true), - veilid_capnp::operation::detail::StartTunnelA(_) => (21u32, false), - veilid_capnp::operation::detail::CompleteTunnelQ(_) => (22u32, true), - veilid_capnp::operation::detail::CompleteTunnelA(_) => (23u32, false), - veilid_capnp::operation::detail::CancelTunnelQ(_) => (24u32, true), - veilid_capnp::operation::detail::CancelTunnelA(_) => (25u32, false), - }; } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 75b3e04d..717ccff2 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -110,56 +110,6 @@ struct WaitableReply { send_data_kind: SendDataKind, } -struct RPCOperationInfo { - name: &'static str, - op_id: u64, - index: u32, - is_q: bool, - wants_answer: bool, - respond_to: RespondTo -} - -impl RPCOperationInfo { - pub fn parse(operation_reader: &veilid_capnp::operation::Reader, sender_node_id: &DHTKey) -> Result { - let which_reader = operation_reader.get_detail().which().expect("missing which operation"); - let op_id = operation_reader.get_op_id(); - - let respond_to_reader = operation_reader.get_respond_to(); - let respond_to = RespondTo::decode(&respond_to_reader, sender_node_id)?; - - let out = match which_reader { - veilid_capnp::operation::detail::StatusQ(_) => Self { name: "StatusQ", op_id, index: 0, is_q: true, wants_answer: true, respond_to }, - veilid_capnp::operation::detail::StatusA(_) => Self { name: "StatusA", op_id, index: 1, is_q: false, wants_answer: false, respond_to}, - veilid_capnp::operation::detail::ValidateDialInfo(_) => Self { name: "ValidateDialInfo", op_id, index: 2, is_q: true, wants_answer: false, respond_to }, - veilid_capnp::operation::detail::FindNodeQ(_) => Self { name: "FindNodeQ", op_id, index: 3, is_q: true, wants_answer: true, respond_to }, - veilid_capnp::operation::detail::FindNodeA(_) => Self { name: "FindNodeA", op_id, index: 4, is_q: false, wants_answer: false, respond_to }, - veilid_capnp::operation::detail::Route(_) => Self { name: "Route", op_id, index: 5, is_q: true, wants_answer: false, respond_to }, - veilid_capnp::operation::detail::NodeInfoUpdate(_) => Self { name: "NodeInfoUpdate", op_id, index: 6, is_q: true, wants_answer: false, respond_to }, - veilid_capnp::operation::detail::GetValueQ(_) => Self { name: "GetValueQ", op_id, index: 7, is_q: true, wants_answer: true, respond_to }, - veilid_capnp::operation::detail::GetValueA(_) => Self { name: "GetValueA", op_id, index: 8, is_q: false, wants_answer: false, respond_to }, - veilid_capnp::operation::detail::SetValueQ(_) => Self { name: "SetValueQ", op_id, index: 9, is_q: true, wants_answer: true, respond_to }, - veilid_capnp::operation::detail::SetValueA(_) => Self { name: "SetValueA", op_id, index: 10, is_q: false, wants_answer: false, respond_to}, - veilid_capnp::operation::detail::WatchValueQ(_) => Self { name: "WatchValueQ", op_id, index: 11, is_q: true, wants_answer: true, respond_to}, - veilid_capnp::operation::detail::WatchValueA(_) => Self { name: "WatchValueA", op_id, index: 12, is_q: false, wants_answer: false, respond_to}, - veilid_capnp::operation::detail::ValueChanged(_) => Self { name: "ValueChanged", op_id, index: 13, is_q: true, wants_answer: false, respond_to }, - veilid_capnp::operation::detail::SupplyBlockQ(_) => Self { name: "SupplyBlockQ", op_id, index: 14, is_q: true, wants_answer: true, respond_to }, - veilid_capnp::operation::detail::SupplyBlockA(_) => Self { name: "SupplyBlockA", op_id, index: 15, is_q: false, wants_answer: false, respond_to }, - veilid_capnp::operation::detail::FindBlockQ(_) => Self { name: "FindBlockQ", op_id, index: 16, is_q: true, wants_answer: true, respond_to}, - veilid_capnp::operation::detail::FindBlockA(_) =>Self { name: "FindBlockA", op_id, index: 17, is_q: false, wants_answer: false, respond_to}, - veilid_capnp::operation::detail::Signal(_) => Self { name: "Signal", op_id, index: 18, is_q: true, wants_answer: false, respond_to }, - veilid_capnp::operation::detail::ReturnReceipt(_) => Self { name: "ReturnReceipt", op_id, index: 19, is_q: true, wants_answer: false, respond_to}, - veilid_capnp::operation::detail::StartTunnelQ(_) => Self { name: "StartTunnelQ", op_id, index: 20, is_q: true, wants_answer: true, respond_to }, - veilid_capnp::operation::detail::StartTunnelA(_) => Self { name: "StartTunnelA", op_id, index: 21, is_q: false, wants_answer: false, respond_to }, - veilid_capnp::operation::detail::CompleteTunnelQ(_) =>Self { name: "CompleteTunnelQ", op_id, index: 22, is_q: true, wants_answer: true, respond_to}, - veilid_capnp::operation::detail::CompleteTunnelA(_) => Self { name: "CompleteTunnelA", op_id, index: 23, is_q: false, wants_answer: false, respond_to}, - veilid_capnp::operation::detail::CancelTunnelQ(_) => Self { name: "CancelTunnelQ", op_id, index: 24, is_q: true, wants_answer: true, respond_to}, - veilid_capnp::operation::detail::CancelTunnelA(_) => Self { name: "CancelTunnelA", op_id, index: 25, is_q: false, wants_answer: false, respond_to}, - }; - - Ok(out) - } -} - ///////////////////////////////////////////////////////////////////// #[derive(Clone, Debug, Default)] @@ -175,6 +125,12 @@ pub struct FindNodeAnswer { pub peers: Vec, // the list of closer peers } +struct RenderedOperation { + out: Vec, // The rendered operation bytes + out_node_id: DHTKey, // Node id we're sending to + out_noderef: Option, // Node to send envelope to (may not be destination node id in case of relay) + hopcount: usize, // Total safety + private route hop count +} ///////////////////////////////////////////////////////////////////// pub struct RPCProcessorInner { @@ -244,10 +200,6 @@ impl RPCProcessor { ////////////////////////////////////////////////////////////////////// - fn get_next_op_id(&self) -> OperationId { - intf::get_random_u64() - } - fn filter_peer_scope(&self, node_info: &NodeInfo) -> bool { // if local peer scope is enabled, then don't reject any peer info if self.enable_local_peer_scope { @@ -420,35 +372,18 @@ impl RPCProcessor { out } - // Issue a request over the network, possibly using an anonymized route - #[instrument(level = "debug", skip(self, message, safety_route_spec), err)] - async fn request( - &self, - dest: Destination, - message: capnp::message::Reader, - safety_route_spec: Option<&SafetyRouteSpec>, - ) -> Result, RPCError> { - - let info = { - let operation = message - .get_root::() - .map_err(map_error_internal!("invalid operation")) - .map_err(logthru_rpc!(error))?; - RPCOperationInfo::parse() - } - let (op_id, wants_answer) = { - - let op_id = operation.get_op_id(); - let wants_answer = self.wants_answer(&operation).map_err(logthru_rpc!())?; - - (op_id, wants_answer) - }; + #[instrument(level = "debug", skip(self, operation, safety_route_spec), err)] + fn render_operation(&self, dest: Destination, operation: &RPCOperation, safety_route_spec: Option<&SafetyRouteSpec>) -> Result + { + // Encode message to a builder and make a message reader for it + let mut msg_builder = ::capnp::message::Builder::new_default(); + let mut op_builder = msg_builder.init_root::(); + operation.encode(&mut op_builder)?; + // Create envelope data let out_node_id; // Envelope Node Id let mut out_noderef: Option = None; // Node to send envelope to let hopcount: usize; // Total safety + private route hop count - - // Create envelope data let out = { let out; // Envelope data @@ -471,7 +406,7 @@ impl RPCProcessor { None => { // If no safety route is being used, and we're not sending to a private // route, we can use a direct envelope instead of routing - out = reader_to_vec(&message)?; + out = builder_to_vec(msg_builder)?; // Message goes directly to the node out_node_id = node_id; @@ -485,7 +420,7 @@ impl RPCProcessor { let private_route = self.new_stub_private_route(node_id, &mut pr_builder)?; - let message_vec = reader_to_vec(&message)?; + let message_vec = builder_to_vec(msg_builder)?; // first out_node_id = sr .hops @@ -511,7 +446,7 @@ impl RPCProcessor { let pr_reader = pr_builder.into_reader(); // Reply with 'route' operation - let message_vec = reader_to_vec(&message)?; + let message_vec = builder_to_vec(msg_builder)?; out_node_id = match safety_route_spec { None => { // If no safety route, the first node is the first hop of the private route @@ -546,9 +481,35 @@ impl RPCProcessor { if hopcount > self.inner.lock().max_route_hop_count { return Err(rpc_error_internal("hop count too long for route")) .map_err(logthru_rpc!(warn)); - } - // calculate actual timeout - // timeout is number of hops times the timeout per hop + } + + Ok(RenderedOperation { + out, + out_node_id, + out_noderef, + hopcount, + }) + } + + // Issue a question over the network, possibly using an anonymized route + #[instrument(level = "debug", skip(self, question, safety_route_spec), err)] + async fn question( + &self, + dest: Destination, + question: RPCQuestion, + safety_route_spec: Option<&SafetyRouteSpec>, + ) -> Result { + + // Wrap question in operation + let operation = RPCOperation::new_question(question); + + // Produce rendered operation + let RenderedOperation { + out, out_node_id, out_noderef, hopcount + } = self.render_operation(dest, &operation, safety_route_spec)?; + + // Calculate answer timeout + // Timeout is number of hops times the timeout per hop let timeout = self.inner.lock().timeout * (hopcount as u64); // if we need to resolve the first hop, do it @@ -565,19 +526,14 @@ impl RPCProcessor { } }; - // set up op id eventual - let eventual = if wants_answer { - Some(self.add_op_id_waiter(op_id)) - } else { - None - }; + // Set up op id eventual + let op_id = operation.op_id(); + let eventual = self.add_op_id_waiter(op_id); - // Log rpc receive - debug!(target: "rpc_message", dir = "send", is_q, kind = Self::get_rpc_operation_detail_debug_info(&which_reader), op_id = operation.get_op_id(), sender_id = msg.header.envelope.get_sender_id().encode()); + // Log rpc send + debug!(target: "rpc_message", dir = "send", kind = "question", op_id, desc = operation.kind().desc()); - log_rpc!(debug "==>> REQUEST({}) -> {:?}", self.get_rpc_message_debug_info(&message), dest); - - // send question + // Send question let bytes = out.len() as u64; let send_ts = intf::get_timestamp(); let send_data_kind = match self @@ -589,12 +545,10 @@ impl RPCProcessor { Ok(v) => v, Err(e) => { // Make sure to clean up op id waiter in case of error - if eventual.is_some() { - self.cancel_op_id_waiter(op_id); - } + self.cancel_op_id_waiter(op_id); self.routing_table() - .stats_failed_to_send(node_ref, send_ts, wants_answer); + .stats_failed_to_send(node_ref, send_ts, true); return Err(e); } @@ -602,29 +556,25 @@ impl RPCProcessor { // Successfully sent self.routing_table() - .stats_question_sent(node_ref.clone(), send_ts, bytes, wants_answer); + .stats_question_sent(node_ref.clone(), send_ts, bytes, true); // Pass back waitable reply completion - match eventual { - None => { - // if we don't want an answer, don't wait for one - Ok(None) - } - Some(eventual) => Ok(Some(WaitableReply { - op_id, - eventual, - timeout, - node_ref, - send_ts, - send_data_kind, - })), - } + Ok(WaitableReply { + op_id, + eventual, + timeout, + node_ref, + send_ts, + send_data_kind, + }) } + xxxx continue here, make 'statement' sender + // Issue a reply over the network, possibly using an anonymized route // The request must want a response, or this routine fails #[instrument(level = "debug", skip(self, request_rpcreader, reply_msg, safety_route_spec), err)] - async fn reply( + async fn answer( &self, request_rpcreader: RPCMessageReader, reply_msg: capnp::message::Reader, @@ -1497,25 +1447,17 @@ impl RPCProcessor { // Send StatusQ RPC request, receive StatusA answer // Can be sent via relays, but not via routes pub async fn rpc_call_status(self, peer: NodeRef) -> Result { - let status_q_msg = { - let mut status_q_msg = ::capnp::message::Builder::new_default(); - let mut question = status_q_msg.init_root::(); - question.set_op_id(self.get_next_op_id()); - let mut respond_to = question.reborrow().init_respond_to(); - self.make_respond_to_sender(peer.clone()) - .encode(&mut respond_to)?; - let detail = question.reborrow().init_detail(); - let mut sqb = detail.init_status_q(); - let mut node_status_builder = sqb.reborrow().init_node_status(); - let node_status = self.network_manager().generate_node_status(); - encode_node_status(&node_status, &mut node_status_builder)?; - status_q_msg.into_reader() + let node_status = self.network_manager().generate_node_status(); + let status_q = RPCOperationStatusQ { + node_status }; + let respond_to = self.make_respond_to_sender(peer.clone()); + let operation = RPCOperation::new_question(RPCQuestion::new(respond_to, RPCQuestionDetail::StatusQ(status_q))); // Send the info request let waitable_reply = self - .request(Destination::Direct(peer.clone()), status_q_msg, None) + .request(Destination::Direct(peer.clone()), operation, None) .await? .unwrap(); diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 0679a15c..073fc706 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -1588,21 +1588,27 @@ pub enum TunnelMode { Turn, } -type TunnelId = u64; +#[derive(Clone, Debug, PartialOrd, PartialEq, Eq, Ord, Serialize, Deserialize)] +pub enum TunnelError { + BadId, // Tunnel ID was rejected + NoEndpoint, // Endpoint was unreachable + RejectedMode, // Endpoint couldn't provide mode + NoCapacity, // Endpoint is full +} + +pub type TunnelId = u64; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct TunnelEndpoint { - pub node_id: NodeId, // the node id of the tunnel endpoint - pub dial_info: Vec, // multiple ways of how to get to the node pub mode: TunnelMode, + pub description: String, // XXX: TODO } impl Default for TunnelEndpoint { fn default() -> Self { Self { - node_id: NodeId::default(), - dial_info: Vec::new(), mode: TunnelMode::Raw, + description: "".to_string(), } } }