Skip to content

Commit

Permalink
Merge branch 'implement_kusto_query_object' into azure-identity
Browse files Browse the repository at this point in the history
  • Loading branch information
AsafMah committed Sep 29, 2024
2 parents e8c8884 + 7cf2501 commit 1f9c569
Show file tree
Hide file tree
Showing 18 changed files with 154 additions and 223 deletions.
30 changes: 15 additions & 15 deletions data/src/main/java/com/microsoft/azure/kusto/data/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,21 @@ protected String post(HttpRequest request) throws DataServiceException {
}
}

protected Mono<String> postAsync(HttpRequest request) {
// Execute and get the response
return httpClient.send(request)
.onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err))
.flatMap(this::processResponseAsync)
.onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err));
}

public Mono<String> processResponseAsync(HttpResponse response) {
try {
return Mono.just(Objects.requireNonNull(processResponseBody(response)));
} catch (Exception e) {
return Mono.error(new RuntimeException("Error processing response", e));
}
}
// protected Mono<String> postAsync(HttpRequest request) {
// // Execute and get the response
// return httpClient.send(request)
// .onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err))
// .flatMap(this::processResponseAsync)
// .onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err));
// }
//
// public Mono<String> processResponseAsync(HttpResponse response) {
// try {
// return Mono.just(Objects.requireNonNull(processResponseBody(response)));
// } catch (Exception e) {
// return Mono.error(new RuntimeException("Error processing response", e));
// }
// }

private String processResponseBody(HttpResponse response) throws DataServiceException {
String responseBody = Utils.isGzipResponse(response) ? Utils.gzipedInputToString(response.getBodyAsBinaryData().toStream())
Expand Down
10 changes: 5 additions & 5 deletions data/src/main/java/com/microsoft/azure/kusto/data/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

public interface Client {

Mono<KustoOperationResult> executeQueryAsync(String database, String command, ClientRequestProperties properties);

Mono<KustoOperationResult> executeMgmtAsync(String database, String command, ClientRequestProperties properties);

Mono<String> executeToJsonAsync(String database, String command, ClientRequestProperties properties);
// Mono<KustoOperationResult> executeQueryAsync(String database, String command, ClientRequestProperties properties);
//
// Mono<KustoOperationResult> executeMgmtAsync(String database, String command, ClientRequestProperties properties);
//
// Mono<String> executeToJsonAsync(String database, String command, ClientRequestProperties properties);

@Deprecated
KustoOperationResult executeQuery(String command) throws DataServiceException, DataClientException;
Expand Down
180 changes: 74 additions & 106 deletions data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@
import com.microsoft.azure.kusto.data.res.JsonResult;
import org.apache.commons.lang3.StringUtils;

import org.apache.http.client.utils.URIBuilder;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;

Expand All @@ -37,7 +35,6 @@ class ClientImpl extends BaseClient {
public static final String STREAMING_VERSION = "v1";
private static final String DEFAULT_DATABASE_NAME = "NetDefaultDb";

public static final String FEDERATED_SECURITY_SUFFIX = ";fed=true";
private final TokenProviderBase aadAuthenticationHelper;

private final String clusterUrl;
Expand All @@ -54,97 +51,76 @@ public ClientImpl(ConnectionStringBuilder csb, HttpClientProperties properties)

public ClientImpl(ConnectionStringBuilder csb, HttpClient httpClient) throws URISyntaxException {
super(httpClient);

URI clusterUrlForParsing = new URI(csb.getClusterUrl());
String host = clusterUrlForParsing.getHost();
Objects.requireNonNull(clusterUrlForParsing.getAuthority(), "clusterUri must have uri authority component");
String auth = clusterUrlForParsing.getAuthority().toLowerCase();
if (host == null) {
host = StringUtils.removeEndIgnoreCase(auth, FEDERATED_SECURITY_SUFFIX);
}
URIBuilder uriBuilder = new URIBuilder()
.setScheme(clusterUrlForParsing.getScheme())
.setHost(host);
String path = clusterUrlForParsing.getPath();
if (path != null && !path.isEmpty()) {
path = StringUtils.removeEndIgnoreCase(path, FEDERATED_SECURITY_SUFFIX);
path = StringUtils.removeEndIgnoreCase(path, "/");

uriBuilder.setPath(path);
}

if (clusterUrlForParsing.getPort() != -1) {
uriBuilder.setPort(clusterUrlForParsing.getPort());
}
csb.setClusterUrl(uriBuilder.build().toString());
String clusterURL = UriUtils.createClusterURLFrom(csb.getClusterUrl());
csb.setClusterUrl(clusterURL);

clusterUrl = csb.getClusterUrl();
aadAuthenticationHelper = clusterUrl.toLowerCase().startsWith(CloudInfo.LOCALHOST) ? null : TokenProviderFactory.createTokenProvider(csb, httpClient);
clientDetails = new ClientDetails(csb.getApplicationNameForTracing(), csb.getUserNameForTracing(), csb.getClientVersionForTracing());
}

@Override
public Mono<KustoOperationResult> executeQueryAsync(String database, String command, ClientRequestProperties properties) {
KustoRequest kr = new KustoRequest(command, database, properties);
return executeQueryAsync(kr);
}

Mono<KustoOperationResult> executeQueryAsync(@NotNull KustoRequest kr) {
if (kr.getCommandType() != CommandType.QUERY) {
kr.setCommandType(CommandType.QUERY);
}
return executeAsync(kr);
}

@Override
public Mono<KustoOperationResult> executeMgmtAsync(String database, String command, ClientRequestProperties properties) {
KustoRequest kr = new KustoRequest(command, database, properties);
return executeMgmtAsync(kr);
}

public Mono<KustoOperationResult> executeMgmtAsync(@NotNull KustoRequest kr) {
if (kr.getCommandType() != CommandType.ADMIN_COMMAND) {
kr.setCommandType(CommandType.ADMIN_COMMAND);
}
return executeAsync(kr);
}

private Mono<KustoOperationResult> executeAsync(KustoRequest kr) {

Mono<String> resultMono = executeToJsonAsync(kr)
.onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err));
Mono<String> endpointMono = Mono.just(String.format(kr.getCommandType().getEndpoint(), clusterUrl))
.onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err));

