Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: add the MetaLogListener, LocalLogManager, and Controller interface. #10106

Merged
merged 3 commits into from
Feb 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,27 @@
</subpackage>
</subpackage>

<subpackage name="controller">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.feature" />
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
</subpackage>

<subpackage name="metadata">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.annotation" />
Expand All @@ -201,6 +222,15 @@
<allow pkg="org.apache.kafka.test" />
</subpackage>

<subpackage name="metalog">
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.test" />
</subpackage>

<subpackage name="clients">
<allow pkg="org.slf4j" />
<allow pkg="org.apache.kafka.common" />
Expand Down
180 changes: 180 additions & 0 deletions metadata/src/main/java/org/apache/kafka/controller/Controller.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.controller;

import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;


public interface Controller extends AutoCloseable {
/**
* Change the in-sync replica sets for some partitions.
*
* @param request The AlterIsrRequest data.
*
* @return A future yielding the response.
*/
CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request);

/**
* Create a batch of topics.
*
* @param request The CreateTopicsRequest data.
*
* @return A future yielding the response.
*/
CompletableFuture<CreateTopicsResponseData>
createTopics(CreateTopicsRequestData request);

/**
* Unregister a broker.
*
* @param brokerId The broker id to unregister.
*
* @return A future that is completed successfully when the broker is
* unregistered.
*/
CompletableFuture<Void> unregisterBroker(int brokerId);

/**
* Describe the current configuration of various resources.
*
* @param resources A map from resources to the collection of config keys that we
* want to describe for each. If the collection is empty, then
* all configuration keys will be described.
*
* @return
*/
CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>>
describeConfigs(Map<ConfigResource, Collection<String>> resources);

/**
* Elect new partition leaders.
*
* @param request The request.
*
* @return A future yielding the elect leaders response.
*/
CompletableFuture<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request);

/**
* Get the current finalized feature ranges for each feature.
*
* @return A future yielding the feature ranges.
*/
CompletableFuture<FeatureMapAndEpoch> finalizedFeatures();

/**
* Perform some incremental configuration changes.
*
* @param configChanges The changes.
* @param validateOnly True if we should validate the changes but not apply them.
*
* @return A future yielding a map from partitions to error results.
*/
CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges,
boolean validateOnly);

/**
* Perform some configuration changes using the legacy API.
*
* @param newConfigs The new configuration maps to apply.
* @param validateOnly True if we should validate the changes but not apply them.
*
* @return A future yielding a map from partitions to error results.
*/
CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly);

/**
* Process a heartbeat from a broker.
*
* @param request The broker heartbeat request.
*
* @return A future yielding a heartbeat reply.
*/
CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(
BrokerHeartbeatRequestData request);

/**
* Attempt to register the given broker.
*
* @param request The registration request.
*
* @return A future yielding a registration reply.
*/
CompletableFuture<BrokerRegistrationReply> registerBroker(
BrokerRegistrationRequestData request);

/**
* Wait for the given number of brokers to be registered and unfenced.
* This is for testing.
*
* @param minBrokers The minimum number of brokers to wait for.
* @return A future which is completed when the given number of brokers
* is reached.
*/
CompletableFuture<Void> waitForReadyBrokers(int minBrokers);

/**
* Perform some client quota changes
*
* @param quotaAlterations The list of quotas to alter
* @param validateOnly True if we should validate the changes but not apply them.
* @return A future yielding a map of quota entities to error results.
*/
CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly
);

/**
* Begin shutting down, but don't block. You must still call close to clean up all
* resources.
*/
void beginShutdown();

/**
* If this controller is active, this is the non-negative controller epoch.
* Otherwise, this is -1.
*/
long curClaimEpoch();

/**
* Blocks until we have shut down and freed all resources.
*/
void close() throws InterruptedException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.controller;

import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;

import java.util.Objects;


class ResultOrError<T> {
mumrah marked this conversation as resolved.
Show resolved Hide resolved
private final ApiError error;
private final T result;

public ResultOrError(Errors error, String message) {
this(new ApiError(error, message));
}

public ResultOrError(ApiError error) {
Objects.requireNonNull(error);
this.error = error;
this.result = null;
}

public ResultOrError(T result) {
this.error = null;
this.result = result;
}

public boolean isError() {
return error != null;
}

public boolean isResult() {
return error == null;
}

public ApiError error() {
return error;
}

public T result() {
return result;
}

@Override
public boolean equals(Object o) {
if (o == null || (!o.getClass().equals(getClass()))) {
return false;
}
ResultOrError other = (ResultOrError) o;
return error.equals(other.error) &&
Objects.equals(result, other.result);
}

@Override
public int hashCode() {
return Objects.hash(error, result);
}

@Override
public String toString() {
if (error.isSuccess()) {
return "ResultOrError(" + result + ")";
} else {
return "ResultOrError(" + error + ")";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.metadata;

import java.util.Objects;


public class BrokerHeartbeatReply {
/**
* True if the heartbeat reply should tell the broker that it has caught up.
*/
private final boolean isCaughtUp;

/**
* True if the heartbeat reply should tell the broker that it is fenced.
*/
private final boolean isFenced;

/**
* True if the heartbeat reply should tell the broker that it should shut down.
*/
private final boolean shouldShutDown;

public BrokerHeartbeatReply(boolean isCaughtUp,
boolean isFenced,
boolean shouldShutDown) {
this.isCaughtUp = isCaughtUp;
this.isFenced = isFenced;
this.shouldShutDown = shouldShutDown;
}

public boolean isCaughtUp() {
return isCaughtUp;
}

public boolean isFenced() {
return isFenced;
}

public boolean shouldShutDown() {
return shouldShutDown;
}

@Override
public int hashCode() {
return Objects.hash(isCaughtUp, isFenced, shouldShutDown);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof BrokerHeartbeatReply)) return false;
BrokerHeartbeatReply other = (BrokerHeartbeatReply) o;
return other.isCaughtUp == isCaughtUp &&
other.isFenced == isFenced &&
other.shouldShutDown == shouldShutDown;
}

@Override
public String toString() {
return "BrokerHeartbeatReply(isCaughtUp=" + isCaughtUp +
", isFenced=" + isFenced +
", shouldShutDown = " + shouldShutDown +
")";
}
}
Loading