Skip to content

Commit

Permalink
skip buffer for m3u8 streams
Browse files Browse the repository at this point in the history
  • Loading branch information
sonroyaalmerol committed Sep 23, 2024
1 parent e2f59fa commit 709238b
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 19 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ Access the generated M3U playlist at `http://<server ip>:8080/playlist.m3u`.
| USER_AGENT | Set the User-Agent of HTTP requests. | IPTV Smarters/1.0.3 (iPad; iOS 16.6.1; Scale/2.00) | Any valid user agent |
| ~~LOAD_BALANCING_MODE~~ (removed on version 0.10.0) | Set load balancing algorithm to a specific mode | brute-force | brute-force/round-robin |
| PARSER_WORKERS | Set number of workers to spawn for M3U parsing. | 5 | Any positive integer |
| BUFFER_MB | Set buffer size in mb. | 0 (no buffer) | Any positive integer |
| BUFFER_MB | Set buffer size in mb. Maximum suggested value would be 4 MB. | 0 (no buffer) | Any positive integer |
| INCLUDE_GROUPS_1, INCLUDE_GROUPS_2, INCLUDE_GROUPS_X | Set channels to include based on groups (Takes precedence over EXCLUDE_GROUPS_X) | N/A | Go regexp |
| EXCLUDE_GROUPS_1, EXCLUDE_GROUPS_2, EXCLUDE_GROUPS_X | Set channels to exclude based on groups | N/A | Go regexp |
| INCLUDE_TITLE_1, INCLUDE_TITLE_2, INCLUDE_TITLE_X | Set channels to include based on title (Takes precedence over EXCLUDE_TITLE_X) | N/A | Go regexp |
Expand Down
6 changes: 1 addition & 5 deletions proxy/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,13 @@ func (b *Buffer) Write(data []byte) {
b.cond.Broadcast() // Notify all waiting clients that new data is available
}

func (b *Buffer) ReadChunk(size int, force bool) ([]byte, bool) {
func (b *Buffer) ReadChunk(size int) ([]byte, bool) {
b.mu.Lock()
defer b.mu.Unlock()

// Wait for buffer to have enough data
for len(b.data) < size {
b.cond.Wait()
if force {
size = len(b.data)
break
}
}

chunk := b.data[:size]
Expand Down
37 changes: 24 additions & 13 deletions proxy/stream_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func InitializeStream(streamUrl string) (*StreamInstance, error) {
}, nil
}

func (instance *StreamInstance) BufferStream(ctx context.Context, m3uIndex int, resp *http.Response, r *http.Request, statusChan chan int) {
func (instance *StreamInstance) BufferStream(ctx context.Context, m3uIndex int, resp *http.Response, r *http.Request, w http.ResponseWriter, statusChan chan int) {
debug := os.Getenv("DEBUG") == "true"

if r.Method != http.MethodGet || utils.EOFIsExpected(resp) {
Expand All @@ -65,20 +65,35 @@ func (instance *StreamInstance) BufferStream(ctx context.Context, m3uIndex int,
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "#") {
instance.Buffer.Write([]byte(line + "\n"))
_, err := w.Write([]byte(line + "\n"))
if err != nil {
utils.SafeLogf("Failed to write line to response: %v", err)
statusChan <- 4
return
}
} else if strings.TrimSpace(line) != "" {
u, err := url.Parse(line)
if err != nil {
utils.SafeLogf("Failed to parse M3U8 URL in line: %v", err)
instance.Buffer.Write([]byte(line + "\n"))
_, err := w.Write([]byte(line + "\n"))
if err != nil {
utils.SafeLogf("Failed to write line to response: %v", err)
statusChan <- 4
return
}
continue
}

if !u.IsAbs() {
u = base.ResolveReference(u)
}

instance.Buffer.Write([]byte(u.String() + "\n"))
_, err = w.Write([]byte(u.String() + "\n"))
if err != nil {
utils.SafeLogf("Failed to write line to response: %v", err)
statusChan <- 4
return
}
}
}

Expand Down Expand Up @@ -184,7 +199,7 @@ func (instance *StreamInstance) BufferStream(ctx context.Context, m3uIndex int,
}
}

func (instance *StreamInstance) StreamBuffer(ctx context.Context, resp *http.Response, r *http.Request, w http.ResponseWriter) {
func (instance *StreamInstance) StreamBuffer(ctx context.Context, w http.ResponseWriter) {
for {
select {
case <-ctx.Done(): // handle context cancellation
Expand All @@ -196,13 +211,9 @@ func (instance *StreamInstance) StreamBuffer(ctx context.Context, resp *http.Res
bufferSize = bufferMbInt * 1024 * 1024
}

forceRead := r.Method != http.MethodGet || utils.EOFIsExpected(resp)

chunk, ok := instance.Buffer.ReadChunk(bufferSize, forceRead)
chunk, ok := instance.Buffer.ReadChunk(bufferSize)
if !ok {
if !forceRead {
time.Sleep(100 * time.Millisecond) // Wait a bit before checking buffer again
}
time.Sleep(100 * time.Millisecond) // Wait a bit before checking buffer again
continue
}

Expand Down Expand Up @@ -308,12 +319,12 @@ func Handler(w http.ResponseWriter, r *http.Request) {

defer stream.Buffer.ingest.Unlock()

stream.BufferStream(ctx, selectedIndex, resp, r, exitStatus)
stream.BufferStream(ctx, selectedIndex, resp, r, w, exitStatus)
return
}
}
}()
go stream.StreamBuffer(ctx, resp, r, w)
go stream.StreamBuffer(ctx, w)

stream.Buffer.testedIndexes = append(stream.Buffer.testedIndexes, selectedIndex)

Expand Down

0 comments on commit 709238b

Please sign in to comment.