Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Rename DecommissionBrokers to UnregisterBrokers #10084

Merged
merged 4 commits into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1486,41 +1486,46 @@ default DescribeFeaturesResult describeFeatures() {
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);

/**
* Permanently remove a broker and reassign any partitions on the broker.
* Unregister a broker.
* <p>
* This operation is supported only on self-managed Kafka clusters (i.e. brokers which do not rely on Zookeeper).
* This operation does not have any effect on partition assignments. It is supported
* only on Kafka clusters which use Raft to store metadata, rather than ZooKeeper.
*
* @param brokerId the broker id to unregister.
* This is a convenience method for {@link #unregisterBroker(int, UnregisterBrokerOptions)}
*
* <p>This is a convenience method for {@link #decommissionBroker(int, DecommissionBrokerOptions)}
* @param brokerId the broker id to unregister.
*
* @return the {@link DecommissionBrokerResult} containing the result
* @return the {@link UnregisterBrokerResult} containing the result
*/
default DecommissionBrokerResult decommissionBroker(int brokerId) {
return decommissionBroker(brokerId, new DecommissionBrokerOptions());
@InterfaceStability.Unstable
default UnregisterBrokerResult unregisterBroker(int brokerId) {
return unregisterBroker(brokerId, new UnregisterBrokerOptions());
}

/**
* Permanently remove a broker and reassign any partitions on the broker.
* Unregister a broker.
* <p>
* This operation is supported only on self-managed Kafka clusters (i.e. brokers which do not rely on Zookeeper).
* This operation does not have any effect on partition assignments. It is supported
* only on Kafka clusters which use Raft to store metadata, rather than ZooKeeper.
*
* The following exceptions can be anticipated when calling {@code get()} on the future from the
* returned {@link DescribeFeaturesResult}:
* returned {@link UnregisterBrokerResult}:
* <ul>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* If the request timed out before the describe operation could finish.</li>
* <li>{@link org.apache.kafka.common.errors.UnsupportedVersionException}
* If the software is too old to support decommissioning.
* If the software is too old to support the unregistration API, or if the
* cluster is not using Raft to store metadata.
* </ul>
* <p>
*
* @param brokerId the broker id to unregister.
* @param options the options to use.
*
* @return the {@link DecommissionBrokerResult} containing the result
* @return the {@link UnregisterBrokerResult} containing the result
*/
DecommissionBrokerResult decommissionBroker(int brokerId, DecommissionBrokerOptions options);
@InterfaceStability.Unstable
UnregisterBrokerResult unregisterBroker(int brokerId, UnregisterBrokerOptions options);
Copy link
Contributor

@ijuma ijuma Feb 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe We should mark all KIP-500 methods with @Unstable and mention it in the documentation too.


/**
* Get the metrics kept by the adminClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicConfigs;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.DecommissionBrokerRequestData;
import org.apache.kafka.common.message.DeleteAclsRequestData;
import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
import org.apache.kafka.common.message.DeleteAclsResponseData;
Expand Down Expand Up @@ -147,6 +146,7 @@
import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopic;
import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.message.UnregisterBrokerRequestData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResult;
import org.apache.kafka.common.metrics.JmxReporter;
Expand Down Expand Up @@ -185,8 +185,6 @@
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DecommissionBrokerRequest;
import org.apache.kafka.common.requests.DecommissionBrokerResponse;
import org.apache.kafka.common.requests.DeleteAclsRequest;
import org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
Expand Down Expand Up @@ -238,6 +236,8 @@
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
import org.apache.kafka.common.requests.UnregisterBrokerRequest;
import org.apache.kafka.common.requests.UnregisterBrokerResponse;
import org.apache.kafka.common.requests.UpdateFeaturesRequest;
import org.apache.kafka.common.requests.UpdateFeaturesResponse;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
Expand Down Expand Up @@ -4608,23 +4608,23 @@ void handleFailure(Throwable throwable) {
}

@Override
public DecommissionBrokerResult decommissionBroker(int brokerId, DecommissionBrokerOptions options) {
public UnregisterBrokerResult unregisterBroker(int brokerId, UnregisterBrokerOptions options) {
final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
final Call call = new Call("decommissionBroker", calcDeadlineMs(now, options.timeoutMs()),
final Call call = new Call("unregisterBroker", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {

@Override
DecommissionBrokerRequest.Builder createRequest(int timeoutMs) {
DecommissionBrokerRequestData data =
new DecommissionBrokerRequestData().setBrokerId(brokerId);
return new DecommissionBrokerRequest.Builder(data);
UnregisterBrokerRequest.Builder createRequest(int timeoutMs) {
UnregisterBrokerRequestData data =
new UnregisterBrokerRequestData().setBrokerId(brokerId);
return new UnregisterBrokerRequest.Builder(data);
}

@Override
void handleResponse(AbstractResponse abstractResponse) {
final DecommissionBrokerResponse response =
(DecommissionBrokerResponse) abstractResponse;
final UnregisterBrokerResponse response =
(UnregisterBrokerResponse) abstractResponse;
Errors error = Errors.forCode(response.data().errorCode());
switch (error) {
case NONE:
Expand All @@ -4633,7 +4633,8 @@ void handleResponse(AbstractResponse abstractResponse) {
case REQUEST_TIMED_OUT:
throw error.exception();
default:
log.error("Decommission Broker request for broker ID {} failed: {}", brokerId, error.message());
log.error("Unregister broker request for broker ID {} failed: {}",
brokerId, error.message());
future.completeExceptionally(error.exception());
break;
}
Expand All @@ -4645,7 +4646,7 @@ void handleFailure(Throwable throwable) {
}
};
runnable.call(call, now);
return new DecommissionBrokerResult(future);
return new UnregisterBrokerResult(future);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import org.apache.kafka.common.annotation.InterfaceStability;

/**
* Options for {@link Admin#decommissionBroker(int, DecommissionBrokerOptions)}.
* Options for {@link Admin#unregisterBroker(int, UnregisterBrokerOptions)}.
*
* The API of this class is evolving. See {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DecommissionBrokerOptions extends AbstractOptions<UpdateFeaturesOptions> {
public class UnregisterBrokerOptions extends AbstractOptions<UpdateFeaturesOptions> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
import org.apache.kafka.common.KafkaFuture;

/**
* The result of the {@link Admin#decommissionBroker(int, DecommissionBrokerOptions)} call.
* The result of the {@link Admin#unregisterBroker(int, UnregisterBrokerOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
public class DecommissionBrokerResult {
public class UnregisterBrokerResult {
private final KafkaFuture<Void> future;

DecommissionBrokerResult(final KafkaFuture<Void> future) {
UnregisterBrokerResult(final KafkaFuture<Void> future) {
this.future = future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,7 @@ public enum ApiKeys {
DESCRIBE_PRODUCERS(ApiMessageType.DESCRIBE_PRODUCERS),
BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false, true),
BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false, true),

// Once we have the controller integration for supporting broker decommissioning, we will support forwarding from the broker
// This is a short-term workaround to avoid advertizing the API on Zookeeper-based brokers
DECOMMISSION_BROKER(ApiMessageType.DECOMMISSION_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true, true);
UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true, false);

// The generator ensures every `ApiMessageType` has a unique id
private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ public enum Errors {
"Requested position is not greater than or equal to zero, and less than the size of the snapshot.",
PositionOutOfRangeException::new),
UNKNOWN_TOPIC_ID(100, "This server does not host this topic ID.", UnknownTopicIdException::new),
DUPLICATE_BROKER_REGISTRATION_EXCEPTION(101, "This broker ID is already in use.", DuplicateBrokerRegistrationException::new);
DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", DuplicateBrokerRegistrationException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return BrokerRegistrationRequest.parse(buffer, apiVersion);
case BROKER_HEARTBEAT:
return BrokerHeartbeatRequest.parse(buffer, apiVersion);
case DECOMMISSION_BROKER:
return DecommissionBrokerRequest.parse(buffer, apiVersion);
case UNREGISTER_BROKER:
return UnregisterBrokerRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
return BrokerRegistrationResponse.parse(responseBuffer, version);
case BROKER_HEARTBEAT:
return BrokerHeartbeatResponse.parse(responseBuffer, version);
case DECOMMISSION_BROKER:
return DecommissionBrokerResponse.parse(responseBuffer, version);
case UNREGISTER_BROKER:
return UnregisterBrokerResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,52 +16,52 @@
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.DecommissionBrokerRequestData;
import org.apache.kafka.common.message.DecommissionBrokerResponseData;
import org.apache.kafka.common.message.UnregisterBrokerRequestData;
import org.apache.kafka.common.message.UnregisterBrokerResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;

public class DecommissionBrokerRequest extends AbstractRequest {
public class UnregisterBrokerRequest extends AbstractRequest {

public static class Builder extends AbstractRequest.Builder<DecommissionBrokerRequest> {
private final DecommissionBrokerRequestData data;
public static class Builder extends AbstractRequest.Builder<UnregisterBrokerRequest> {
private final UnregisterBrokerRequestData data;

public Builder(DecommissionBrokerRequestData data) {
super(ApiKeys.DECOMMISSION_BROKER);
public Builder(UnregisterBrokerRequestData data) {
super(ApiKeys.UNREGISTER_BROKER);
this.data = data;
}

@Override
public DecommissionBrokerRequest build(short version) {
return new DecommissionBrokerRequest(data, version);
public UnregisterBrokerRequest build(short version) {
return new UnregisterBrokerRequest(data, version);
}
}

private final DecommissionBrokerRequestData data;
private final UnregisterBrokerRequestData data;

public DecommissionBrokerRequest(DecommissionBrokerRequestData data, short version) {
super(ApiKeys.DECOMMISSION_BROKER, version);
public UnregisterBrokerRequest(UnregisterBrokerRequestData data, short version) {
super(ApiKeys.UNREGISTER_BROKER, version);
this.data = data;
}

@Override
public DecommissionBrokerRequestData data() {
public UnregisterBrokerRequestData data() {
return data;
}

@Override
public DecommissionBrokerResponse getErrorResponse(int throttleTimeMs, Throwable e) {
public UnregisterBrokerResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
return new DecommissionBrokerResponse(new DecommissionBrokerResponseData()
return new UnregisterBrokerResponse(new UnregisterBrokerResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(error.code()));
}

public static DecommissionBrokerRequest parse(ByteBuffer buffer, short version) {
return new DecommissionBrokerRequest(new DecommissionBrokerRequestData(new ByteBufferAccessor(buffer), version),
public static UnregisterBrokerRequest parse(ByteBuffer buffer, short version) {
return new UnregisterBrokerRequest(new UnregisterBrokerRequestData(new ByteBufferAccessor(buffer), version),
version);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.DecommissionBrokerResponseData;
import org.apache.kafka.common.message.UnregisterBrokerResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
Expand All @@ -26,16 +26,16 @@
import java.util.HashMap;
import java.util.Map;

public class DecommissionBrokerResponse extends AbstractResponse {
private final DecommissionBrokerResponseData data;
public class UnregisterBrokerResponse extends AbstractResponse {
private final UnregisterBrokerResponseData data;

public DecommissionBrokerResponse(DecommissionBrokerResponseData data) {
super(ApiKeys.DECOMMISSION_BROKER);
public UnregisterBrokerResponse(UnregisterBrokerResponseData data) {
super(ApiKeys.UNREGISTER_BROKER);
this.data = data;
}

@Override
public DecommissionBrokerResponseData data() {
public UnregisterBrokerResponseData data() {
return data;
}

Expand All @@ -53,8 +53,8 @@ public Map<Errors, Integer> errorCounts() {
return errorCounts;
}

public static DecommissionBrokerResponse parse(ByteBuffer buffer, short version) {
return new DecommissionBrokerResponse(new DecommissionBrokerResponseData(new ByteBufferAccessor(buffer), version));
public static UnregisterBrokerResponse parse(ByteBuffer buffer, short version) {
return new UnregisterBrokerResponse(new UnregisterBrokerResponseData(new ByteBufferAccessor(buffer), version));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
{
"apiKey": 64,
"type": "request",
"name": "DecommissionBrokerRequest",
"name": "UnregisterBrokerRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+",
"about": "The broker ID to decommission" }
"about": "The broker ID to unregister." }
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{
"apiKey": 64,
"type": "response",
"name": "DecommissionBrokerResponse",
"name": "UnregisterBrokerResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
Expand Down
Loading