Skip to content

Commit

Permalink
replace HTTP_CODE
Browse files Browse the repository at this point in the history
  • Loading branch information
imbajin committed Nov 22, 2023
1 parent 06f1130 commit 7e8e021
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public final class Constants {
public static final int FUTURE_TIMEOUT = 300;

/*
* The timeout in millisecond for threadpool shutdown
* The timeout in millisecond for thread-pool shutdown
*/
public static final long SHUTDOWN_TIMEOUT = 5000L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -36,7 +37,6 @@

import org.apache.commons.configuration2.MapConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpStatus;
import org.apache.hugegraph.computer.k8s.Constants;
import org.apache.hugegraph.computer.k8s.operator.common.AbstractController;
import org.apache.hugegraph.computer.k8s.operator.config.OperatorOptions;
Expand All @@ -61,6 +61,11 @@
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.fabric8.kubernetes.client.utils.Utils;

/**
* The OperatorEntrypoint class is the main entry point for the Kubernetes operator.
* It sets up the Kubernetes client, registers controllers, and starts the HTTP server for
* health checks.
*/
public class OperatorEntrypoint {

private static final Logger LOG = Log.logger(OperatorEntrypoint.class);
Expand All @@ -74,15 +79,13 @@ public class OperatorEntrypoint {

public static void main(String[] args) {
OperatorEntrypoint operatorEntrypoint = new OperatorEntrypoint();
Runtime.getRuntime().addShutdownHook(
new Thread(operatorEntrypoint::shutdown));
Runtime.getRuntime().addShutdownHook(new Thread(operatorEntrypoint::shutdown));
operatorEntrypoint.start();
}

static {
OptionSpace.register(
"computer-k8s-operator",
"org.apache.hugegraph.computer.k8s.operator.config.OperatorOptions"
OptionSpace.register("computer-k8s-operator",
"org.apache.hugegraph.computer.k8s.operator.config.OperatorOptions"
);
}

Expand All @@ -98,8 +101,7 @@ public OperatorEntrypoint() {
public void start() {
try {
this.kubeClient = new DefaultKubernetesClient();
String watchNamespace = this.config.get(
OperatorOptions.WATCH_NAMESPACE);
String watchNamespace = this.config.get(OperatorOptions.WATCH_NAMESPACE);
if (!Objects.equals(watchNamespace, Constants.ALL_NAMESPACE)) {
this.createNamespace(watchNamespace);
this.kubeClient = this.kubeClient.inNamespace(watchNamespace);
Expand All @@ -108,19 +110,17 @@ public void start() {
LOG.info("Watch namespace: " + watchNamespace);

this.addHealthCheck();

this.registerControllers();

this.informerFactory.startAllRegisteredInformers();
this.informerFactory.addSharedInformerEventListener(exception -> {
LOG.error("Informer event listener exception occurred",
exception);
LOG.error("Informer event listener exception occurred", exception);
OperatorEntrypoint.this.shutdown();
});

// Start all controller
this.controllerPool = ExecutorUtil.newFixedThreadPool(
this.controllers.size(), "controllers-%d");
// Start all controllers
this.controllerPool = ExecutorUtil.newFixedThreadPool(this.controllers.size(),
"controllers-%d");
CountDownLatch latch = new CountDownLatch(this.controllers.size());
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (AbstractController<?> controller : this.controllers) {
Expand All @@ -141,8 +141,7 @@ public void start() {
}
});

CompletableFuture.anyOf(futures.toArray(new CompletableFuture[]{}))
.get();
CompletableFuture.anyOf(futures.toArray(new CompletableFuture[]{})).get();
} catch (Throwable throwable) {
LOG.error("Failed to start Operator: ", throwable);
} finally {
Expand Down Expand Up @@ -201,16 +200,14 @@ private HugeConfig configFromSysPropsOrEnvVars() {
}

private void registerControllers() {
ComputerJobController jobController = new ComputerJobController(
this.config, this.kubeClient);
this.registerController(jobController,
ConfigMap.class, Job.class, Pod.class);
ComputerJobController jobController = new ComputerJobController(this.config,
this.kubeClient);
this.registerController(jobController, ConfigMap.class, Job.class, Pod.class);
}

@SafeVarargs
private final void registerController(
AbstractController<?> controller,
Class<? extends HasMetadata>... ownsClass) {
private void registerController(AbstractController<?> controller,
Class<? extends HasMetadata>... ownsClass) {
controller.register(this.informerFactory, ownsClass);
this.controllers.add(controller);
}
Expand All @@ -222,7 +219,7 @@ private void addHealthCheck() throws IOException {
this.httpServer = HttpServer.create(address, probeBacklog);
this.httpServer.createContext("/health", httpExchange -> {
byte[] bytes = "ALL GOOD!".getBytes(StandardCharsets.UTF_8);
httpExchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length);
httpExchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, bytes.length);
OutputStream responseBody = httpExchange.getResponseBody();
responseBody.write(bytes);
responseBody.close();
Expand All @@ -233,7 +230,7 @@ private void addHealthCheck() throws IOException {
private void addReadyCheck() {
this.httpServer.createContext("/ready", httpExchange -> {
byte[] bytes = "ALL Ready!".getBytes(StandardCharsets.UTF_8);
httpExchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length);
httpExchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, bytes.length);
OutputStream responseBody = httpExchange.getResponseBody();
responseBody.write(bytes);
responseBody.close();
Expand All @@ -245,8 +242,7 @@ private void createNamespace(String namespace) {
.withName(namespace)
.endMetadata();
KubeUtil.ignoreExists(() -> {
return this.kubeClient.namespaces()
.create(builder.build());
return this.kubeClient.namespaces().create(builder.build());
});
}
}

0 comments on commit 7e8e021

Please sign in to comment.