Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions pkg/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,24 @@ func (e *E2E) Do(rewrite func(r *http.Request)) (*http.Response, error) {
// wait for a while to let the server ready
time.Sleep(time.Millisecond * 100)

rewrite(e.req)
nr := e.req.Clone(context.Background())

method := e.req.Method
rewrite(nr)

e.req.Header.Set(protocol.InternalUpstreamAddr, e.ts.Listener.Addr().String())
method := nr.Method

nr.Header.Set(protocol.InternalUpstreamAddr, e.ts.Listener.Addr().String())

if dumpReq.Load() && method != "PURGE" {
DumpReq(e.req)
DumpReq(nr)
}

if manual.Load() {
fmt.Printf("manual mode wait 20s, src addr %q\n", e.ts.Listener.Addr().String())
time.Sleep(time.Second * 20)
}

resp, err := e.cs.Do(e.req)
resp, err := e.cs.Do(nr)
e.resp = resp
e.err = err

Expand Down
90 changes: 64 additions & 26 deletions server/middleware/caching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ func Middleware(c *configv1.Middleware) (middleware.Middleware, func(), error) {
proxyClient := proxy.GetProxy()
store := storagev1.Current()

// Flight groups for collapsed forwarding at object and chunk level.
// These mirror Squid's collapsed_forwarding: one origin request
// serves many waiting clients.
objectFlight := &ObjectFlightGroup{}
chunkFlight := &ChunkFlightGroup{}

return middleware.RoundTripperFunc(func(req *http.Request) (resp *http.Response, err error) {
// only cache GET/HEAD request
if req.Method != http.MethodGet && req.Method != http.MethodHead {
Expand All @@ -130,6 +136,9 @@ func Middleware(c *configv1.Middleware) (middleware.Middleware, func(), error) {
// cachingPool.Put(caching)
//}()

// Wire up chunk-level collapsed forwarding.
caching.chunkFlight = chunkFlight

// err to BYPASS caching
if err != nil {
caching.log.Warnf("Precache processor failed: %v BYPASS", err)
Expand All @@ -147,34 +156,28 @@ func Middleware(c *configv1.Middleware) (middleware.Middleware, func(), error) {

// cache HIT
if caching.hit {
caching.cacheStatus = storage.CacheHit

rng, err1 := xhttp.SingleRange(req.Header.Get("Range"), caching.md.Size)
if err1 != nil {
// 无效 Range 处理
headers := make(http.Header)
xhttp.CopyHeader(caching.md.Headers, headers)
headers.Set("Content-Range", fmt.Sprintf("bytes */%d", caching.md.Size))
return nil, xhttp.NewBizError(http.StatusRequestedRangeNotSatisfiable, headers)
}

// mark cache status with Range requests.
caching.markCacheStatus(rng.Start, rng.End)
return caching.respondFromCache(req)
}

// find file seek(start, end)
resp, err = caching.lazilyRespond(req, rng.Start, rng.End)
if err != nil {
// fd leak
closeBody(resp)
return nil, err
// full MISS — use object-level collapsed forwarding so that
// concurrent requests for the same cache object share one
// origin fetch (Squid-style collapsed_forwarding).
if opts.CollapsedRequest {
flightResp, _, flightErr := objectFlight.Do(caching.id.HashStr(), opts.CollapsedRequestWaitTimeout.AsDuration(), func() (*http.Response, error) {
r, e := caching.doProxy(req, false)
if e != nil {
return nil, e
}
return processor.postCacheProcessor(caching, req, r)
})
if flightErr != nil {
return nil, flightErr
}

// response now
resp, err = caching.processor.postCacheProcessor(caching, req, resp)
resp = flightResp
return
}

// full MISS
// full MISS (collapsed forwarding disabled)
resp, err = caching.doProxy(req, false)
if err != nil {
return nil, err
Expand All @@ -187,6 +190,31 @@ func Middleware(c *configv1.Middleware) (middleware.Middleware, func(), error) {
}, middleware.EmptyCleanup, nil
}

// respondFromCache serves a cache HIT.
//
// It marks the request as a HIT, resolves the request's Range header against
// the cached object size, and hands the resolved byte span to lazilyRespond.
// The resulting response is then passed through the post-cache processor.
//
// When the Range header cannot be resolved, no response is returned; instead
// a 416 (Range Not Satisfiable) business error is produced whose headers are
// built from the cached metadata headers plus a "Content-Range: bytes */<size>"
// entry.
func (c *Caching) respondFromCache(req *http.Request) (*http.Response, error) {
	c.cacheStatus = storage.CacheHit

	span, rangeErr := xhttp.SingleRange(req.Header.Get("Range"), c.md.Size)
	if rangeErr != nil {
		// Invalid Range: reject with 416 and advertise the full object size.
		rejectHeaders := make(http.Header)
		xhttp.CopyHeader(c.md.Headers, rejectHeaders)
		rejectHeaders.Set("Content-Range", fmt.Sprintf("bytes */%d", c.md.Size))
		return nil, xhttp.NewBizError(http.StatusRequestedRangeNotSatisfiable, rejectHeaders)
	}

	first, last := span.Start, span.End

	// Record the cache status for the requested byte span.
	c.markCacheStatus(first, last)

	cached, readErr := c.lazilyRespond(req, first, last)
	if readErr == nil {
		return c.processor.postCacheProcessor(c, req, cached)
	}

	// Release whatever lazilyRespond handed back so the body is not leaked.
	closeBody(cached)
	return nil, readErr
}

func (c *Caching) lazilyRespond(req *http.Request, start, end int64) (*http.Response, error) {
// 这里通过缓存的块大小来计算,而不是配置默认的 SliceSize
// 这样已缓存的对象可以使用原来的配置块大小,不受配置文件变更影响
Expand Down Expand Up @@ -271,16 +299,26 @@ func (c *Caching) getUpstreamReader(fromByte, toByte uint64, async bool) (io.Rea
closeBody(resp)
return nil, err
}
// 部分命中
c.cacheStatus = storage.CachePartHit
// 发起的是 206 请求,但是返回的非 206
if resp.StatusCode != http.StatusPartialContent {
c.log.Warnf("getUpstreamReader doProxy[chunk]: status code: %d, bod size: %d", resp.StatusCode, resp.ContentLength)
return resp, xhttp.NewBizError(resp.StatusCode, resp.Header)
}
return resp, nil
}

// Chunk-level collapsed forwarding: if another goroutine is already
// fetching the same byte range for this object, wait and share the
// response body (io.MultiWriter fan-out). This mirrors Squid's
// collapsed_forwarding at the chunk/segment level.
if c.chunkFlight != nil && c.opt.CollapsedRequest && c.id != nil {
key := fmt.Sprintf("%s:%d-%d", c.id.HashStr(), fromByte, toByte)
reader, shared, err := c.chunkFlight.Do(key, c.opt.CollapsedRequestWaitTimeout.AsDuration(), doSubRequest)
Comment on lines +313 to +315
if shared {
c.cacheStatus = storage.CachePartHit
}
return reader, err
}

if async {
return iobuf.AsyncReadCloser(doSubRequest), nil
}
Expand Down
106 changes: 106 additions & 0 deletions server/middleware/caching/chunk_flight.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package caching

import (
"io"
"net/http"
"sync"
"time"
)

// chunkCall is an in-flight chunk upstream request. pipes holds the write
// half of the pipe handed to every caller registered under the key. It is
// only appended to while ChunkFlightGroup.mu is held and the key is still
// present in the map.
type chunkCall struct {
	pipes []*io.PipeWriter
}

// ChunkFlightGroup collapses concurrent upstream requests for the same
// (object, byte-range) into a single origin fetch. Response body bytes
// are fanned out to all waiters through one io.Pipe per waiter.
//
// This mirrors Squid's collapsed_forwarding at the chunk/segment level:
// when two goroutines request the same byte range of the same cached
// object, only one hits origin and the others wait.
//
// The zero value is ready to use. A ChunkFlightGroup must not be copied
// after first use because it contains a sync.Mutex.
type ChunkFlightGroup struct {
	mu sync.Mutex
	m  map[string]*chunkCall
}

// Do executes fn once per key. All callers — including the first — receive
// an io.ReadCloser (the read half of an io.Pipe) carrying the upstream
// response body. The returned bool reports whether this caller joined an
// already in-flight request; the caller that starts the flight always gets
// false, even if others join it later. The returned error is always nil:
// failures of fn are delivered through Read on the returned reader.
//
// waiter is the duration the origin goroutine pauses *before* calling fn,
// giving late-arriving callers a window to register under the same key.
// In production the network round-trip naturally provides this window;
// waiter ensures correctness even when fn would otherwise complete nearly
// instantly (e.g. in tests, or for tiny ranges on a local origin).
// NOTE(review): every flight that is not joined still pays the full waiter
// delay before contacting origin — confirm the configured value is small.
//
// Body ownership: on success Do reads resp.Body to completion (or until no
// reader is left) and closes it. On error Do closes resp.Body as well when
// fn returns a non-nil response, so a body is never leaked even if fn
// forgets to close it; fn's body must therefore tolerate a second Close.
//
// Callers must drain or Close the returned reader. Pipe writes are
// synchronous, so a reader that neither reads nor closes stalls delivery to
// every other waiter of the same flight. A reader that closes early is
// simply dropped from the fan-out; the remaining waiters are unaffected.
func (g *ChunkFlightGroup) Do(key string, waiter time.Duration, fn func() (*http.Response, error)) (io.ReadCloser, bool, error) {
	pr, pw := io.Pipe()

	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*chunkCall)
	}
	if c, ok := g.m[key]; ok {
		// Join the flight that is already running for this key.
		c.pipes = append(c.pipes, pw)
		g.mu.Unlock()
		return pr, true, nil
	}

	c := &chunkCall{pipes: []*io.PipeWriter{pw}}
	g.m[key] = c
	g.mu.Unlock()

	go g.runChunkFlight(key, c, waiter, fn)

	return pr, false, nil
}

// runChunkFlight performs the single origin fetch for key and streams the
// result to every pipe registered on c. It always closes every pipe before
// returning so that no waiter blocks forever.
func (g *ChunkFlightGroup) runChunkFlight(key string, c *chunkCall, waiter time.Duration, fn func() (*http.Response, error)) {
	// Pause before hitting origin so concurrent callers have time to
	// register under this key. Without this window an instant fn would
	// complete and delete the map entry before anyone else could join.
	if waiter > 0 {
		time.Sleep(waiter)
	}

	resp, err := fn()

	// Remove the key so no further callers register against this flight.
	// Joiners only append while holding mu with the key present, so after
	// the delete c.pipes can no longer change and is safe to use unlocked.
	g.mu.Lock()
	pipes := c.pipes
	delete(g.m, key)
	g.mu.Unlock()

	if err != nil {
		// Defensive: fn is expected to close the body on error, but a
		// forgotten close would leak the upstream connection.
		if resp != nil && resp.Body != nil {
			_ = resp.Body.Close()
		}
		closeChunkPipes(pipes, err)
		return
	}

	if resp == nil {
		// fn broke its contract by returning (nil, nil). Surface it as a
		// read error instead of crashing the process on a nil dereference.
		closeChunkPipes(pipes, io.ErrUnexpectedEOF)
		return
	}
	if resp.Body == nil {
		// No body to stream: every waiter sees an immediate EOF.
		closeChunkPipes(pipes, nil)
		return
	}

	copyErr := fanOutChunk(pipes, resp.Body)
	_ = resp.Body.Close()
	closeChunkPipes(pipes, copyErr)
}

// fanOutChunk copies src to every pipe in pipes. A pipe whose Write fails
// (its reader was closed) is dropped and the remaining pipes keep receiving
// data; io.MultiWriter would instead abort the whole copy on the first
// failing writer. It returns nil on a clean EOF or when no reader is left,
// and the read error otherwise.
func fanOutChunk(pipes []*io.PipeWriter, src io.Reader) error {
	live := make([]*io.PipeWriter, len(pipes))
	copy(live, pipes)

	buf := make([]byte, 32*1024)
	for len(live) > 0 {
		n, readErr := src.Read(buf)
		if n > 0 {
			kept := live[:0]
			for _, p := range live {
				if _, writeErr := p.Write(buf[:n]); writeErr != nil {
					// Reader went away; stop feeding this pipe only.
					continue
				}
				kept = append(kept, p)
			}
			live = kept
		}
		if readErr == io.EOF {
			return nil
		}
		if readErr != nil {
			return readErr
		}
	}
	// Every reader is gone; there is nobody left to stream to.
	return nil
}

// closeChunkPipes closes every pipe writer. With a nil err readers observe
// io.EOF; otherwise they observe err. CloseWithError always returns nil, so
// its result is intentionally ignored.
func closeChunkPipes(pipes []*io.PipeWriter, err error) {
	for _, p := range pipes {
		_ = p.CloseWithError(err)
	}
}
}
Loading
Loading