Skip to content

Commit

Permalink
feat:support zstd compress and uncompressed (#1701)
Browse files Browse the repository at this point in the history
* feat:support zstd compress and uncompressed

* fix:real & stackless write using different pool to avoid get stackless.writer

* fix:zstd normalize compress level

* Change empty string checks to be more idiomatic (#1684)

* chore:lint fix and rebase with master

* chore:remove 1.18 test & upgrade compress version

* fix:error default compress level

* Fix lint

---------

Co-authored-by: Erik Dubbelboer <erik@dubbelboer.com>
  • Loading branch information
Max-Cheng and erikdubbelboer committed Feb 21, 2024
1 parent 4c326e8 commit 5f81476
Show file tree
Hide file tree
Showing 9 changed files with 435 additions and 18 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ on:
branches:
- master
pull_request:

permissions:
# Required: allow read access to the content for analysis.
contents: read
# Optional: allow read access to pull request. Use with `only-new-issues` option.
pull-requests: read
# Optional: Allow write access to checks to allow the action to annotate code in the PR.
checks: write

jobs:
lint:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
strategy:
fail-fast: false
matrix:
go-version: [1.18.x, 1.19.x, 1.20.x, 1.21.x, 1.22.x]
go-version: [1.19.x, 1.20.x, 1.21.x, 1.22.x]
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
Expand Down
74 changes: 59 additions & 15 deletions fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/andybalholm/brotli"
"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/zstd"
"github.com/valyala/bytebufferpool"
)

Expand Down Expand Up @@ -370,6 +371,7 @@ const FSCompressedFileSuffix = ".fasthttp.gz"
var FSCompressedFileSuffixes = map[string]string{
"gzip": ".fasthttp.gz",
"br": ".fasthttp.br",
"zstd": ".fasthttp.zst",
}

// FSHandlerCacheDuration is the default expiration duration for inactive
Expand Down Expand Up @@ -460,7 +462,9 @@ func (fs *FS) initRequestHandler() {

compressedFileSuffixes := fs.CompressedFileSuffixes
if compressedFileSuffixes["br"] == "" || compressedFileSuffixes["gzip"] == "" ||
compressedFileSuffixes["br"] == compressedFileSuffixes["gzip"] {
compressedFileSuffixes["zstd"] == "" || compressedFileSuffixes["br"] == compressedFileSuffixes["gzip"] ||
compressedFileSuffixes["br"] == compressedFileSuffixes["zstd"] ||
compressedFileSuffixes["gzip"] == compressedFileSuffixes["zstd"] {
// Copy global map
compressedFileSuffixes = make(map[string]string, len(FSCompressedFileSuffixes))
for k, v := range FSCompressedFileSuffixes {
Expand All @@ -471,6 +475,7 @@ func (fs *FS) initRequestHandler() {
if fs.CompressedFileSuffix != "" {
compressedFileSuffixes["gzip"] = fs.CompressedFileSuffix
compressedFileSuffixes["br"] = FSCompressedFileSuffixes["br"]
compressedFileSuffixes["zstd"] = FSCompressedFileSuffixes["zstd"]
}

h := &fsHandler{
Expand Down Expand Up @@ -794,6 +799,7 @@ const (
defaultCacheKind CacheKind = iota
brotliCacheKind
gzipCacheKind
zstdCacheKind
)

func newCacheManager(fs *FS) cacheManager {
Expand Down Expand Up @@ -1032,14 +1038,19 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) {
fileEncoding := ""
byteRange := ctx.Request.Header.peek(strRange)
if len(byteRange) == 0 && h.compress {
if h.compressBrotli && ctx.Request.Header.HasAcceptEncodingBytes(strBr) {
switch {
case h.compressBrotli && ctx.Request.Header.HasAcceptEncodingBytes(strBr):
mustCompress = true
fileCacheKind = brotliCacheKind
fileEncoding = "br"
} else if ctx.Request.Header.HasAcceptEncodingBytes(strGzip) {
case ctx.Request.Header.HasAcceptEncodingBytes(strGzip):
mustCompress = true
fileCacheKind = gzipCacheKind
fileEncoding = "gzip"
case ctx.Request.Header.HasAcceptEncodingBytes(strZstd):
mustCompress = true
fileCacheKind = zstdCacheKind
fileEncoding = "zstd"
}
}

Expand Down Expand Up @@ -1097,10 +1108,13 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) {

hdr := &ctx.Response.Header
if ff.compressed {
if fileEncoding == "br" {
switch fileEncoding {
case "br":
hdr.SetContentEncodingBytes(strBr)
} else if fileEncoding == "gzip" {
case "gzip":
hdr.SetContentEncodingBytes(strGzip)
case "zstd":
hdr.SetContentEncodingBytes(strZstd)
}
}

Expand Down Expand Up @@ -1304,10 +1318,13 @@ nestedContinue:

if mustCompress {
var zbuf bytebufferpool.ByteBuffer
if fileEncoding == "br" {
switch fileEncoding {
case "br":
zbuf.B = AppendBrotliBytesLevel(zbuf.B, w.B, CompressDefaultCompression)
} else if fileEncoding == "gzip" {
case "gzip":
zbuf.B = AppendGzipBytesLevel(zbuf.B, w.B, CompressDefaultCompression)
case "zstd":
zbuf.B = AppendZstdBytesLevel(zbuf.B, w.B, CompressZstdDefault)
}
w = &zbuf
}
Expand Down Expand Up @@ -1406,20 +1423,28 @@ func (h *fsHandler) compressFileNolock(
}
return nil, errNoCreatePermission
}
if fileEncoding == "br" {
switch fileEncoding {
case "br":
zw := acquireStacklessBrotliWriter(zf, CompressDefaultCompression)
_, err = copyZeroAlloc(zw, f)
if err1 := zw.Flush(); err == nil {
err = err1
}
releaseStacklessBrotliWriter(zw, CompressDefaultCompression)
} else if fileEncoding == "gzip" {
case "gzip":
zw := acquireStacklessGzipWriter(zf, CompressDefaultCompression)
_, err = copyZeroAlloc(zw, f)
if err1 := zw.Flush(); err == nil {
err = err1
}
releaseStacklessGzipWriter(zw, CompressDefaultCompression)
case "zstd":
zw := acquireStacklessZstdWriter(zf, CompressZstdDefault)
_, err = copyZeroAlloc(zw, f)
if err1 := zw.Flush(); err == nil {
err = err1
}
releaseStacklessZstdWriter(zw, CompressZstdDefault)
}
_ = zf.Close()
_ = f.Close()
Expand All @@ -1443,20 +1468,28 @@ func (h *fsHandler) newCompressedFSFileCache(f fs.File, fileInfo fs.FileInfo, fi
err error
)

if fileEncoding == "br" {
switch fileEncoding {
case "br":
zw := acquireStacklessBrotliWriter(w, CompressDefaultCompression)
_, err = copyZeroAlloc(zw, f)
if err1 := zw.Flush(); err == nil {
err = err1
}
releaseStacklessBrotliWriter(zw, CompressDefaultCompression)
} else if fileEncoding == "gzip" {
case "gzip":
zw := acquireStacklessGzipWriter(w, CompressDefaultCompression)
_, err = copyZeroAlloc(zw, f)
if err1 := zw.Flush(); err == nil {
err = err1
}
releaseStacklessGzipWriter(zw, CompressDefaultCompression)
case "zstd":
zw := acquireStacklessZstdWriter(w, CompressZstdDefault)
_, err = copyZeroAlloc(zw, f)
if err1 := zw.Flush(); err == nil {
err = err1
}
releaseStacklessZstdWriter(zw, CompressZstdDefault)
}
defer func() { _ = f.Close() }()

Expand Down Expand Up @@ -1600,21 +1633,28 @@ func (h *fsHandler) newFSFile(f fs.File, fileInfo fs.FileInfo, compressed bool,
func readFileHeader(f io.Reader, compressed bool, fileEncoding string) ([]byte, error) {
r := f
var (
br *brotli.Reader
zr *gzip.Reader
br *brotli.Reader
zr *gzip.Reader
zsr *zstd.Decoder
)
if compressed {
var err error
if fileEncoding == "br" {
switch fileEncoding {
case "br":
if br, err = acquireBrotliReader(f); err != nil {
return nil, err
}
r = br
} else if fileEncoding == "gzip" {
case "gzip":
if zr, err = acquireGzipReader(f); err != nil {
return nil, err
}
r = zr
case "zstd":
if zsr, err = acquireZstdReader(f); err != nil {
return nil, err
}
r = zsr
}
}

Expand All @@ -1639,6 +1679,10 @@ func readFileHeader(f io.Reader, compressed bool, fileEncoding string) ([]byte,
releaseGzipReader(zr)
}

if zsr != nil {
releaseZstdReader(zsr)
}

return data, err
}

Expand Down
70 changes: 70 additions & 0 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,23 @@ func (ctx *RequestCtx) RequestBodyStream() io.Reader {
return ctx.Request.bodyStream
}

func (req *Request) BodyUnzstd() ([]byte, error) {
return unzstdData(req.Body())
}

func (resp *Response) BodyUnzstd() ([]byte, error) {
return unzstdData(resp.Body())
}

func unzstdData(p []byte) ([]byte, error) {
var bb bytebufferpool.ByteBuffer
_, err := WriteUnzstd(&bb, p)
if err != nil {
return nil, err
}
return bb.B, nil
}

func inflateData(p []byte) ([]byte, error) {
var bb bytebufferpool.ByteBuffer
_, err := WriteInflate(&bb, p)
Expand All @@ -554,6 +571,8 @@ func (req *Request) BodyUncompressed() ([]byte, error) {
return req.BodyGunzip()
case "br":
return req.BodyUnbrotli()
case "zstd":
return req.BodyUnzstd()
default:
return nil, ErrContentEncodingUnsupported
}
Expand All @@ -574,6 +593,8 @@ func (resp *Response) BodyUncompressed() ([]byte, error) {
return resp.BodyGunzip()
case "br":
return resp.BodyUnbrotli()
case "zstd":
return resp.BodyUnzstd()
default:
return nil, ErrContentEncodingUnsupported
}
Expand Down Expand Up @@ -1849,6 +1870,55 @@ func (resp *Response) deflateBody(level int) error {
return nil
}

func (resp *Response) zstdBody(level int) error {
if len(resp.Header.ContentEncoding()) > 0 {
return nil
}

if !resp.Header.isCompressibleContentType() {
return nil
}

if resp.bodyStream != nil {
// Reset Content-Length to -1, since it is impossible
// to determine body size beforehand of streamed compression.
// For
resp.Header.SetContentLength(-1)

// Do not care about memory allocations here, since flate is slow
// and allocates a lot of memory by itself.
bs := resp.bodyStream
resp.bodyStream = NewStreamReader(func(sw *bufio.Writer) {
zw := acquireStacklessZstdWriter(sw, level)
fw := &flushWriter{
wf: zw,
bw: sw,
}
copyZeroAlloc(fw, bs) //nolint:errcheck
releaseStacklessZstdWriter(zw, level)
if bsc, ok := bs.(io.Closer); ok {
bsc.Close()
}
})
} else {
bodyBytes := resp.bodyBytes()
if len(bodyBytes) < minCompressLen {
return nil
}
w := responseBodyPool.Get()
w.B = AppendZstdBytesLevel(w.B, bodyBytes, level)

if resp.body != nil {
responseBodyPool.Put(resp.body)
}
resp.body = w
resp.bodyRaw = nil
}
resp.Header.SetContentEncodingBytes(strZstd)
resp.Header.addVaryBytes(strAcceptEncoding)
return nil
}

// Bodies with sizes smaller than minCompressLen aren't compressed at all.
const minCompressLen = 200

Expand Down
Binary file added request_body.zst
Binary file not shown.
9 changes: 7 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,10 +523,13 @@ func CompressHandler(h RequestHandler) RequestHandler {
func CompressHandlerLevel(h RequestHandler, level int) RequestHandler {
return func(ctx *RequestCtx) {
h(ctx)
if ctx.Request.Header.HasAcceptEncodingBytes(strGzip) {
switch {
case ctx.Request.Header.HasAcceptEncodingBytes(strGzip):
ctx.Response.gzipBody(level) //nolint:errcheck
} else if ctx.Request.Header.HasAcceptEncodingBytes(strDeflate) {
case ctx.Request.Header.HasAcceptEncodingBytes(strDeflate):
ctx.Response.deflateBody(level) //nolint:errcheck
case ctx.Request.Header.HasAcceptEncodingBytes(strZstd):
ctx.Response.zstdBody(level) //nolint:errcheck
}
}
}
Expand Down Expand Up @@ -559,6 +562,8 @@ func CompressHandlerBrotliLevel(h RequestHandler, brotliLevel, otherLevel int) R
ctx.Response.gzipBody(otherLevel) //nolint:errcheck
case ctx.Request.Header.HasAcceptEncodingBytes(strDeflate):
ctx.Response.deflateBody(otherLevel) //nolint:errcheck
case ctx.Request.Header.HasAcceptEncodingBytes(strZstd):
ctx.Response.zstdBody(otherLevel) //nolint:errcheck
}
}
}
Expand Down
1 change: 1 addition & 0 deletions strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ var (
strClose = []byte("close")
strGzip = []byte("gzip")
strBr = []byte("br")
strZstd = []byte("zstd")
strDeflate = []byte("deflate")
strKeepAlive = []byte("keep-alive")
strUpgrade = []byte("Upgrade")
Expand Down
Loading

0 comments on commit 5f81476

Please sign in to comment.