Skip to content

Commit

Permalink
add a semaphore channel to ensure max concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
sonroyaalmerol committed Mar 2, 2024
1 parent 0b8084f commit ea0d678
Showing 1 changed file with 39 additions and 6 deletions.
45 changes: 39 additions & 6 deletions mp4_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"m3u-stream-merger/utils"
"net/http"
"strings"
"sync"
"syscall"
)

Expand Down Expand Up @@ -40,19 +41,51 @@ func mp4Handler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")

var resp *http.Response
var mutex sync.Mutex
defer func() {
mutex.Lock()
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
mutex.Unlock()
}()

// Concurrently handle each stream URL
var wg sync.WaitGroup
for _, url := range stream.URLs {
resp, err = http.Get(url.Content)
if err == nil {
break
}
// Log the error
log.Printf("Error fetching MP4 stream: %s\n", err.Error())
wg.Add(1)
go func(url database.StreamURL) {
defer wg.Done()

// Ensure that the maximum concurrency is respected
semaphore := make(chan struct{}, url.MaxConcurrency)
semaphore <- struct{}{}

mutex.Lock()
defer mutex.Unlock()

// If there's already a successful response, return immediately
if resp != nil {
return
}

// Wait for permission from the semaphore before making the request
<-semaphore
defer func() {
semaphore <- struct{}{}
}()

// Make the request
response, err := http.Get(url.Content)
if err != nil {
// Log the error
log.Printf("Error fetching MP4 stream: %s\n", err.Error())
return
}

// Set the response if successful
resp = response
}(url)
}

if resp == nil {
Expand Down

0 comments on commit ea0d678

Please sign in to comment.