return Mono.zip(resultMono, endpointMono)
.onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err))
.map(tuple2 -> new JsonResult(tuple2.getT1(), tuple2.getT2()))
.onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err))
.flatMap(this::processJsonResultAsync)
.onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err));
}

public Mono<KustoOperationResult> processJsonResultAsync(JsonResult res) {
try {
return Mono.just(processJsonResult(res));
} catch (Exception e) {
return Mono.error(new RuntimeException("Error processing json result", e));
}
}

public Mono<String> executeToJsonAsync(String database, String command, ClientRequestProperties properties) {
KustoRequest kr = new KustoRequest(command, database, properties);
return executeToJsonAsync(kr)
.onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err));
}

Mono<String> executeToJsonAsync(KustoRequest kr) {
return just(kr)
.flatMap(this::prepareRequestAsync)
.onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err))
.flatMap(this::processRequestAsync)
.onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err));
}
// @Override
// public Mono<KustoOperationResult> executeQueryAsync(String database, String command, ClientRequestProperties properties) {
// KustoRequest kr = new KustoRequest(command, database, properties);
// return executeQueryAsync(kr);
// }
//
// Mono<KustoOperationResult> executeQueryAsync(@NotNull KustoRequest kr) {
// if (kr.getCommandType() != CommandType.QUERY) {
// kr.setCommandType(CommandType.QUERY);
// }
// return executeAsync(kr);
// }
//
// @Override
// public Mono<KustoOperationResult> executeMgmtAsync(String database, String command, ClientRequestProperties properties) {
// KustoRequest kr = new KustoRequest(command, database, properties);
// return executeMgmtAsync(kr);
// }
//
// public Mono<KustoOperationResult> executeMgmtAsync(@NotNull KustoRequest kr) {
// if (kr.getCommandType() != CommandType.ADMIN_COMMAND) {
// kr.setCommandType(CommandType.ADMIN_COMMAND);
// }
// return executeAsync(kr);
// }
//
// private Mono<KustoOperationResult> executeAsync(KustoRequest kr) {
//
// Mono<String> resultMono = executeToJsonAsync(kr)
// .onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err));
// Mono<String> endpointMono = Mono.just(String.format(kr.getCommandType().getEndpoint(), clusterUrl))
// .onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err));
//
// return Mono.zip(resultMono, endpointMono)
// .onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err))
// .map(tuple2 -> new JsonResult(tuple2.getT1(), tuple2.getT2()))
// .onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err))
// .flatMap(this::processJsonResultAsync)
// .onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err));
// }
//
// public Mono<KustoOperationResult> processJsonResultAsync(JsonResult res) {
// try {
// return Mono.just(processJsonResult(res));
// } catch (Exception e) {
// return Mono.error(new RuntimeException("Error processing json result", e));
// }
// }
//
// public Mono<String> executeToJsonAsync(String database, String command, ClientRequestProperties properties) {
// KustoRequest kr = new KustoRequest(command, database, properties);
// return executeToJsonAsync(kr)
// .onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err));
// }
//
// Mono<String> executeToJsonAsync(KustoRequest kr) {
// return just(kr)
// .flatMap(this::prepareRequestAsync)
// .onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err))
// .flatMap(this::processRequestAsync)
// .onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err));
// }

