From c421e06b99beb5546849d19d225343c850325e6b Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 3 Feb 2026 15:41:26 +0100 Subject: [PATCH 1/5] test: add query timeout unit test --- .../v3/client/query/QueryOptionsTest.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index c17e118f..21f40502 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -41,6 +41,10 @@ import org.apache.arrow.flight.CallStatus; import org.apache.arrow.flight.FlightRuntimeException; import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.NoOpFlightProducer; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.FlightProducer.CallContext; +import org.apache.arrow.flight.FlightProducer.ServerStreamListener; import org.apache.arrow.flight.impl.FlightServiceGrpc; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -48,6 +52,7 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import com.influxdb.v3.client.InfluxDBClient; import com.influxdb.v3.client.PointValues; @@ -186,6 +191,53 @@ void setInboundMessageSizeLarge() throws Exception { } } + @Test + @Timeout(5) + void queryTimeout() throws Exception { + int freePort = findFreePort(); + URI uri = URI.create("http://127.0.0.1:" + freePort); + try (VectorSchemaRoot vectorSchemaRoot = TestUtils.generateVectorSchemaRoot(1, 1); + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FlightServer flightServer = TestUtils.simpleFlightServer(uri, allocator, new NoOpFlightProducer() { + @Override + public void getStream(final CallContext context, + final Ticket ticket, + final ServerStreamListener listener) { + listener.start(vectorSchemaRoot); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + listener.completed(); + } + }) + ) { + flightServer.start(); + + String host = String.format("http://%s:%d", uri.getHost(), uri.getPort()); + ClientConfig clientConfig = new ClientConfig.Builder() + .host(host) + .database("test") + .queryTimeout(Duration.ofMillis(200)) + .build(); + + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + Throwable thrown = Assertions.catchThrowable(() -> { + try (Stream stream = influxDBClient.queryPoints( + "Select * from \"nothing\"" + )) { + stream.count(); + } + }); + + Assertions.assertThat(thrown).isInstanceOf(FlightRuntimeException.class); + FlightRuntimeException fre = (FlightRuntimeException) thrown; + Assertions.assertThat(fre.status().code()).isEqualTo(CallStatus.TIMED_OUT.code()); + } + } + } + @Test void defaultGrpcCallOptions() { GrpcCallOptions grpcCallOptions = new QueryOptions("test").grpcCallOptions(); From fc62497250a6fa7b22252a0eafabc5497eb7f6cd Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 3 Feb 2026 15:50:00 +0100 Subject: [PATCH 2/5] chore: update Arrow Flight and align Netty version with it --- pom.xml | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 8c8db111..39441264 100644 --- a/pom.xml +++ b/pom.xml @@ -90,7 +90,7 @@ 5.14.2 1.14.2 - 4.2.9.Final + 4.1.124.Final 3.5.4 0.8.14 @@ -98,6 +98,18 @@ 3.12.0 + + + + io.grpc + grpc-bom + 1.75.0 + pom + import + + + + From b8a599ee36543e75e1a2784a3a659c8cab285dc9 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 3 Feb 2026 16:15:59 +0100 Subject: [PATCH 3/5] style: fix imports order --- .../java/com/influxdb/v3/client/query/QueryOptionsTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index 21f40502..9f972e34 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -41,10 +41,10 @@ import org.apache.arrow.flight.CallStatus; import org.apache.arrow.flight.FlightRuntimeException; import org.apache.arrow.flight.FlightServer; -import org.apache.arrow.flight.NoOpFlightProducer; -import org.apache.arrow.flight.Ticket; import org.apache.arrow.flight.FlightProducer.CallContext; import org.apache.arrow.flight.FlightProducer.ServerStreamListener; +import org.apache.arrow.flight.NoOpFlightProducer; +import org.apache.arrow.flight.Ticket; import org.apache.arrow.flight.impl.FlightServiceGrpc; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; From 8f8edcf1d045daec18c0b064c2826d0e4e397811 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 3 Feb 2026 16:22:56 +0100 Subject: [PATCH 4/5] style: fix imports order --- .../java/com/influxdb/v3/client/query/QueryOptionsTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java index 9f972e34..b11b30f6 100644 --- a/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java +++ b/src/test/java/com/influxdb/v3/client/query/QueryOptionsTest.java @@ -39,10 +39,10 @@ import org.apache.arrow.flight.CallOption; import org.apache.arrow.flight.CallOptions; import org.apache.arrow.flight.CallStatus; -import org.apache.arrow.flight.FlightRuntimeException; -import org.apache.arrow.flight.FlightServer; import org.apache.arrow.flight.FlightProducer.CallContext; import org.apache.arrow.flight.FlightProducer.ServerStreamListener; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.FlightServer; import org.apache.arrow.flight.NoOpFlightProducer; import org.apache.arrow.flight.Ticket; import org.apache.arrow.flight.impl.FlightServiceGrpc; From 09d17bebde2e20ae5fcacf3ed2fefc2dcf82a08b Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 3 Feb 2026 22:49:37 +0100 Subject: [PATCH 5/5] test: fix E2E test for Enterprise --- .../v3/client/integration/E2ETest.java | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java index 63c95afa..917b606f 100644 --- a/src/test/java/com/influxdb/v3/client/integration/E2ETest.java +++ b/src/test/java/com/influxdb/v3/client/integration/E2ETest.java @@ -110,15 +110,16 @@ void correctSslCertificates() throws Exception { .database(System.getenv("TESTING_INFLUXDB_DATABASE")) .sslRootsFilePath(influxDBcertificateFile) .build(); - InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig); - assertGetDataSuccess(influxDBClient); + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + assertGetDataSuccess(influxDBClient); + } } @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") @Test - void disableServerCertificateValidation() { + void disableServerCertificateValidation() throws Exception { String wrongCertificateFile = "src/test/java/com/influxdb/v3/client/testdata/docker.com.pem"; ClientConfig clientConfig = new ClientConfig.Builder() @@ -130,8 +131,9 @@ void disableServerCertificateValidation() { .build(); // Test succeeded with wrong certificate file because disableServerCertificateValidation is true - InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig); - assertGetDataSuccess(influxDBClient); + try (InfluxDBClient influxDBClient = InfluxDBClient.getInstance(clientConfig)) { + assertGetDataSuccess(influxDBClient); + } } @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") @@ -196,8 +198,9 @@ public void testQueryRows() throws Exception { String uuid = UUID.randomUUID().toString(); String measurement = "host22"; List> testDatas = new ArrayList<>(); + long baseTimestamp = System.currentTimeMillis(); for (int i = 0; i <= 9; i++) { - long timestamp = System.currentTimeMillis(); + long timestamp = baseTimestamp + i; Map map = Map.of( "measurement", measurement, "tag", "tagValue", @@ -230,6 +233,8 @@ public void testQueryRows() throws Exception { testDatas.add(map); } + Thread.sleep(2_000); + Map parameters = Map.of("testId", uuid); // Result set much be ordered by time String sql = String.format("Select * from %s where \"testId\"=$testId order by time", measurement); @@ -274,8 +279,9 @@ public void testQueryRowWithOptions() throws Exception { String uuid = UUID.randomUUID().toString(); String measurement = "host21"; List> testDatas = new ArrayList<>(); + long baseTimestamp = System.currentTimeMillis(); for (int i = 0; i <= 9; i++) { - long timestamp = System.currentTimeMillis(); + long timestamp = baseTimestamp + i; Map map = Map.of( "measurement", measurement, "tag", "tagValue", @@ -308,6 +314,8 @@ public void testQueryRowWithOptions() throws Exception { testDatas.add(map); } + Thread.sleep(2_000); + // Result set much be ordered by time String sql = String.format("Select * from %s where \"testId\"='%s' order by time", measurement, uuid); try (Stream> stream = client.queryRows(sql, QueryOptions.defaultQueryOptions())) { @@ -417,6 +425,12 @@ public void testNoAllocatorMemoryLeak() { .setTimestamp(now)); client.writePoints(points); + try { + Thread.sleep(2_000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } String query = "SELECT * FROM " + measurement; try (Stream stream = client.queryPoints(query)) { @@ -469,6 +483,7 @@ public void testMultipleQueries() throws Exception { .setTimestamp(now)); client.writePoints(points); + Thread.sleep(2_000); String query = "SELECT * FROM " + measurement; for (int i = 0; i < 20; i++) {