Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2527 增加流量控制功能,防止select大表导致mycat内存问题 #2540

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/main/java/io/mycat/backend/BackendConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ public void execute(RouteResultsetNode node, ServerConnection source,

public boolean checkAlive();

public void disableRead();

public void enableRead();

}
11 changes: 11 additions & 0 deletions src/main/java/io/mycat/backend/jdbc/JDBCConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -994,5 +994,16 @@ public boolean checkAlive() {
}
}

@Override
public void disableRead() {
// TODO Auto-generated method stub

}

@Override
public void enableRead() {
// TODO Auto-generated method stub

}

}
10 changes: 10 additions & 0 deletions src/main/java/io/mycat/backend/mysql/nio/MySQLConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -794,4 +794,14 @@ public boolean isModifiedSQLExecuted() {
public int getTxIsolation() {
return txIsolation;
}

@Override
public void disableRead() {
this.getSocketWR().disableRead();
}

@Override
public void enableRead() {
this.getSocketWR().enableRead();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -587,4 +587,16 @@ public void query(String sql, int charsetIndex) {
LOGGER.debug("UnsupportedEncodingException :"+ e.getMessage());
}
}

@Override
public void disableRead() {
// TODO Auto-generated method stub

}

@Override
public void enableRead() {
// TODO Auto-generated method stub

}
}
34 changes: 34 additions & 0 deletions src/main/java/io/mycat/config/model/SystemConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ public final class SystemConfig {

private int parallExecute;

private boolean enableWriteQueueFlowControl;// 写队列流量控制
private int writeQueueStopThreshold;// 写队列停止写入阈值
private int writeQueueRecoverThreshold;// 写队列恢复写入阈值

public String getDefaultSqlParser() {
return defaultSqlParser;
}
Expand Down Expand Up @@ -314,6 +318,11 @@ public SystemConfig() {
this.ignoreUnknownCommand = 0;
this.parallExecute = 0;
this.removeGraveAccent = 1;

// 流量控制相关
this.enableWriteQueueFlowControl = false;
this.writeQueueStopThreshold = 10 * 1024;
this.writeQueueRecoverThreshold = 512;
}

public void setMaxPreparedStmtCount(int maxPreparedStmtCount){
Expand Down Expand Up @@ -1060,4 +1069,29 @@ public int getRemoveGraveAccent() {
public void setRemoveGraveAccent(int removeGraveAccent) {
this.removeGraveAccent = removeGraveAccent;
}

public boolean isEnableWriteQueueFlowControl() {
return enableWriteQueueFlowControl;
}

public void setEnableWriteQueueFlowControl(boolean enableWriteQueueFlowControl) {
this.enableWriteQueueFlowControl = enableWriteQueueFlowControl;
}

public int getWriteQueueStopThreshold() {
return writeQueueStopThreshold;
}

public void setWriteQueueStopThreshold(int writeQueueStopThreshold) {
this.writeQueueStopThreshold = writeQueueStopThreshold;
}

public int getWriteQueueRecoverThreshold() {
return writeQueueRecoverThreshold;
}

public void setWriteQueueRecoverThreshold(int writeQueueRecoverThreshold) {
this.writeQueueRecoverThreshold = writeQueueRecoverThreshold;
}

}
6 changes: 6 additions & 0 deletions src/main/java/io/mycat/manager/ManagerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,10 @@ public void handle(final byte[] data) {
handler.handle(data);
}

@Override
public void checkQueueFlow() {
// TODO Auto-generated method stub

}

}
12 changes: 12 additions & 0 deletions src/main/java/io/mycat/net/AIOSocketWR.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,18 @@ public void doNextWriteCheck()
public boolean checkAlive() {
return channel.isOpen();
}

@Override
public void disableRead() {
// TODO Auto-generated method stub

}

@Override
public void enableRead() {
// TODO Auto-generated method stub

}
}

