Skip to content

Commit

Permalink
fix: use shard key from database in epoch manager
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Nov 25, 2022
1 parent 6467091 commit 2fdb0e2
Show file tree
Hide file tree
Showing 12 changed files with 94 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub struct BaseLayerEpochManager {
node_identity: Arc<NodeIdentity>,
validator_node_config: ValidatorNodeConfig,
validator_node_client_factory: TariCommsValidatorNodeClientFactory,
current_shard_key: Option<ShardId>,
}

impl BaseLayerEpochManager {
Expand All @@ -87,6 +88,7 @@ impl BaseLayerEpochManager {
node_identity,
validator_node_config,
validator_node_client_factory,
current_shard_key: None,
}
}

Expand Down Expand Up @@ -149,6 +151,7 @@ impl BaseLayerEpochManager {
return Ok(());
},
};
self.current_shard_key = Some(vn_shard_key);
info!(
target: LOG_TARGET,
"🖊 Validator node is registered for epoch {}, shard key: {} ", epoch, vn_shard_key
Expand Down Expand Up @@ -240,14 +243,21 @@ impl BaseLayerEpochManager {
self.current_epoch
}

pub async fn get_shard_id(&mut self, epoch: Epoch, public_key: &PublicKey) -> Result<ShardId, EpochManagerError> {
// TODO: Cache the assigned shard key for the epoch (ideally the epoch validity range)
let shard_id = self
.base_node_client
.get_shard_key(epoch.to_height(), public_key)
.await?
.ok_or(EpochManagerError::ValidatorNodeNotRegistered)?;
Ok(shard_id)
pub fn get_validator_shard_key(
&mut self,
epoch: Epoch,
public_key: &PublicKey,
) -> Result<ShardId, EpochManagerError> {
let db = self.db_factory.get_or_create_global_db()?;
let tx = db
.create_transaction()
.map_err(|e| EpochManagerError::StorageError(e.into()))?;
let vn = db
.validator_nodes(&tx)
.get(epoch.0, public_key.as_bytes())
.map_err(|e| EpochManagerError::StorageError(e.into()))?;

Ok(ShardId::from_bytes(&vn.shard_key).expect("Invalid Shard Key, Database is corrupt"))
}

pub async fn last_registration_epoch(&self) -> Result<Option<Epoch>, EpochManagerError> {
Expand Down Expand Up @@ -369,7 +379,7 @@ impl BaseLayerEpochManager {
.map_err(|e| EpochManagerError::StorageError(e.into()))?;
let db_vns = db
.validator_nodes(&tx)
.get_validator_nodes_per_epoch(epoch.0)
.get_all_per_epoch(epoch.0)
.map_err(|e| EpochManagerError::StorageError(e.into()))?;
let vns = db_vns
.into_iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub enum EpochManagerRequest {
CurrentEpoch {
reply: Reply<Epoch>,
},
GetShardId {
GetValidatorShardKey {
epoch: Epoch,
addr: CommsPublicKey,
reply: Reply<ShardId>,
Expand Down Expand Up @@ -180,8 +180,8 @@ impl EpochManagerService {
async fn handle_request(&mut self, req: EpochManagerRequest) {
match req {
EpochManagerRequest::CurrentEpoch { reply } => handle(reply, Ok(self.inner.current_epoch())),
EpochManagerRequest::GetShardId { epoch, addr, reply } => {
handle(reply, self.inner.get_shard_id(epoch, &addr).await)
EpochManagerRequest::GetValidatorShardKey { epoch, addr, reply } => {
handle(reply, self.inner.get_validator_shard_key(epoch, &addr))
},
EpochManagerRequest::UpdateEpoch { tip_info, reply } => {
handle(reply, self.inner.update_epoch(tip_info).await);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ impl EpochManager<CommsPublicKey> for EpochManagerHandle {
rx.await.map_err(|_| EpochManagerError::ReceiveError)?
}

async fn get_shard_id(&self, epoch: Epoch, addr: CommsPublicKey) -> Result<ShardId, EpochManagerError> {
async fn get_validator_shard_key(&self, epoch: Epoch, addr: CommsPublicKey) -> Result<ShardId, EpochManagerError> {
let (tx, rx) = oneshot::channel();
self.tx_request
.send(EpochManagerRequest::GetShardId { epoch, addr, reply: tx })
.send(EpochManagerRequest::GetValidatorShardKey { epoch, addr, reply: tx })
.await
.map_err(|_| EpochManagerError::SendError)?;

Expand Down
4 changes: 2 additions & 2 deletions dan_layer/core/src/services/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub enum EpochManagerError {
// TODO: Rename to reflect that it's a read only interface (e.g. EpochReader, EpochQuery)
pub trait EpochManager<TAddr: NodeAddressable>: Clone {
async fn current_epoch(&self) -> Result<Epoch, EpochManagerError>;
async fn get_shard_id(&self, epoch: Epoch, addr: TAddr) -> Result<ShardId, EpochManagerError>;
async fn get_validator_shard_key(&self, epoch: Epoch, addr: TAddr) -> Result<ShardId, EpochManagerError>;
async fn is_epoch_valid(&self, epoch: Epoch) -> Result<bool, EpochManagerError>;
async fn get_committees(
&self,
Expand Down Expand Up @@ -168,7 +168,7 @@ impl<TAddr: NodeAddressable> EpochManager<TAddr> for RangeEpochManager<TAddr> {
Ok(self.current_epoch)
}

async fn get_shard_id(&self, epoch: Epoch, addr: TAddr) -> Result<ShardId, EpochManagerError> {
async fn get_validator_shard_key(&self, epoch: Epoch, addr: TAddr) -> Result<ShardId, EpochManagerError> {
self.registered_vns
.iter()
.find_map(|(e, vns)| {
Expand Down
9 changes: 9 additions & 0 deletions dan_layer/core/src/storage/mocks/global_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ impl GlobalDbAdapter for MockGlobalDbBackupAdapter {
todo!()
}

/// Mock implementation of the single-validator-node lookup.
/// Not exercised by the current tests; deliberately panics via `todo!()` if called.
fn get_validator_node(
&self,
_tx: &Self::DbTransaction,
_epoch: u64,
_public_key: &[u8],
) -> Result<DbValidatorNode, Self::Error> {
todo!()
}

/// Mock implementation of the epoch-insert operation.
/// Not exercised by the current tests; deliberately panics via `todo!()` if called.
fn insert_epoch(&self, _tx: &Self::DbTransaction, _epoch: DbEpoch) -> Result<(), Self::Error> {
todo!()
}
Expand Down
25 changes: 16 additions & 9 deletions dan_layer/core/src/workers/hotstuff_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,10 @@ where
.position(|vn| vn.public_key == self.public_key)
.expect("The VN is not registered");

let node_shard_id = self.get_shard_id(node.epoch(), self.public_key.clone()).await?;
let node_shard_key = self
.epoch_manager
.get_validator_shard_key(node.epoch(), self.public_key.clone())
.await?;
let vn_mmr = self.epoch_manager.get_validator_node_mmr(node.epoch()).await?;

{
Expand All @@ -419,7 +422,11 @@ where
return Ok(());
}

tx.save_leader_proposals(shard, node.payload_id(), node.payload_height(), node.clone())?;
tx.save_leader_proposals(node.shard(), node.payload_id(), node.payload_height(), node.clone())?;
assert!(
involved_shards.contains(&node.shard()),
"Received a proposal for a shard that is not involved"
);

let mut leader_proposals = vec![];
for s in &involved_shards {
Expand All @@ -434,6 +441,11 @@ where
}
}
if leader_proposals.len() == involved_shards.len() {
info!(
target: LOG_TARGET,
"🔥 Received enough proposals to vote on the message: {}",
leader_proposals.len()
);
// Execute the payload!
let shard_pledges: HashMap<ShardId, Option<ObjectPledge>> = leader_proposals
.iter()
Expand Down Expand Up @@ -473,14 +485,14 @@ where
&finalize_result,
)?;

vote_msg.sign_vote(&self.signing_service, node_shard_id, &vn_mmr, vn_mmr_leaf_index as u64)?;
vote_msg.sign_vote(&self.signing_service, node_shard_key, &vn_mmr, vn_mmr_leaf_index as u64)?;

votes_to_send.push((vote_msg, local_node.proposed_by().clone()));
}
} else {
info!(
target: LOG_TARGET,
"🔥 Not enough votes to vote on the message, votes: {}, involved_shards: {}",
"🔥 Not enough proposals to vote on the message, num proposals: {}, involved_shards: {}",
leader_proposals.len(),
involved_shards.len()
);
Expand All @@ -498,11 +510,6 @@ where
Ok(())
}

/// Looks up the shard ID assigned to `addr` for `epoch` via the epoch manager,
/// converting any epoch-manager error into a `HotStuffError`.
async fn get_shard_id(&self, epoch: Epoch, addr: TAddr) -> Result<ShardId, HotStuffError> {
    self.epoch_manager
        .get_shard_id(epoch, addr)
        .await
        .map_err(Into::into)
}

/// Step 6: The leader receives votes from the local shard, and once it has enough ($n - f$) votes, it commits a
/// high QC and sends the next round of proposals.
async fn on_receive_vote(&mut self, from: TAddr, msg: VoteMessage) -> Result<(), HotStuffError> {
Expand Down
6 changes: 6 additions & 0 deletions dan_layer/storage/src/global/backend_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ pub trait GlobalDbAdapter: AtomicDb + Send + Sync + Clone {
epoch: u64,
) -> Result<Vec<DbValidatorNode>, Self::Error>;

/// Fetches the validator node record stored for `public_key` in `epoch`.
/// Implementations return an error when the lookup fails; whether a missing
/// record is an error or how it is signalled is implementation-defined —
/// NOTE(review): confirm against the sqlite adapter, which surfaces diesel's
/// `NotFound` as an error.
fn get_validator_node(
&self,
tx: &Self::DbTransaction,
epoch: u64,
public_key: &[u8],
) -> Result<DbValidatorNode, Self::Error>;
/// Persists the given epoch record within the supplied transaction.
fn insert_epoch(&self, tx: &Self::DbTransaction, epoch: DbEpoch) -> Result<(), Self::Error>;
/// Loads the epoch record for `epoch`, or `Ok(None)` if it has not been stored.
fn get_epoch(&self, tx: &Self::DbTransaction, epoch: u64) -> Result<Option<DbEpoch>, Self::Error>;
}
8 changes: 7 additions & 1 deletion dan_layer/storage/src/global/validator_node_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ impl<'a, TGlobalDbAdapter: GlobalDbAdapter> ValidatorNodeDb<'a, TGlobalDbAdapter
.map_err(TGlobalDbAdapter::Error::into)
}

pub fn get_validator_nodes_per_epoch(&self, epoch: u64) -> Result<Vec<DbValidatorNode>, TGlobalDbAdapter::Error> {
pub fn get(&self, epoch: u64, public_key: &[u8]) -> Result<DbValidatorNode, TGlobalDbAdapter::Error> {
self.backend
.get_validator_node(self.tx, epoch, public_key)
.map_err(TGlobalDbAdapter::Error::into)
}

pub fn get_all_per_epoch(&self, epoch: u64) -> Result<Vec<DbValidatorNode>, TGlobalDbAdapter::Error> {
self.backend
.get_validator_nodes_per_epoch(self.tx, epoch)
.map_err(TGlobalDbAdapter::Error::into)
Expand Down
24 changes: 22 additions & 2 deletions dan_layer/storage_sqlite/src/global/backend_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,26 @@ impl GlobalDbAdapter for SqliteGlobalDbAdapter {
Ok(())
}

/// Fetches the validator node row matching `public_key` for the given `epoch`.
///
/// # Errors
/// Returns `SqliteStorageError::DieselError` if the query fails, including when
/// no matching row exists (diesel's `first` yields `NotFound`).
fn get_validator_node(
    &self,
    tx: &Self::DbTransaction,
    epoch: u64,
    public_key: &[u8],
) -> Result<DbValidatorNode, Self::Error> {
    use crate::global::schema::{validator_nodes, validator_nodes::dsl};

    let vn = dsl::validator_nodes
        .filter(validator_nodes::epoch.eq(epoch as i64))
        .filter(validator_nodes::public_key.eq(public_key))
        .first::<ValidatorNode>(tx.connection())
        .map_err(|source| SqliteStorageError::DieselError {
            source,
            // Fixed: this label was copy-pasted from get_validator_nodes_per_epoch,
            // which made error logs point at the wrong query.
            operation: "get::validator_node".to_string(),
        })?;

    Ok(vn.into())
}

fn get_validator_nodes_per_epoch(
&self,
tx: &Self::DbTransaction,
Expand All @@ -246,7 +266,7 @@ impl GlobalDbAdapter for SqliteGlobalDbAdapter {
use crate::global::schema::{validator_nodes, validator_nodes::dsl};

let sqlite_vns = dsl::validator_nodes
.filter(validator_nodes::epoch.eq(epoch as i32))
.filter(validator_nodes::epoch.eq(epoch as i64))
.load::<ValidatorNode>(tx.connection())
.map_err(|source| SqliteStorageError::DieselError {
source,
Expand Down Expand Up @@ -282,7 +302,7 @@ impl GlobalDbAdapter for SqliteGlobalDbAdapter {
use crate::global::schema::epochs::dsl;

let query_res: Option<Epoch> = dsl::epochs
.find(epoch as i32)
.find(epoch as i64)
.first(tx.connection())
.optional()
.map_err(|source| SqliteStorageError::DieselError {
Expand Down
6 changes: 3 additions & 3 deletions dan_layer/storage_sqlite/src/global/models/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::global::schema::*;

#[derive(Queryable)]
pub struct Epoch {
pub epoch: i32,
pub epoch: i64,
pub validator_node_mr: Vec<u8>,
}

Expand All @@ -42,14 +42,14 @@ impl From<Epoch> for DbEpoch {
/// Insertable row for the `epochs` table.
/// (Fixed: removed a stale duplicate `epoch: i32` field left over from the
/// i32→i64 migration; `i64` matches the schema's `BigInt` column.)
#[derive(Insertable)]
#[table_name = "epochs"]
pub struct NewEpoch {
    pub epoch: i64,
    pub validator_node_mr: Vec<u8>,
}

impl From<DbEpoch> for NewEpoch {
fn from(e: DbEpoch) -> Self {
Self {
epoch: e.epoch as i32,
epoch: e.epoch as i64,
validator_node_mr: e.validator_node_mr,
}
}
Expand Down
6 changes: 3 additions & 3 deletions dan_layer/storage_sqlite/src/global/models/validator_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct ValidatorNode {
pub id: i32,
pub public_key: Vec<u8>,
pub shard_key: Vec<u8>,
pub epoch: i32,
pub epoch: i64,
}

impl From<ValidatorNode> for DbValidatorNode {
Expand All @@ -47,15 +47,15 @@ impl From<ValidatorNode> for DbValidatorNode {
/// Insertable row for the `validator_nodes` table.
/// (Fixed: removed a stale duplicate `epoch: i32` field left over from the
/// i32→i64 migration; `i64` matches the schema's `BigInt` column.)
pub struct NewValidatorNode {
    pub public_key: Vec<u8>,
    pub shard_key: Vec<u8>,
    pub epoch: i64,
}

/// Converts a domain `DbValidatorNode` into an insertable sqlite row.
/// (Fixed: removed a stale duplicate `epoch: ... as i32` initializer left over
/// from the i32→i64 migration, which is a compile error in a struct literal.)
impl From<DbValidatorNode> for NewValidatorNode {
    fn from(vn: DbValidatorNode) -> Self {
        Self {
            shard_key: vn.shard_key,
            public_key: vn.public_key,
            // Cast to the sqlite BigInt column type.
            epoch: vn.epoch as i64,
        }
    }
}
4 changes: 2 additions & 2 deletions dan_layer/storage_sqlite/src/global/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

// Schema for the `epochs` table, keyed by epoch number.
// (Fixed: removed a stale duplicate `epoch -> Integer` column left over from the
// Integer→BigInt migration; `BigInt` matches the i64 model fields.)
table! {
    epochs (epoch) {
        epoch -> BigInt,
        validator_node_mr -> Binary,
    }
}
Expand All @@ -32,7 +32,7 @@ table! {
id -> Integer,
public_key -> Binary,
shard_key -> Binary,
epoch -> Integer,
epoch -> BigInt,
}
}

Expand Down

0 comments on commit 2fdb0e2

Please sign in to comment.