Skip to content

Commit

Permalink
Generate Unique EcChronos ID #678
Browse files Browse the repository at this point in the history
- Generate eccrhonos_Id that should be unique (uuid + host name)
- on start up instance, first check nodes_metadata
table for ecchronosId, if it already exists, use that
one, otherwise generate now one.
- So due to persistence of eccronosId, it would be same
even instance get reinstalled or restart.
- when instance try to acquire nodes then add this
newly ecchronosId into nodes_metadata table with datacenter
and nodeId.
- The new table nodes_metadata schenma, datacenter is partition key
and nodeId is clustering colum and ecchrons_id coulum would hold the
Id.
  • Loading branch information
sajid riaz committed Sep 19, 2024
1 parent b5178cc commit 87b95c5
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public class BeanConfigurator
private final AtomicReference<Security.CqlSecurity> cqlSecurity = new AtomicReference<>();
private final AtomicReference<Security.JmxSecurity> jmxSecurity = new AtomicReference<>();
private final ConfigRefresher configRefresher;
private final String ecChronosID;

/**
* Constructs a new {@code BeanConfigurator} and initializes the configuration and security settings. If the
Expand All @@ -91,7 +90,6 @@ public BeanConfigurator() throws ConfigurationException, UnknownHostException
Security security = getSecurityConfig();
cqlSecurity.set(security.getCqlSecurity());
jmxSecurity.set(security.getJmxSecurity());
ecChronosID = ECCHORONS_ID_PRE_STRING.concat(InetAddress.getLocalHost().getHostName());
}

/**
Expand Down Expand Up @@ -282,7 +280,6 @@ private EccNodesSync getEccNodesSync(
EccNodesSync myEccNodesSync = EccNodesSync.newBuilder()
.withInitialNodesList(distributedNativeConnectionProvider.getNodes())
.withSession(distributedNativeConnectionProvider.getCqlSession())
.withEcchronosID(ecChronosID)
.withConnectionDelayValue(connectionDelay.getTime())
.withConnectionDelayUnit(connectionDelay.getUnit())
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 2024 Telefonaktiebolaget LM Ericsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ericsson.bss.cassandra.ecchronos.data.sync;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;

public final class EccNodeMetadata
{

private static final Logger LOG = LoggerFactory.getLogger(EccNodeMetadata.class);
private static final String COLUMN_ECCHRONOS_ID = "ecchronos_id";
private static final String COLUMN_DC_NAME = "datacenter_name";
private static final String COLUMN_NODE_ID = "node_id";
private static final String KEYSPACE_NAME = "ecchronos";
private static final String NODE_METADATA_TABLE_NAME = "nodes_metadata";

private final PreparedStatement mySelectStatement;
private final PreparedStatement myCreateStatement;
private final PreparedStatement myTableUpdateStatement;
private final CqlSession mySession;
private final Node myNode;

public EccNodeMetadata(final List<Node> nodes, final CqlSession session)
{
mySession = session;
myNode = nodes.stream().findAny().get();
myCreateStatement = mySession.prepare(QueryBuilder.insertInto(KEYSPACE_NAME, NODE_METADATA_TABLE_NAME)
.value(COLUMN_DC_NAME, bindMarker())
.value(COLUMN_NODE_ID, bindMarker())
.value(COLUMN_ECCHRONOS_ID, bindMarker())
.build());
myTableUpdateStatement = mySession.prepare(QueryBuilder.update(KEYSPACE_NAME, NODE_METADATA_TABLE_NAME)
.setColumn(COLUMN_ECCHRONOS_ID, bindMarker())
.whereColumn(COLUMN_DC_NAME)
.isEqualTo(bindMarker())
.whereColumn(COLUMN_NODE_ID)
.isEqualTo(bindMarker())
.build()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));
mySelectStatement = mySession.prepare(selectFrom(KEYSPACE_NAME, NODE_METADATA_TABLE_NAME)
.columns(COLUMN_ECCHRONOS_ID)
.whereColumn(COLUMN_DC_NAME)
.isEqualTo(bindMarker())
.build());
}

public String getEccChronosID() throws UnknownHostException
{
ResultSet resultSet = getResultSet();

Row row = resultSet.one();
if (row != null)
{
return row.getString(COLUMN_ECCHRONOS_ID);
}
return generateNewEcChronosID();
}

public ResultSet getResultSet()
{
BoundStatement boundStatement = mySelectStatement.bind(myNode.getDatacenter());
return mySession.execute(boundStatement);
}

public ResultSet insertNodeMetadataInfo(final String datacenterName,
final UUID nodeID,
final String eccChronosID)
{
BoundStatement insertMetadataInfo = myCreateStatement.bind(datacenterName, nodeID, eccChronosID);
ResultSet tmpResultSet = mySession.execute(insertMetadataInfo);
LOG.info("Preparing to insert node metadata with Datacenter {}, NodeId {} and EccChronosID {}",
datacenterName, nodeID, eccChronosID);
if (tmpResultSet.wasApplied())
{
LOG.info("Node metadata info successfully stored with EccChronosID {}", eccChronosID);
}
else
{
LOG.error("Unable to store metadata info with NodeId{}", nodeID);
}
return tmpResultSet;
}

public ResultSet updateNodeMetadataEcChronosIdStatement(final String ecChronosId,
final String datacenterName,
final UUID nodeID)
{
BoundStatement updateEcChronosID = myTableUpdateStatement.bind(ecChronosId, datacenterName, nodeID);
ResultSet tmpResultSet = mySession.execute(updateEcChronosID);
if (tmpResultSet.wasApplied())
{
LOG.info("Node metadata {} successfully updated", nodeID);
}
else
{
LOG.error("Unable to update node metadata {}", nodeID);
}
return tmpResultSet;
}

private String generateNewEcChronosID() throws UnknownHostException
{
String uuid = UUID.randomUUID().toString();
return uuid.concat(InetAddress.getLocalHost().getHostName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public final class EccNodesSync
private final CqlSession mySession;
private final List<Node> myNodesList;
private final String ecChronosID;
private final EccNodeMetadata myEccNodeMetadata;

private final PreparedStatement myCreateStatement;
private final PreparedStatement myUpdateStatusStatement;
Expand All @@ -75,8 +76,8 @@ public final class EccNodesSync
private EccNodesSync(final Builder builder) throws UnknownHostException
{
mySession = Preconditions.checkNotNull(builder.mySession, "Session cannot be null");
myNodesList = Preconditions
.checkNotNull(builder.initialNodesList, "Nodes list cannot be null");
myNodesList = builder.initialNodesList;
myEccNodeMetadata = new EccNodeMetadata(myNodesList, mySession);
myCreateStatement = mySession.prepare(QueryBuilder.insertInto(KEYSPACE_NAME, TABLE_NAME)
.value(COLUMN_ECCHRONOS_ID, bindMarker())
.value(COLUMN_DC_NAME, bindMarker())
Expand All @@ -91,16 +92,20 @@ private EccNodesSync(final Builder builder) throws UnknownHostException
.setColumn(COLUMN_NODE_STATUS, bindMarker())
.setColumn(COLUMN_LAST_CONNECTION, bindMarker())
.setColumn(COLUMN_NEXT_CONNECTION, bindMarker())
.whereColumn(COLUMN_ECCHRONOS_ID).isEqualTo(bindMarker())
.whereColumn(COLUMN_DC_NAME).isEqualTo(bindMarker())
.whereColumn(COLUMN_NODE_ID).isEqualTo(bindMarker())
.whereColumn(COLUMN_ECCHRONOS_ID)
.isEqualTo(bindMarker())
.whereColumn(COLUMN_DC_NAME)
.isEqualTo(bindMarker())
.whereColumn(COLUMN_NODE_ID)
.isEqualTo(bindMarker())
.build()
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM));
mySelectStatusStatement = mySession.prepare(selectFrom(KEYSPACE_NAME, TABLE_NAME)
.columns(COLUMN_NODE_ID, COLUMN_NODE_ENDPOINT, COLUMN_DC_NAME, COLUMN_NODE_STATUS)
.whereColumn(COLUMN_ECCHRONOS_ID).isEqualTo(bindMarker())
.whereColumn(COLUMN_ECCHRONOS_ID)
.isEqualTo(bindMarker())
.build());
ecChronosID = builder.myEcchronosID;
ecChronosID = myEccNodeMetadata.getEccChronosID();

connectionDelayValue = builder.myConnectionDelayValue;
connectionDelayUnit = builder.myConnectionDelayUnit;
Expand All @@ -115,10 +120,6 @@ public ResultSet getResultSet()

public void acquireNodes() throws EcChronosException
{
if (myNodesList.isEmpty())
{
throw new EcChronosException("Cannot Acquire Nodes because there is no nodes to be acquired");
}
for (Node node : myNodesList)
{
LOG.info(
Expand All @@ -129,6 +130,7 @@ public void acquireNodes() throws EcChronosException
ResultSet tmpResultSet = acquireNode(node);
if (tmpResultSet.wasApplied())
{
myEccNodeMetadata.insertNodeMetadataInfo(node.getDatacenter(), node.getHostId(), ecChronosID);
LOG.info("Node successfully acquired by instance {}", ecChronosID);
}
else
Expand Down Expand Up @@ -156,24 +158,22 @@ public ResultSet verifyAcquireNode(final Node node)
}

private ResultSet insertNodeInfo(
final String datacenterName,
final String nodeEndpoint,
final String nodeStatus,
final Instant lastConnection,
final Instant nextConnection,
final UUID nodeID
)
final String datacenterName,
final String nodeEndpoint,
final String nodeStatus,
final Instant lastConnection,
final Instant nextConnection,
final UUID nodeID)
{
BoundStatement insertNodeSyncInfo = myCreateStatement.bind(ecChronosID,
datacenterName, nodeEndpoint, nodeStatus, lastConnection, nextConnection, nodeID);
return execute(insertNodeSyncInfo);
}

public ResultSet updateNodeStatus(
final NodeStatus nodeStatus,
final String datacenterName,
final UUID nodeID
)
final NodeStatus nodeStatus,
final String datacenterName,
final UUID nodeID)
{
ResultSet tmpResultSet = updateNodeStateStatement(nodeStatus, datacenterName, nodeID);
if (tmpResultSet.wasApplied())
Expand All @@ -188,40 +188,36 @@ public ResultSet updateNodeStatus(
}

private ResultSet updateNodeStateStatement(
final NodeStatus nodeStatus,
final String datacenterName,
final UUID nodeID
)
final NodeStatus nodeStatus,
final String datacenterName,
final UUID nodeID)
{
BoundStatement updateNodeStatus = myUpdateStatusStatement.bind(
nodeStatus.toString(),
Instant.now(),
Instant.now().plus(DEFAULT_CONNECTION_DELAY_IN_MINUTES, ChronoUnit.MINUTES),
ecChronosID,
datacenterName,
nodeID
);
nodeID);
return execute(updateNodeStatus);
}

@VisibleForTesting
public ResultSet verifyInsertNodeInfo(
final String datacenterName,
final String nodeEndpoint,
final String nodeStatus,
final Instant lastConnection,
final Instant nextConnection,
final UUID nodeID
)
final String datacenterName,
final String nodeEndpoint,
final String nodeStatus,
final Instant lastConnection,
final Instant nextConnection,
final UUID nodeID)
{
return insertNodeInfo(
datacenterName,
nodeEndpoint,
nodeStatus,
lastConnection,
nextConnection,
nodeID
);
nodeID);
}

public ResultSet execute(final BoundStatement statement)
Expand All @@ -238,7 +234,6 @@ public static class Builder
{
private CqlSession mySession;
private List<Node> initialNodesList;
private String myEcchronosID;
private Long myConnectionDelayValue;
private ChronoUnit myConnectionDelayUnit;

Expand Down Expand Up @@ -294,29 +289,19 @@ public Builder withConnectionDelayUnit(final TimeUnit connectionDelayUnit)
return this;
}

/**
* Builds EccNodesSync with ecchronosID.
*
* @param echronosID
* ecchronos ID generated by BeanConfigurator.
* @return Builder
*/
public Builder withEcchronosID(final String echronosID)
{
this.myEcchronosID = echronosID;
return this;
}

/**
* Builds EccNodesSync.
*
* @return Builder
* @throws UnknownHostException
*/
public EccNodesSync build() throws UnknownHostException
public EccNodesSync build() throws UnknownHostException, EcChronosException
{
if (initialNodesList == null || initialNodesList.isEmpty())
{
throw new EcChronosException("No nodes available because the node list is empty or null");
}
return new EccNodesSync(this);
}
}
}

Loading

0 comments on commit 87b95c5

Please sign in to comment.