Skip to content

Commit

Permalink
forward warning headers to JDBC driver
Browse files Browse the repository at this point in the history
  • Loading branch information
Luegg committed Mar 1, 2022
1 parent beb7c9e commit 990cbcb
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ default int columnSize() {
int batchSize();

void close() throws SQLException;

List<String> warnings();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,24 @@ class DefaultCursor implements Cursor {

private final List<JdbcColumnInfo> columnInfos;
private List<List<Object>> rows;
private final List<String> warnings;
private int row = -1;
private String cursor;

DefaultCursor(JdbcHttpClient client, String cursor, List<JdbcColumnInfo> columnInfos, List<List<Object>> rows, RequestMeta meta) {
DefaultCursor(
JdbcHttpClient client,
String cursor,
List<JdbcColumnInfo> columnInfos,
List<List<Object>> rows,
RequestMeta meta,
List<String> warnings
) {
this.client = client;
this.meta = meta;
this.cursor = cursor;
this.columnInfos = columnInfos;
this.rows = rows;
this.warnings = warnings;
}

@Override
Expand Down Expand Up @@ -67,4 +76,8 @@ public void close() throws SQLException {
client.queryClose(cursor);
}
}

public List<String> warnings() {
return warnings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.sql.RowIdLifetime;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static java.sql.JDBCType.BIGINT;
Expand Down Expand Up @@ -1420,5 +1421,10 @@ public int batchSize() {
public void close() throws SQLException {
// this cursor doesn't hold any resource - no need to clean up
}

@Override
public List<String> warnings() {
return Collections.emptyList();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,15 @@ Cursor query(String sql, List<SqlTypedParamValue> params, RequestMeta meta) thro
conCfg.indexIncludeFrozen(),
conCfg.binaryCommunication()
);
SqlQueryResponse response = httpClient.query(sqlRequest);
return new DefaultCursor(this, response.cursor(), toJdbcColumnInfo(response.columns()), response.rows(), meta);
Tuple<SqlQueryResponse, List<String>> response = httpClient.query(sqlRequest);
return new DefaultCursor(
this,
response.v1().cursor(),
toJdbcColumnInfo(response.v1().columns()),
response.v1().rows(),
meta,
response.v2()
);
}

/**
Expand All @@ -91,7 +98,7 @@ Tuple<String, List<List<Object>>> nextPage(String cursor, RequestMeta meta) thro
new RequestInfo(Mode.JDBC),
conCfg.binaryCommunication()
);
SqlQueryResponse response = httpClient.query(sqlRequest);
SqlQueryResponse response = httpClient.query(sqlRequest).v1();
return new Tuple<>(response.cursor(), response.rows());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,15 @@ public InputStream getBinaryStream(String columnLabel) throws SQLException {
@Override
public SQLWarning getWarnings() throws SQLException {
checkOpen();
return null;
SQLWarning sqlWarning = null;
for (String warning : cursor.warnings()) {
if (sqlWarning == null) {
sqlWarning = new SQLWarning(warning);
} else {
sqlWarning.setNextWarning(new SQLWarning(warning));
}
}
return sqlWarning;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,63 @@

package org.elasticsearch.xpack.sql.qa.jdbc;

import org.elasticsearch.Version;
import org.junit.Before;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.Properties;

import static org.elasticsearch.xpack.sql.qa.jdbc.JdbcTestUtils.JDBC_DRIVER_VERSION;
import static org.hamcrest.Matchers.containsString;

public abstract class JdbcWarningsTestCase extends JdbcIntegrationTestCase {

public void testDeprecationWarningsDoNotReachJdbcDriver() throws Exception {
@Before
public void setupData() throws IOException {
index("test_data", b -> b.field("foo", 1));
}

public void testNoWarnings() throws SQLException {
try (Connection connection = esJdbc(); Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SELECT * FROM FROZEN \"test_*\"");
ResultSet rs = statement.executeQuery("SELECT * FROM test_data");
assertNull(rs.getWarnings());
}
}

private static Version WARNING_HANDLING_ADDED_VERSION = Version.V_8_2_0;

public void testSingleDeprecationWarning() throws SQLException {
assumeTrue("Driver does not yet handle deprecation warnings", JDBC_DRIVER_VERSION.onOrAfter(WARNING_HANDLING_ADDED_VERSION));

try (Connection connection = esJdbc(); Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SELECT * FROM FROZEN test_data");
SQLWarning warning = rs.getWarnings();
assertThat(warning.getMessage(), containsString("[FROZEN] syntax is deprecated because frozen indices have been deprecated."));
assertNull(warning.getNextWarning());
}
}

public void testMultipleDeprecationWarnings() throws SQLException {
assumeTrue("Driver does not yet handle deprecation warnings", JDBC_DRIVER_VERSION.onOrAfter(WARNING_HANDLING_ADDED_VERSION));

Properties props = connectionProperties();
props.setProperty("index.include.frozen", "true");

try (Connection connection = esJdbc(props); Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SELECT * FROM FROZEN test_data");
SQLWarning warning = rs.getWarnings();
assertThat(warning.getMessage(), containsString("[FROZEN] syntax is deprecated because frozen indices have been deprecated."));
assertThat(
warning.getNextWarning().getMessage(),
containsString("[index_include_frozen] parameter is deprecated because frozen indices have been deprecated.")
);
assertNull(warning.getNextWarning().getNextWarning());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import java.io.InputStream;
import java.security.PrivilegedAction;
import java.sql.SQLException;
import java.util.function.Function;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static java.util.Collections.emptyList;

Expand Down Expand Up @@ -82,10 +84,10 @@ public SqlQueryResponse basicQuery(String query, int fetchSize, boolean fieldMul
false,
cfg.binaryCommunication()
);
return query(sqlRequest);
return query(sqlRequest).v1();
}

public SqlQueryResponse query(SqlQueryRequest sqlRequest) throws SQLException {
public Tuple<SqlQueryResponse, List<String>> query(SqlQueryRequest sqlRequest) throws SQLException {
return post(CoreProtocol.SQL_QUERY_REST_ENDPOINT, sqlRequest, Payloads::parseQueryResponse);
}

Expand All @@ -98,28 +100,28 @@ public SqlQueryResponse nextPage(String cursor) throws SQLException {
new RequestInfo(Mode.CLI),
cfg.binaryCommunication()
);
return post(CoreProtocol.SQL_QUERY_REST_ENDPOINT, sqlRequest, Payloads::parseQueryResponse);
return post(CoreProtocol.SQL_QUERY_REST_ENDPOINT, sqlRequest, Payloads::parseQueryResponse).v1();
}

public boolean queryClose(String cursor, Mode mode) throws SQLException {
SqlClearCursorResponse response = post(
Tuple<SqlClearCursorResponse, List<String>> response = post(
CoreProtocol.CLEAR_CURSOR_REST_ENDPOINT,
new SqlClearCursorRequest(cursor, new RequestInfo(mode)),
Payloads::parseClearCursorResponse
);
return response.isSucceeded();
return response.v1().isSucceeded();
}

@SuppressWarnings({ "removal" })
private <Request extends AbstractSqlRequest, Response> Response post(
private <Request extends AbstractSqlRequest, Response> Tuple<Response, List<String>> post(
String path,
Request request,
CheckedFunction<JsonParser, Response, IOException> responseParser
) throws SQLException {
byte[] requestBytes = toContent(request);
String query = "error_trace";
Tuple<ContentType, byte[]> response = java.security.AccessController.doPrivileged(
(PrivilegedAction<ResponseOrException<Tuple<ContentType, byte[]>>>) () -> JreHttpUrlConnection.http(
Tuple<Map<String, List<String>>, byte[]> response = java.security.AccessController.doPrivileged(
(PrivilegedAction<ResponseOrException<Tuple<Map<String, List<String>>, byte[]>>>) () -> JreHttpUrlConnection.http(
path,
query,
cfg,
Expand All @@ -131,7 +133,11 @@ private <Request extends AbstractSqlRequest, Response> Response post(
)
)
).getResponseOrThrowException();
return fromContent(response.v1(), response.v2(), responseParser);
List<String> warnings = response.v1().get("Warning");
return new Tuple<>(
fromContent(response.v1(), response.v2(), responseParser),
warnings == null ? Collections.emptyList() : warnings
);
}

@SuppressWarnings({ "removal" })
Expand Down Expand Up @@ -162,8 +168,8 @@ private boolean head(String path, long timeoutInMs) throws SQLException {

@SuppressWarnings({ "removal" })
private <Response> Response get(String path, CheckedFunction<JsonParser, Response, IOException> responseParser) throws SQLException {
Tuple<ContentType, byte[]> response = java.security.AccessController.doPrivileged(
(PrivilegedAction<ResponseOrException<Tuple<ContentType, byte[]>>>) () -> JreHttpUrlConnection.http(
Tuple<Map<String, List<String>>, byte[]> response = java.security.AccessController.doPrivileged(
(PrivilegedAction<ResponseOrException<Tuple<Map<String, List<String>>, byte[]>>>) () -> JreHttpUrlConnection.http(
path,
"error_trace",
cfg,
Expand All @@ -184,31 +190,30 @@ private <Request extends AbstractSqlRequest> byte[] toContent(Request request) {
}
}

private Tuple<ContentType, byte[]> readFrom(InputStream inputStream, Function<String, String> headers) {
String contentType = headers.apply("Content-Type");
ContentType type = ContentFactory.parseMediaType(contentType);
if (type == null) {
throw new IllegalStateException("Unsupported Content-Type: " + contentType);
}
private Tuple<Map<String, List<String>>, byte[]> readFrom(InputStream inputStream, Map<String, List<String>> headers) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
Streams.copy(inputStream, out);
} catch (Exception ex) {
throw new ClientException("Cannot deserialize response", ex);
}
return new Tuple<>(type, out.toByteArray());
return new Tuple<>(headers, out.toByteArray());

}

private <Response> Response fromContent(
ContentType contentType,
Map<String, List<String>> headers,
byte[] bytesReference,
CheckedFunction<JsonParser, Response, IOException> responseParser
) {
try (
InputStream stream = new ByteArrayInputStream(bytesReference);
JsonParser parser = ContentFactory.parser(contentType, stream)
) {
List<String> contentTypeHeaders = headers.get("content-type");
String contentType = contentTypeHeaders == null || contentTypeHeaders.isEmpty() ? null : contentTypeHeaders.get(0);
ContentType type = ContentFactory.parseMediaType(contentType);
if (type == null) {
throw new IllegalStateException("Unsupported Content-Type: " + contentType);
}

try (InputStream stream = new ByteArrayInputStream(bytesReference); JsonParser parser = ContentFactory.parser(type, stream)) {
return responseParser.apply(parser);
} catch (Exception ex) {
throw new ClientException("Cannot parse response", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.sql.SQLSyntaxErrorException;
import java.sql.SQLTimeoutException;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.zip.GZIPInputStream;

Expand Down Expand Up @@ -150,15 +152,15 @@ public boolean head() throws ClientException {

public <R> ResponseOrException<R> request(
CheckedConsumer<OutputStream, IOException> doc,
CheckedBiFunction<InputStream, Function<String, String>, R, IOException> parser,
CheckedBiFunction<InputStream, Map<String, List<String>>, R, IOException> parser,
String requestMethod
) throws ClientException {
return request(doc, parser, requestMethod, "application/json");
}

public <R> ResponseOrException<R> request(
CheckedConsumer<OutputStream, IOException> doc,
CheckedBiFunction<InputStream, Function<String, String>, R, IOException> parser,
CheckedBiFunction<InputStream, Map<String, List<String>>, R, IOException> parser,
String requestMethod,
String contentTypeHeader
) throws ClientException {
Expand All @@ -174,7 +176,7 @@ public <R> ResponseOrException<R> request(
}
if (shouldParseBody(con.getResponseCode())) {
try (InputStream stream = getStream(con, con.getInputStream())) {
return new ResponseOrException<>(parser.apply(new BufferedInputStream(stream), con::getHeaderField));
return new ResponseOrException<>(parser.apply(new BufferedInputStream(stream), con.getHeaderFields()));
}
}
return parserError();
Expand Down

0 comments on commit 990cbcb

Please sign in to comment.