Skip to content

Commit

Permalink
fix m3u8 streams
Browse files Browse the repository at this point in the history
  • Loading branch information
sonroyaalmerol committed Sep 23, 2024
1 parent 1d0d82d commit 2744b38
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
8 changes: 5 additions & 3 deletions proxy/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ func (b *Buffer) Write(data []byte) {
b.cond.Broadcast() // Notify all waiting clients that new data is available
}

func (b *Buffer) ReadChunk(size int) ([]byte, bool) {
func (b *Buffer) ReadChunk(size int, force bool) ([]byte, bool) {
b.mu.Lock()
defer b.mu.Unlock()

// Wait for buffer to have enough data
for len(b.data) < size {
b.cond.Wait()
if !force {
for len(b.data) < size {
b.cond.Wait()
}
}

chunk := b.data[:size]
Expand Down
12 changes: 8 additions & 4 deletions proxy/stream_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (instance *StreamInstance) BufferStream(ctx context.Context, m3uIndex int,
}
}

func (instance *StreamInstance) StreamBuffer(ctx context.Context, w http.ResponseWriter) {
func (instance *StreamInstance) StreamBuffer(ctx context.Context, resp *http.Response, r *http.Request, w http.ResponseWriter) {
for {
select {
case <-ctx.Done(): // handle context cancellation
Expand All @@ -196,9 +196,13 @@ func (instance *StreamInstance) StreamBuffer(ctx context.Context, w http.Respons
bufferSize = bufferMbInt * 1024 * 1024
}

chunk, ok := instance.Buffer.ReadChunk(bufferSize)
forceRead := r.Method != http.MethodGet || utils.EOFIsExpected(resp)

chunk, ok := instance.Buffer.ReadChunk(bufferSize, forceRead)
if !ok {
time.Sleep(100 * time.Millisecond) // Wait a bit before checking buffer again
if !forceRead {
time.Sleep(100 * time.Millisecond) // Wait a bit before checking buffer again
}
continue
}

Expand Down Expand Up @@ -304,7 +308,7 @@ func Handler(w http.ResponseWriter, r *http.Request) {
}
}
}()
go stream.StreamBuffer(ctx, w)
go stream.StreamBuffer(ctx, resp, r, w)

stream.Buffer.testedIndexes = append(stream.Buffer.testedIndexes, selectedIndex)

Expand Down

0 comments on commit 2744b38

Please sign in to comment.