class AIOWriteHandler implements CompletionHandler<Integer, AIOSocketWR> {
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/io/mycat/net/AbstractConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@

import com.google.common.base.Strings;

import io.mycat.MycatServer;
import io.mycat.backend.mysql.CharsetUtil;
import io.mycat.config.model.SystemConfig;
import io.mycat.util.CompressUtil;
import io.mycat.util.TimeUtil;

Expand Down Expand Up @@ -85,6 +87,10 @@ public abstract class AbstractConnection implements NIOConnection {

protected final SocketWR socketWR;

protected boolean enableFlowController;// writeQueue是否开启流控
protected final int writeQueueStopThreshold;// writeQueue停止阀值
protected final int writeQueueRecoverThreshold;// writeQueue恢复阀值

public AbstractConnection(NetworkChannel channel) {
this.channel = channel;
boolean isAIO = (channel instanceof AsynchronousChannel);
Expand All @@ -97,6 +103,11 @@ public AbstractConnection(NetworkChannel channel) {
this.startupTime = TimeUtil.currentTimeMillis();
this.lastReadTime = startupTime;
this.lastWriteTime = startupTime;

SystemConfig config = MycatServer.getInstance().getConfig().getSystem();
this.enableFlowController = config.isEnableWriteQueueFlowControl();
this.writeQueueStopThreshold = config.getWriteQueueStopThreshold();
this.writeQueueRecoverThreshold = config.getWriteQueueRecoverThreshold();
}

public String getCharset() {
Expand Down Expand Up @@ -623,4 +634,22 @@ public void onConnectfinish() {
public boolean checkAlive(){
return socketWR.checkAlive();
}

public boolean isEnableFlowController() {
return enableFlowController;
}

public int getWriteQueueStopThreshold() {
return writeQueueStopThreshold;
}

public int getWriteQueueRecoverThreshold() {
return writeQueueRecoverThreshold;
}

/**
* 检查写队列流量,必要时候进行流控
*/
abstract public void checkQueueFlow();

}
5 changes: 5 additions & 0 deletions src/main/java/io/mycat/net/BackendAIOConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public void setProcessor(NIOProcessor processor) {
processor.addBackend(this);
}

@Override
public void checkQueueFlow() {

}

@Override
public String toString() {
return "BackendConnection [id=" + id + ", host=" + host + ", port="
Expand Down
88 changes: 88 additions & 0 deletions src/main/java/io/mycat/net/FrontendConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.NetworkChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.mycat.MycatServer;
import io.mycat.backend.BackendConnection;
import io.mycat.backend.mysql.CharsetUtil;
import io.mycat.backend.mysql.MySQLMessage;
import io.mycat.config.Capabilities;
Expand All @@ -54,6 +58,7 @@
import io.mycat.net.mysql.HandshakeV10Packet;
import io.mycat.net.mysql.MySQLPacket;
import io.mycat.net.mysql.OkPacket;
import io.mycat.route.RouteResultsetNode;
import io.mycat.server.parser.ServerParse;
import io.mycat.util.CompressUtil;
import io.mycat.util.RandomUtil;
Expand All @@ -80,6 +85,7 @@ public abstract class FrontendConnection extends AbstractConnection {
protected LoadDataInfileHandler loadDataInfileHandler;
protected boolean isAccepted;
protected boolean isAuthenticated;
protected QueueFlowController flowController;
private boolean allowMultiStatements = false;

public FrontendConnection(NetworkChannel channel) throws IOException {
Expand All @@ -97,6 +103,10 @@ public FrontendConnection(NetworkChannel channel) throws IOException {
this.port = localAddr.getPort();
this.localPort = remoteAddr.getPort();
this.handler = new FrontendAuthenticator(this);

if (enableFlowController) {
this.flowController = new QueueFlowController(this);
}
}

public long getId() {
Expand Down Expand Up @@ -672,5 +682,83 @@ public void setOption(byte[] data) {
return;
}
}

public boolean isEnableFlowController() {
return enableFlowController;
}

public void setEnableFlowController(boolean enableFlowController) {
this.enableFlowController = enableFlowController;
}

public QueueFlowController getFlowController() {
return flowController;
}

/**
*
* 队列流量控制器,防止队列过大内存OOM,功能:
* 1)超过最大阀值,关闭NIO读事件,停止从网络读取mysql数据
* 2)队列恢复到可继续写的阀值,重启NIO读事件,继续写队列
*/
public class QueueFlowController {

private volatile boolean readIOStopped; // 读事件的IO是否已经停止
private Collection<BackendConnection> relationedBackendConns;// 关联的后端连接
private final FrontendConnection frontendConn;

public QueueFlowController(FrontendConnection c) {
this.readIOStopped = false;
this.relationedBackendConns = new ArrayList<BackendConnection>();
this.frontendConn = c;
}

/**
*恢复所有后端连接的读事件
*/
private void recoverIORead() {
if (readIOStopped) {
synchronized (relationedBackendConns) {
if (readIOStopped) {// 再次判断,防止并发多次执行
readIOStopped = false;
for (final BackendConnection conn : relationedBackendConns) {
conn.enableRead();
}
relationedBackendConns.clear();
LOGGER.info("The connection[{}] has removed flow control.", frontendConn.toString());
}
}
}
}

private void stopIORead(final Collection<BackendConnection> conns) {
if (null != conns && conns.size() > 0) {
synchronized (relationedBackendConns) {
if (!readIOStopped) {// 再次判断,防止并发多次执行
readIOStopped = true;
for (BackendConnection conn : conns) {
conn.disableRead();
this.relationedBackendConns.add(conn);
}
LOGGER.info("Now the connection[{}] is under flow control", frontendConn.toString());
}
}
}
}
/**
* 检查writeQueue的流量控制阈值
*
* @param connection
*/
public void check(Map<RouteResultsetNode, BackendConnection> target) {
int size = writeQueue.size();
if (!readIOStopped && size > writeQueueStopThreshold) {
stopIORead(target.values());
} else {
if (readIOStopped && size <= writeQueueRecoverThreshold) {
recoverIORead();
}
}
}
}
}
6 changes: 5 additions & 1 deletion src/main/java/io/mycat/net/NIOProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,11 @@ private void checkConSendQueue(AbstractConnection c) {
if (!c.writeQueue.isEmpty()) {
c.getSocketWR().doNextWriteCheck();
}
}

if (c.isEnableFlowController()) {
c.checkQueueFlow();
}
}

// 后端连接检查
private void backendCheck() {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/mycat/net/SocketWR.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ public abstract class SocketWR {
public abstract void asynRead() throws IOException;
public abstract void doNextWriteCheck() ;
public abstract boolean checkAlive();
public abstract void disableRead();
public abstract void enableRead();
}
8 changes: 7 additions & 1 deletion src/main/java/io/mycat/server/NonBlockingSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class NonBlockingSession implements Session {

private volatile MiddlerResultHandler middlerResultHandler;
private boolean prepared;
private RouteResultset rrs;

public NonBlockingSession(ServerConnection source) {
this.source = source;
Expand Down Expand Up @@ -118,7 +119,7 @@ public BackendConnection removeTarget(RouteResultsetNode key) {

@Override
public void execute(RouteResultset rrs, int type) {

this.rrs = rrs;
// clear prev execute resources
clearHandlesResources();
if (LOGGER.isDebugEnabled()) {
Expand Down Expand Up @@ -659,4 +660,9 @@ public String toString() {
}
return sb.toString();
}

public RouteResultset getRrs() {
return rrs;
}

}
Loading