Skip to content

Commit

Permalink
use internal redis for persistent tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
sonroyaalmerol committed Mar 2, 2024
1 parent ea0d678 commit 6d42fce
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 44 deletions.
16 changes: 13 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,24 @@ RUN go test ./...

####################

FROM scratch
# Start a new stage from scratch
FROM alpine

Check failure on line 29 in Dockerfile

View workflow job for this annotation

GitHub Actions / Lint - Docker

DL3006 warning: Always tag the version of an image explicitly

WORKDIR /
# Install Redis
# hadolint ignore=DL3018
RUN apk --no-cache add redis

# Copy the built Go binary from the previous stage
COPY --from=build /app/main /gomain

# Expose ports for Go application and Redis
EXPOSE 8080

ENTRYPOINT ["/gomain"]
# Copy the entrypoint script
COPY entrypoint.sh /

# Set execute permission on the entrypoint script
RUN chmod +x /entrypoint.sh

# Run the entrypoint script
CMD ["/entrypoint.sh"]
14 changes: 14 additions & 0 deletions database/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package database

import "github.com/redis/go-redis/v9"

func InitializeRedis() *redis.Client {
// Initialize Redis client
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Change this to your Redis server address
Password: "", // No password set
DB: 0, // Use default DB
})

return redisClient
}
12 changes: 12 additions & 0 deletions entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/sh

# Start Redis server
redis-server --daemonize yes

# Wait until Redis is ready
while ! redis-cli ping &>/dev/null; do
sleep 0.1
done

# Start Go application and redirect its stdout to the container's stdout
/gomain
9 changes: 7 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ module m3u-stream-merger

go 1.21.5

require github.com/satori/go.uuid v1.2.0
require github.com/mattn/go-sqlite3 v1.14.22

require github.com/mattn/go-sqlite3 v1.14.22 // indirect
require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/redis/go-redis/v9 v9.5.1 // indirect
)
12 changes: 10 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
78 changes: 41 additions & 37 deletions mp4_handler.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
package main

import (
"context"
"errors"
"io"
"log"
"m3u-stream-merger/database"
"m3u-stream-merger/utils"
"net/http"
"strconv"
"strings"
"sync"
"syscall"

"github.com/redis/go-redis/v9"
)

func mp4Handler(w http.ResponseWriter, r *http.Request) {
// Create a context with cancellation capability
ctx, cancel := context.WithCancel(r.Context())
defer cancel()

// Log the incoming request
log.Printf("Received request from %s for URL: %s\n", r.RemoteAddr, r.URL.Path)

Expand Down Expand Up @@ -41,51 +48,26 @@ func mp4Handler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")

var resp *http.Response
var mutex sync.Mutex
defer func() {
mutex.Lock()
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
mutex.Unlock()
}()

// Concurrently handle each stream URL
var wg sync.WaitGroup
// Iterate through the streams and select one based on concurrency and availability
for _, url := range stream.URLs {
wg.Add(1)
go func(url database.StreamURL) {
defer wg.Done()

// Ensure that the maximum concurrency is respected
semaphore := make(chan struct{}, url.MaxConcurrency)
semaphore <- struct{}{}

mutex.Lock()
defer mutex.Unlock()

// If there's already a successful response, return immediately
if resp != nil {
return
}
if checkConcurrency(ctx, url.Content, url.MaxConcurrency) {
continue // Skip this stream if concurrency limit reached
}

// Wait for permission from the semaphore before making the request
<-semaphore
defer func() {
semaphore <- struct{}{}
}()

// Make the request
response, err := http.Get(url.Content)
if err != nil {
// Log the error
log.Printf("Error fetching MP4 stream: %s\n", err.Error())
return
}
resp, err = http.Get(url.Content)
if err == nil {
updateConcurrency(ctx, url.Content)
break
}

// Set the response if successful
resp = response
}(url)
// Log the error
log.Printf("Error fetching MP4 stream: %s\n", err.Error())
}

if resp == nil {
Expand Down Expand Up @@ -127,3 +109,25 @@ func mp4Handler(w http.ResponseWriter, r *http.Request) {
}
}
}

func checkConcurrency(ctx context.Context, url string, maxConcurrency int) bool {
redisClient := database.InitializeRedis()
val, err := redisClient.Get(ctx, url).Result()
if err == redis.Nil {
return false // Key does not exist
} else if err != nil {
log.Printf("Error checking concurrency: %s\n", err.Error())
return false // Error occurred, treat as concurrency not reached
}

count, _ := strconv.Atoi(val)
return count >= maxConcurrency
}

func updateConcurrency(ctx context.Context, url string) {
redisClient := database.InitializeRedis()
err := redisClient.Incr(ctx, url).Err()
if err != nil {
log.Printf("Error updating concurrency: %s\n", err.Error())
}
}

0 comments on commit 6d42fce

Please sign in to comment.