Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ public X509Certificate[] getAcceptedIssuers() {

private long connectTimeout = TimeUnit.SECONDS.toMillis(3);

private long netWriteTimeout;

private long netReadTimeout;

private volatile ExecutorService keepAliveThreadExecutor;

private final Lock connectLock = new ReentrantLock();
Expand Down Expand Up @@ -496,6 +500,42 @@ public void setConnectTimeout(long connectTimeout) {
this.connectTimeout = connectTimeout;
}

/**
* @return net_write_timeout in seconds (0 if not set (default)).
* @see #setNetWriteTimeout(long)
*/
public long getNetWriteTimeout() {
return netWriteTimeout;
}

/**
* @param netWriteTimeout net_write_timeout in seconds. Controls how long the server waits for
* a write to the client to complete. May need to be increased for large data volumes to avoid
* EOFException. 0 means use server default.
* @see #getNetWriteTimeout()
*/
public void setNetWriteTimeout(long netWriteTimeout) {
this.netWriteTimeout = netWriteTimeout;
}

/**
* @return net_read_timeout in seconds (0 if not set (default)).
* @see #setNetReadTimeout(long)
*/
public long getNetReadTimeout() {
return netReadTimeout;
}

/**
* @param netReadTimeout net_read_timeout in seconds. Controls how long the server waits for
* a read from the client to complete. May need to be increased for high-latency networks to
* avoid EOFException. 0 means use server default.
* @see #getNetReadTimeout()
*/
public void setNetReadTimeout(long netReadTimeout) {
this.netReadTimeout = netReadTimeout;
}

/**
* @param eventDeserializer custom event deserializer
*/
Expand Down Expand Up @@ -682,6 +722,12 @@ protected void setupConnection() throws IOException {
if (heartbeatInterval > 0) {
enableHeartbeat();
}
if (netWriteTimeout > 0) {
setNetWriteTimeoutOnServer();
}
if (netReadTimeout > 0) {
setNetReadTimeoutOnServer();
}
}

private PacketChannel openChannel() throws IOException {
Expand Down Expand Up @@ -779,6 +825,18 @@ private void enableHeartbeat() throws IOException {
checkError(statementResult);
}

private void setNetWriteTimeoutOnServer() throws IOException {
channel.write(new QueryCommand("set net_write_timeout=" + netWriteTimeout));
byte[] statementResult = channel.read();
checkError(statementResult);
}

private void setNetReadTimeoutOnServer() throws IOException {
channel.write(new QueryCommand("set net_read_timeout=" + netReadTimeout));
byte[] statementResult = channel.read();
checkError(statementResult);
}

private void setMasterServerId() throws IOException {
channel.write(new QueryCommand("select @@server_id"));
ResultSetRowPacket[] resultSet = readResultSet();
Expand Down
Loading
Loading