diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index c8291385..41b95b4c 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -169,6 +169,10 @@ public X509Certificate[] getAcceptedIssuers() { private long connectTimeout = TimeUnit.SECONDS.toMillis(3); + private long netWriteTimeout; + + private long netReadTimeout; + private volatile ExecutorService keepAliveThreadExecutor; private final Lock connectLock = new ReentrantLock(); @@ -496,6 +500,42 @@ public void setConnectTimeout(long connectTimeout) { this.connectTimeout = connectTimeout; } + /** + * @return net_write_timeout in seconds (0 if not set (default)). + * @see #setNetWriteTimeout(long) + */ + public long getNetWriteTimeout() { + return netWriteTimeout; + } + + /** + * @param netWriteTimeout net_write_timeout in seconds. Controls how long the server waits for + * a write to the client to complete. May need to be increased for large data volumes to avoid + * EOFException. 0 means use server default. + * @see #getNetWriteTimeout() + */ + public void setNetWriteTimeout(long netWriteTimeout) { + this.netWriteTimeout = netWriteTimeout; + } + + /** + * @return net_read_timeout in seconds (0 if not set (default)). + * @see #setNetReadTimeout(long) + */ + public long getNetReadTimeout() { + return netReadTimeout; + } + + /** + * @param netReadTimeout net_read_timeout in seconds. Controls how long the server waits for + * a read from the client to complete. May need to be increased for high-latency networks to + * avoid EOFException. 0 means use server default. + * @see #getNetReadTimeout() + */ + public void setNetReadTimeout(long netReadTimeout) { + this.netReadTimeout = netReadTimeout; + } + /** * @param eventDeserializer custom event deserializer */ @@ -682,6 +722,12 @@ protected void setupConnection() throws IOException { if (heartbeatInterval > 0) { enableHeartbeat(); } + if (netWriteTimeout > 0) { + setNetWriteTimeoutOnServer(); + } + if (netReadTimeout > 0) { + setNetReadTimeoutOnServer(); + } } private PacketChannel openChannel() throws IOException { @@ -779,6 +825,18 @@ private void enableHeartbeat() throws IOException { checkError(statementResult); } + private void setNetWriteTimeoutOnServer() throws IOException { + channel.write(new QueryCommand("set net_write_timeout=" + netWriteTimeout)); + byte[] statementResult = channel.read(); + checkError(statementResult); + } + + private void setNetReadTimeoutOnServer() throws IOException { + channel.write(new QueryCommand("set net_read_timeout=" + netReadTimeout)); + byte[] statementResult = channel.read(); + checkError(statementResult); + } + private void setMasterServerId() throws IOException { channel.write(new QueryCommand("select @@server_id")); ResultSetRowPacket[] resultSet = readResultSet(); diff --git a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java index a212c592..09bafd9c 100644 --- a/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java +++ b/src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java @@ -69,6 +69,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -86,6 +87,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -1073,6 +1075,299 @@ public void testMySQL8InvisibleColumn() throws Exception { eventListener.waitFor(WriteRowsEventData.class, 1, DEFAULT_TIMEOUT); } + @Test(timeOut = 120000) + public void testNetWriteTimeoutCausesEOF() throws Exception { + MysqlOnetimeServer server = new MysqlOnetimeServer(); + server.boot(); + + MySQLConnection conn = new MySQLConnection("127.0.0.1", server.getPort(), "root", ""); + + try { + conn.execute((Callback) statement -> { + statement.execute("CREATE DATABASE IF NOT EXISTS net_timeout_test"); + statement.execute("USE net_timeout_test"); + statement.execute("CREATE TABLE big_data (id INT AUTO_INCREMENT PRIMARY KEY, " + + "data1 LONGTEXT, data2 LONGTEXT)"); + }); + + // Create BinaryLogClient with net_write_timeout=1 (1 second) + final BinaryLogClient testClient = new BinaryLogClient("127.0.0.1", server.getPort(), "root", ""); + testClient.setNetWriteTimeout(1); + testClient.setKeepAlive(false); + + // Track communication failure or disconnect + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final AtomicReference failureException = new AtomicReference<>(); + + testClient.registerLifecycleListener(new BinaryLogClient.AbstractLifecycleListener() { + @Override + public void onCommunicationFailure(BinaryLogClient client, Exception ex) { + logger.info("onCommunicationFailure: " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); + failureException.set(ex); + disconnectLatch.countDown(); + } + + @Override + public void onDisconnect(BinaryLogClient client) { + logger.info("onDisconnect triggered"); + disconnectLatch.countDown(); + } + }); + + // EventListener: block for 30s on first WRITE_ROWS event to stall binlog consumption + final CountDownLatch blockingStarted = new CountDownLatch(1); + testClient.registerEventListener(new BinaryLogClient.EventListener() { + private volatile boolean blocked = false; + public void onEvent(Event event) { + if (!blocked && EventType.isRowMutation(event.getHeader().getEventType())) { + blocked = true; + blockingStarted.countDown(); + logger.info("Row mutation event received, blocking for 30s..."); + try { + // Block 30s - well beyond net_write_timeout=1s + Thread.sleep(30000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + }); + + testClient.connect(DEFAULT_TIMEOUT); + logger.info("BinaryLogClient connected, inserting data..."); + + // Insert large data in a background thread + final MysqlOnetimeServer finalServer = server; + final AtomicReference dataThreadError = new AtomicReference<>(); + Thread dataThread = new Thread(() -> { + try { + MySQLConnection dataConn = new MySQLConnection("127.0.0.1", finalServer.getPort(), "root", ""); + dataConn.connection.setAutoCommit(true); // ensure each INSERT is committed immediately + Statement stmt = dataConn.connection.createStatement(); + stmt.execute("USE net_timeout_test"); + String pad = new String(new char[50000]).replace('\0', 'X'); + for (int i = 0; i < 1000; i++) { + try { + stmt.execute("INSERT INTO big_data (data1, data2) VALUES ('" + + pad + "', '" + pad + "')"); + } catch (SQLException e) { + logger.info("Data insertion interrupted at row " + i + ": " + e.getMessage()); + break; + } + } + stmt.close(); + dataConn.close(); + } catch (Exception e) { + logger.info("Data thread exception: " + e.getClass().getSimpleName() + ": " + e.getMessage()); + dataThreadError.set(e); + } + }); + dataThread.start(); + + // Wait for event processing to start blocking + assertTrue(blockingStarted.await(60, TimeUnit.SECONDS), "Should receive WRITE_ROWS event and start blocking"); + logger.info("Event processing blocked, waiting for TCP buffer to fill + net_write_timeout..."); + + // Wait for disconnect (up to 60s) + boolean disconnected = disconnectLatch.await(60, TimeUnit.SECONDS); + assertTrue(disconnected, "net_write_timeout=1 should cause disconnect (EOF error)"); + + Exception ex = failureException.get(); + if (ex != null) { + logger.info("net_write_timeout=1 triggered communication failure: " + + ex.getClass().getSimpleName() + ": " + ex.getMessage()); + } else { + logger.info("net_write_timeout=1 triggered disconnect"); + } + + try { + testClient.disconnect(); + } catch (Exception ignored) {} + dataThread.interrupt(); + dataThread.join(5000); + } finally { + conn.close(); + server.shutDown(); + } + } + + @Test(timeOut = 120000) + public void testNetWriteTimeoutLargeValueWorksNormally() throws Exception { + // Boot a standalone MySQL instance + MysqlOnetimeServer server = new MysqlOnetimeServer(); + server.boot(); + + MySQLConnection conn = new MySQLConnection("127.0.0.1", server.getPort(), "root", ""); + + try { + // Set global net_write_timeout=1 (same harsh condition as testNetWriteTimeoutCausesEOF) + conn.execute(new Callback() { + public void execute(Statement statement) throws SQLException { + statement.execute("SET GLOBAL net_write_timeout=1"); + statement.execute("CREATE DATABASE IF NOT EXISTS net_timeout_test2"); + statement.execute("USE net_timeout_test2"); + statement.execute("CREATE TABLE big_data2 (id INT AUTO_INCREMENT PRIMARY KEY, " + + "data1 LONGTEXT, data2 LONGTEXT)"); + } + }); + logger.info("Global net_write_timeout set to 1s"); + + // Create BinaryLogClient with session-level net_write_timeout=120 (overrides global) + final BinaryLogClient testClient = new BinaryLogClient("127.0.0.1", server.getPort(), "root", ""); + testClient.setNetWriteTimeout(120); // session override + testClient.setKeepAlive(false); + + // Monitor communication failures + final AtomicBoolean communicationFailed = new AtomicBoolean(false); + final AtomicReference failureException = new AtomicReference(); + testClient.registerLifecycleListener(new BinaryLogClient.AbstractLifecycleListener() { + @Override + public void onCommunicationFailure(BinaryLogClient client, Exception ex) { + communicationFailed.set(true); + failureException.set(ex); + logger.info("onCommunicationFailure (should not happen): " + + ex.getClass().getSimpleName() + ": " + ex.getMessage()); + } + }); + + // EventListener: block for 10s on first WRITE_ROWS event + // global net_write_timeout=1 would disconnect in 10s without session override + // session net_write_timeout=120 keeps connection alive + final CountDownLatch blockingDone = new CountDownLatch(1); + final CountDownLatch blockingStarted = new CountDownLatch(1); + testClient.registerEventListener(new BinaryLogClient.EventListener() { + private volatile boolean blocked = false; + public void onEvent(Event event) { + if (!blocked && EventType.isRowMutation(event.getHeader().getEventType())) { + blocked = true; + blockingStarted.countDown(); + logger.info("Row mutation event received, blocking for 10s (testing session override)..."); + try { + // 10s: well beyond global net_write_timeout=1s, but within session 120s + Thread.sleep(10000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + blockingDone.countDown(); + logger.info("10s blocking finished, resuming event processing"); + } + } + }); + + testClient.connect(DEFAULT_TIMEOUT); + logger.info("BinaryLogClient connected (global net_write_timeout=1, session=120)"); + + try { + // Insert large data in background (same pressure as testNetWriteTimeoutCausesEOF) + final MysqlOnetimeServer finalServer = server; + Thread dataThread = new Thread(() -> { + try { + MySQLConnection dataConn = new MySQLConnection("127.0.0.1", finalServer.getPort(), "root", ""); + dataConn.connection.setAutoCommit(true); + Statement stmt = dataConn.connection.createStatement(); + stmt.execute("USE net_timeout_test2"); + String pad = new String(new char[50000]).replace('\0', 'Y'); + for (int i = 0; i < 500; i++) { + try { + stmt.execute("INSERT INTO big_data2 (data1, data2) VALUES ('" + + pad + "', '" + pad + "')"); + } catch (SQLException e) { + break; + } + } + stmt.close(); + dataConn.close(); + } catch (Exception e) { + // ignored + } + }); + dataThread.start(); + + // Wait for blocking to start + assertTrue(blockingStarted.await(60, TimeUnit.SECONDS), "Should receive row mutation event and start blocking"); + logger.info("Event processing blocked for 10s, global net_write_timeout=1 but session=120..."); + + // Wait for blocking to finish + assertTrue(blockingDone.await(30, TimeUnit.SECONDS), "10s blocking should complete normally"); + + // Wait a bit more to confirm connection is still alive + Thread.sleep(3000); + + // Core assertion: with global net_write_timeout=1 + large data + 10s blocking, + // session-level net_write_timeout=120 keeps the connection alive + assertFalse(communicationFailed.get(), + "session net_write_timeout=120 should override global net_write_timeout=1, " + + "no communication failure expected. Actual: " + failureException.get()); + + logger.info("session net_write_timeout=120 successfully overrode global=1, connection stable"); + + dataThread.interrupt(); + dataThread.join(5000); + } finally { + testClient.disconnect(); + } + } finally { + // Restore global setting + try { + conn.execute((Callback) statement -> statement.execute("SET GLOBAL net_write_timeout=60")); + } catch (Exception ignored) {} + conn.close(); + server.shutDown(); + } + } + + @Test(timeOut = 60000) + public void testNetReadTimeoutApplied() throws Exception { + // Boot a standalone MySQL instance + MysqlOnetimeServer server = new MysqlOnetimeServer(); + server.boot(); + + MySQLConnection conn = new MySQLConnection("127.0.0.1", server.getPort(), "root", ""); + + try { + // Set global net_read_timeout=1 + conn.execute((Callback) statement -> statement.execute("SET GLOBAL net_read_timeout=1")); + logger.info("Global net_read_timeout set to 1s"); + + // Create BinaryLogClient with session-level net_read_timeout=120 + final BinaryLogClient testClient = new BinaryLogClient("127.0.0.1", server.getPort(), "root", ""); + testClient.setNetReadTimeout(120); + testClient.setKeepAlive(false); + + testClient.connect(DEFAULT_TIMEOUT); + logger.info("BinaryLogClient connected (global net_read_timeout=1, session=120)"); + + try { + // Verify global net_read_timeout=1 via JDBC + final AtomicLong globalValue = new AtomicLong(); + conn.execute((Callback) statement -> { + ResultSet rs = statement.executeQuery("SELECT @@global.net_read_timeout"); + rs.next(); + globalValue.set(rs.getLong(1)); + rs.close(); + }); + assertEquals(globalValue.get(), 1, "global net_read_timeout should be 1"); + logger.info("Confirmed global net_read_timeout=" + globalValue.get()); + + // Connection remains stable, proving session override is effective + Thread.sleep(3000); + assertTrue(testClient.isConnected(), + "session net_read_timeout=120 should override global net_read_timeout=1, connection should be stable"); + + logger.info("session net_read_timeout=120 successfully overrode global=1, connection stable"); + } finally { + testClient.disconnect(); + } + } finally { + // Restore global setting + try { + conn.execute((Callback) statement -> statement.execute("SET GLOBAL net_read_timeout=30")); + } catch (Exception ignored) {} + conn.close(); + server.shutDown(); + } + } + @AfterMethod public void afterEachTest() throws Exception { final CountDownLatch latch = new CountDownLatch(1);