Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQL: forward warning headers to JDBC driver #84499

Merged
merged 11 commits into from
Mar 15, 2022
5 changes: 5 additions & 0 deletions docs/changelog/84499.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 84499
summary: Forward warning headers to JDBC driver
area: SQL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ default int columnSize() {
int batchSize();

void close() throws SQLException;

List<String> warnings();

void clearWarnings();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,24 @@ class DefaultCursor implements Cursor {

private final List<JdbcColumnInfo> columnInfos;
private List<List<Object>> rows;
private final List<String> warnings;
private int row = -1;
private String cursor;

DefaultCursor(JdbcHttpClient client, String cursor, List<JdbcColumnInfo> columnInfos, List<List<Object>> rows, RequestMeta meta) {
DefaultCursor(
JdbcHttpClient client,
String cursor,
List<JdbcColumnInfo> columnInfos,
List<List<Object>> rows,
RequestMeta meta,
List<String> warnings
) {
this.client = client;
this.meta = meta;
this.cursor = cursor;
this.columnInfos = columnInfos;
this.rows = rows;
this.warnings = warnings;
}

@Override
Expand Down Expand Up @@ -67,4 +76,13 @@ public void close() throws SQLException {
client.queryClose(cursor);
}
}

public List<String> warnings() {
return warnings;
}

@Override
public void clearWarnings() {
warnings.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.sql.RowIdLifetime;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static java.sql.JDBCType.BIGINT;
Expand Down Expand Up @@ -1420,5 +1421,13 @@ public int batchSize() {
public void close() throws SQLException {
// this cursor doesn't hold any resource - no need to clean up
}

@Override
public List<String> warnings() {
return Collections.emptyList();
}

@Override
public void clearWarnings() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.xpack.sql.client.ClientException;
import org.elasticsearch.xpack.sql.client.ClientVersion;
import org.elasticsearch.xpack.sql.client.HttpClient;
import org.elasticsearch.xpack.sql.client.HttpClient.ResponseWithWarnings;
import org.elasticsearch.xpack.sql.proto.ColumnInfo;
import org.elasticsearch.xpack.sql.proto.MainResponse;
import org.elasticsearch.xpack.sql.proto.Mode;
Expand Down Expand Up @@ -75,8 +76,15 @@ Cursor query(String sql, List<SqlTypedParamValue> params, RequestMeta meta) thro
conCfg.indexIncludeFrozen(),
conCfg.binaryCommunication()
);
SqlQueryResponse response = httpClient.query(sqlRequest);
return new DefaultCursor(this, response.cursor(), toJdbcColumnInfo(response.columns()), response.rows(), meta);
ResponseWithWarnings<SqlQueryResponse> response = httpClient.query(sqlRequest);
return new DefaultCursor(
this,
response.response().cursor(),
toJdbcColumnInfo(response.response().columns()),
response.response().rows(),
meta,
response.warnings()
);
}

/**
Expand All @@ -91,7 +99,7 @@ Tuple<String, List<List<Object>>> nextPage(String cursor, RequestMeta meta) thro
new RequestInfo(Mode.JDBC),
conCfg.binaryCommunication()
);
SqlQueryResponse response = httpClient.query(sqlRequest);
SqlQueryResponse response = httpClient.query(sqlRequest).response();
return new Tuple<>(response.cursor(), response.rows());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class JdbcResultSet implements ResultSet, JdbcWrapper {
private final Calendar defaultCalendar;

private final JdbcStatement statement;
private final Cursor cursor;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By using clearWarnings the Cursor is preserved as final.

private Cursor cursor;
private final Map<String, Integer> nameToIndex = new LinkedHashMap<>();

private boolean closed = false;
Expand Down Expand Up @@ -585,12 +585,21 @@ public InputStream getBinaryStream(String columnLabel) throws SQLException {
@Override
public SQLWarning getWarnings() throws SQLException {
checkOpen();
return null;
SQLWarning sqlWarning = null;
for (String warning : cursor.warnings()) {
if (sqlWarning == null) {
sqlWarning = new SQLWarning(warning);
} else {
sqlWarning.setNextWarning(new SQLWarning(warning));
}
}
return sqlWarning;
}

@Override
public void clearWarnings() throws SQLException {
checkOpen();
cursor.clearWarnings();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,89 @@

package org.elasticsearch.xpack.sql.qa.jdbc;

import org.elasticsearch.Version;
import org.junit.Before;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;

import static org.elasticsearch.xpack.sql.qa.jdbc.JdbcTestUtils.JDBC_DRIVER_VERSION;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;

public abstract class JdbcWarningsTestCase extends JdbcIntegrationTestCase {

public void testDeprecationWarningsDoNotReachJdbcDriver() throws Exception {
private static final Version WARNING_HANDLING_ADDED_VERSION = Version.V_8_2_0;

@Before
public void setupData() throws IOException {
index("test_data", b -> b.field("foo", 1));
}

public void testNoWarnings() throws SQLException {
try (Connection connection = esJdbc(); Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SELECT * FROM test_data");
assertNull(rs.getWarnings());
}
}

public void testSingleDeprecationWarning() throws SQLException {
assumeWarningHandlingDriverVersion();

try (Connection connection = esJdbc(); Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SELECT * FROM FROZEN test_data");
SQLWarning warning = rs.getWarnings();
assertThat(warning.getMessage(), containsString("[FROZEN] syntax is deprecated because frozen indices have been deprecated."));
assertNull(warning.getNextWarning());
}
}

public void testMultipleDeprecationWarnings() throws SQLException {
assumeWarningHandlingDriverVersion();

Properties props = connectionProperties();
props.setProperty("index.include.frozen", "true");

try (Connection connection = esJdbc(props); Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SELECT * FROM FROZEN test_data");
List<String> warnings = new LinkedList<>();
SQLWarning warning = rs.getWarnings();
while (warning != null) {
warnings.add(warning.getMessage());
warning = warning.getNextWarning();
}

assertThat(
warnings,
containsInAnyOrder(
containsString("[FROZEN] syntax is deprecated because frozen indices have been deprecated."),
containsString("[index_include_frozen] parameter is deprecated because frozen indices have been deprecated.")
)
);
}
}

public void testClearWarnings() throws SQLException {
assumeWarningHandlingDriverVersion();

try (Connection connection = esJdbc(); Statement statement = connection.createStatement()) {
ResultSet rs = statement.executeQuery("SELECT * FROM FROZEN \"test_*\"");
ResultSet rs = statement.executeQuery("SELECT * FROM FROZEN test_data");
assertNotNull(rs.getWarnings());

rs.clearWarnings();
assertNull(rs.getWarnings());
}
}

private void assumeWarningHandlingDriverVersion() {
assumeTrue("Driver does not yet handle deprecation warnings", JDBC_DRIVER_VERSION.onOrAfter(WARNING_HANDLING_ADDED_VERSION));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.io.InputStream;
import java.security.PrivilegedAction;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import static java.util.Collections.emptyList;
Expand All @@ -44,6 +46,24 @@
*/
public class HttpClient {

public static class ResponseWithWarnings<R> {
private final R response;
private final List<String> warnings;

ResponseWithWarnings(R response, List<String> warnings) {
this.response = response;
this.warnings = warnings;
}

public R response() {
return response;
}

public List<String> warnings() {
return warnings;
}
}

private final ConnectionConfiguration cfg;
private final ContentType requestBodyContentType;

Expand Down Expand Up @@ -82,10 +102,10 @@ public SqlQueryResponse basicQuery(String query, int fetchSize, boolean fieldMul
false,
cfg.binaryCommunication()
);
return query(sqlRequest);
return query(sqlRequest).response();
}

public SqlQueryResponse query(SqlQueryRequest sqlRequest) throws SQLException {
public ResponseWithWarnings<SqlQueryResponse> query(SqlQueryRequest sqlRequest) throws SQLException {
return post(CoreProtocol.SQL_QUERY_REST_ENDPOINT, sqlRequest, Payloads::parseQueryResponse);
}

Expand All @@ -98,28 +118,28 @@ public SqlQueryResponse nextPage(String cursor) throws SQLException {
new RequestInfo(Mode.CLI),
cfg.binaryCommunication()
);
return post(CoreProtocol.SQL_QUERY_REST_ENDPOINT, sqlRequest, Payloads::parseQueryResponse);
return post(CoreProtocol.SQL_QUERY_REST_ENDPOINT, sqlRequest, Payloads::parseQueryResponse).response();
}

public boolean queryClose(String cursor, Mode mode) throws SQLException {
SqlClearCursorResponse response = post(
ResponseWithWarnings<SqlClearCursorResponse> response = post(
CoreProtocol.CLEAR_CURSOR_REST_ENDPOINT,
new SqlClearCursorRequest(cursor, new RequestInfo(mode)),
Payloads::parseClearCursorResponse
);
return response.isSucceeded();
return response.response().isSucceeded();
}

@SuppressWarnings({ "removal" })
private <Request extends AbstractSqlRequest, Response> Response post(
private <Request extends AbstractSqlRequest, Response> ResponseWithWarnings<Response> post(
String path,
Request request,
CheckedFunction<JsonParser, Response, IOException> responseParser
) throws SQLException {
byte[] requestBytes = toContent(request);
String query = "error_trace";
Tuple<ContentType, byte[]> response = java.security.AccessController.doPrivileged(
(PrivilegedAction<ResponseOrException<Tuple<ContentType, byte[]>>>) () -> JreHttpUrlConnection.http(
Tuple<Function<String, List<String>>, byte[]> response = java.security.AccessController.doPrivileged(
(PrivilegedAction<ResponseOrException<Tuple<Function<String, List<String>>, byte[]>>>) () -> JreHttpUrlConnection.http(
path,
query,
cfg,
Expand All @@ -131,7 +151,11 @@ private <Request extends AbstractSqlRequest, Response> Response post(
)
)
).getResponseOrThrowException();
return fromContent(response.v1(), response.v2(), responseParser);
List<String> warnings = response.v1().apply("Warning");
return new ResponseWithWarnings<>(
fromContent(contentType(response.v1()), response.v2(), responseParser),
warnings == null ? Collections.emptyList() : warnings
);
}

@SuppressWarnings({ "removal" })
Expand Down Expand Up @@ -162,15 +186,15 @@ private boolean head(String path, long timeoutInMs) throws SQLException {

@SuppressWarnings({ "removal" })
private <Response> Response get(String path, CheckedFunction<JsonParser, Response, IOException> responseParser) throws SQLException {
Tuple<ContentType, byte[]> response = java.security.AccessController.doPrivileged(
(PrivilegedAction<ResponseOrException<Tuple<ContentType, byte[]>>>) () -> JreHttpUrlConnection.http(
Tuple<Function<String, List<String>>, byte[]> response = java.security.AccessController.doPrivileged(
(PrivilegedAction<ResponseOrException<Tuple<Function<String, List<String>>, byte[]>>>) () -> JreHttpUrlConnection.http(
path,
"error_trace",
cfg,
con -> con.request(null, this::readFrom, "GET")
)
).getResponseOrThrowException();
return fromContent(response.v1(), response.v2(), responseParser);
return fromContent(contentType(response.v1()), response.v2(), responseParser);
}

private <Request extends AbstractSqlRequest> byte[] toContent(Request request) {
Expand All @@ -184,31 +208,35 @@ private <Request extends AbstractSqlRequest> byte[] toContent(Request request) {
}
}

private Tuple<ContentType, byte[]> readFrom(InputStream inputStream, Function<String, String> headers) {
String contentType = headers.apply("Content-Type");
ContentType type = ContentFactory.parseMediaType(contentType);
if (type == null) {
throw new IllegalStateException("Unsupported Content-Type: " + contentType);
}
private Tuple<Function<String, List<String>>, byte[]> readFrom(InputStream inputStream, Function<String, List<String>> headers) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
Streams.copy(inputStream, out);
} catch (Exception ex) {
throw new ClientException("Cannot deserialize response", ex);
}
return new Tuple<>(type, out.toByteArray());
return new Tuple<>(headers, out.toByteArray());

}

private ContentType contentType(Function<String, List<String>> headers) {
List<String> contentTypeHeaders = headers.apply("Content-Type");

String contentType = contentTypeHeaders == null || contentTypeHeaders.isEmpty() ? null : contentTypeHeaders.get(0);
ContentType type = ContentFactory.parseMediaType(contentType);
if (type == null) {
throw new IllegalStateException("Unsupported Content-Type: " + contentType);
} else {
return type;
}
}

private <Response> Response fromContent(
ContentType contentType,
ContentType type,
byte[] bytesReference,
CheckedFunction<JsonParser, Response, IOException> responseParser
) {
try (
InputStream stream = new ByteArrayInputStream(bytesReference);
JsonParser parser = ContentFactory.parser(contentType, stream)
) {
try (InputStream stream = new ByteArrayInputStream(bytesReference); JsonParser parser = ContentFactory.parser(type, stream)) {
return responseParser.apply(parser);
} catch (Exception ex) {
throw new ClientException("Cannot parse response", ex);
Expand Down
Loading