@Override
public KustoOperationResult executeQuery(String command) throws DataServiceException, DataClientException {
Expand Down Expand Up @@ -281,22 +257,18 @@ private String executeToJsonResult(KustoRequest kr) throws DataServiceException,
request.getSdkRequest().getCommandType().getActivityTypeSuffix().concat(".executeToJsonResult"));
}

public Mono<String> processRequestAsync(KustoRequestContext request) {
return MonitoredActivity.invoke(
(SupplierNoException<Mono<String>>) () -> postAsync(request.getHttpRequest())
.onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err)),
request.getSdkRequest().getCommandType().getActivityTypeSuffix().concat(".executeToJsonResult"));
}
// public Mono<String> processRequestAsync(KustoRequestContext request) {
// return MonitoredActivity.invoke(
// (SupplierNoException<Mono<String>>) () -> postAsync(request.getHttpRequest())
// .onErrorContinue((err, src) -> LOGGER.error("Error coming from src {}", src, err)),
// request.getSdkRequest().getCommandType().getActivityTypeSuffix().concat(".executeToJsonResult"));
// }

private void validateEndpoint() throws DataServiceException, DataClientException {
try {
if (!endpointValidated) {
KustoTrustedEndpoints.validateTrustedEndpoint(clusterUrl,
CloudInfo.retrieveCloudInfoForCluster(clusterUrl).getLoginEndpoint());
endpointValidated = true;
}
} catch (KustoClientInvalidConnectionStringException e) {
throw new DataClientException(clusterUrl, e.getMessage(), e);
if (!endpointValidated) {
KustoTrustedEndpoints.validateTrustedEndpoint(clusterUrl,
CloudInfo.retrieveCloudInfoForCluster(clusterUrl).getLoginEndpoint());
endpointValidated = true;
}
}

Expand Down Expand Up @@ -477,8 +449,4 @@ ClientDetails getClientDetails() {
return clientDetails;
}

@Override
public void close() throws IOException {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public Map<String, String> getTracingAttributes() {
}

/**
* Gets the amount of time a query may execute on the service before it times out, formatted as a krL timespan.
* Gets the amount of time a query may execute on the service before it times out, formatted as a KQL timespan.
* @param timeoutObj amount of time before timeout, which may be a Long, String or Integer.
* Value must be between 1 minute and 1 hour, and so value below the minimum or above the maximum will be adjusted accordingly.
*/
Expand All @@ -340,7 +340,7 @@ String getTimeoutAsCslTimespan(Object timeoutObj) {
}

/**
* Gets the amount of time a query may execute on the service before it times out, formatted as a krL timespan.
* Gets the amount of time a query may execute on the service before it times out, formatted as a KQL timespan.
* Value must be between 1 minute and 1 hour, and so if the value had been set below the minimum or above the maximum, the value returned will be adjusted accordingly.
*/
String getTimeoutAsCslTimespan() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;

import java.io.Closeable;
import java.io.InputStream;

public interface StreamingClient extends Closeable {
public interface StreamingClient {
/**
* <p>Ingest data from a given stream directly into Kusto database.</p>
* This method ingests the data from a given stream directly into Kusto database, using streaming ingestion endpoint,
Expand Down
30 changes: 30 additions & 0 deletions data/src/main/java/com/microsoft/azure/kusto/data/UriUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,44 @@
import org.apache.http.client.utils.URIBuilder;

import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.Objects;

public class UriUtils {

public static final String FEDERATED_SECURITY_SUFFIX = ";fed=true";

private UriUtils() {
// Providing hidden constructor to hide default public constructor in utils class
}

public static String createClusterURLFrom(final String clusterURI) throws URISyntaxException {
URI clusterUrlForParsing = new URI(clusterURI);
String host = clusterUrlForParsing.getHost();
Objects.requireNonNull(clusterUrlForParsing.getAuthority(), "clusterUri must have uri authority component");
String auth = clusterUrlForParsing.getAuthority().toLowerCase();
if (host == null) {
host = StringUtils.removeEndIgnoreCase(auth, FEDERATED_SECURITY_SUFFIX);
}
URIBuilder uriBuilder = new URIBuilder()
.setScheme(clusterUrlForParsing.getScheme())
.setHost(host);
String path = clusterUrlForParsing.getPath();
if (path != null && !path.isEmpty()) {
path = StringUtils.removeEndIgnoreCase(path, FEDERATED_SECURITY_SUFFIX);
path = StringUtils.removeEndIgnoreCase(path, "/");

uriBuilder.setPath(path);
}

if (clusterUrlForParsing.getPort() != -1) {
uriBuilder.setPort(clusterUrlForParsing.getPort());
}
return uriBuilder.build().toString();
}

public static String setPathForUri(String uri, String path, boolean ensureTrailingSlash) throws URISyntaxException {
path = StringUtils.prependIfMissing(path, "/");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,11 @@ private static CloudInfo fetchImpl(String clusterUrl, @Nullable HttpClient given
HttpClient localHttpClient = givenHttpClient == null ? HttpClientFactory.create(null) : givenHttpClient;
try {
HttpRequest request = new HttpRequest(HttpMethod.GET, UriUtils.appendPathToUri(clusterUrl, METADATA_ENDPOINT));
request.setHeader(HttpHeaderName.ACCEPT_ENCODING, "gzip");
request.setHeader(HttpHeaderName.ACCEPT_ENCODING, "gzip,deflate");
request.setHeader(HttpHeaderName.ACCEPT, "application/json");

// trace CloudInfo.httpCall
// Fixme: Make this async in the future
try (HttpResponse response = MonitoredActivity.invoke(
(SupplierOneException<HttpResponse, IOException>) () -> localHttpClient.sendSync(request, Context.NONE),
"CloudInfo.httpCall")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static void validateTrustedEndpoint(String uri, String loginEndpoint) thr
try {
validateTrustedEndpoint(new URI(uri), loginEndpoint);
} catch (URISyntaxException ex) {
throw new KustoClientInvalidConnectionStringException(ex);
throw new KustoClientInvalidConnectionStringException(uri, ex.getMessage(), ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@
This class represents an error that happened on the client side and is therefore considered permanent
*/
public class DataClientException extends KustoDataExceptionBase {
public DataClientException(Exception ex) {
this(ex.getMessage());
}

public DataClientException(String message) {
this(null, message);
}

public DataClientException(String ingestionSource, String message) {
this(ingestionSource, message, null);
}
Expand Down
Loading

0 comments on commit 1f9c569

Please sign in to comment.