Add more debug logs #91

Merged · 2 commits · Aug 20, 2024
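Both files gate the new logging behind a DEBUG environment variable: each function reads it once and emits [DEBUG]-prefixed messages through the standard log package only when the variable is set to "true". A minimal, self-contained sketch of that pattern, with an illustrative message:

```go
package main

import (
	"log"
	"os"
)

func main() {
	// Same gate the PR uses: debug logging only when DEBUG == "true".
	debug := os.Getenv("DEBUG") == "true"

	if debug {
		log.Println("[DEBUG] debug logging enabled") // illustrative message
	}
}
```

Because these messages go through log's default logger, they are written to stderr with a timestamp prefix.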
74 changes: 70 additions & 4 deletions database/db.go
@@ -3,6 +3,7 @@ package database
import (
"context"
"fmt"
"log"
"math"
"os"
"strconv"
@@ -61,6 +62,8 @@ func (db *Instance) ClearDb() error {
}

func (db *Instance) SaveToDb(streams []StreamInfo) error {
var debug = os.Getenv("DEBUG") == "true"

pipeline := db.Redis.Pipeline()

for _, s := range streams {
@@ -72,32 +75,59 @@ func (db *Instance) SaveToDb(streams []StreamInfo) error {
"logo_url": s.LogoURL,
"group_name": s.Group,
}

if debug {
log.Printf("[DEBUG] Preparing to set data for stream key %s: %v\n", streamKey, streamData)
}

pipeline.HSet(db.Ctx, streamKey, streamData)

for index, u := range s.URLs {
streamURLKey := fmt.Sprintf("stream:%s:url:%d", s.Slug, index)

if debug {
log.Printf("[DEBUG] Preparing to set URL for key %s: %s\n", streamURLKey, u)
}

pipeline.Set(db.Ctx, streamURLKey, u, 0)
}

// Add to the sorted set
sortScore := calculateSortScore(s)

if debug {
log.Printf("[DEBUG] Adding to sorted set with score %f and member %s\n", sortScore, streamKey)
}

pipeline.ZAdd(db.Ctx, "streams_sorted", redis.Z{
Score: sortScore,
Member: streamKey,
})
}

if len(streams) > 0 {
if debug {
log.Println("[DEBUG] Executing pipeline...")
}

_, err := pipeline.Exec(db.Ctx)
if err != nil {
return fmt.Errorf("SaveToDb error: %v", err)
}

if debug {
log.Println("[DEBUG] Pipeline executed successfully.")
}
}

db.Cache.Clear("streams_sorted_cache")

if debug {
log.Println("[DEBUG] Cache cleared.")
}

return nil
}

func (db *Instance) DeleteStreamBySlug(slug string) error {
streamKey := fmt.Sprintf("stream:%s", slug)

@@ -204,12 +234,21 @@ func (db *Instance) GetStreamBySlug(slug string) (StreamInfo, error) {
}

func (db *Instance) GetStreams() ([]StreamInfo, error) {
var debug = os.Getenv("DEBUG") == "true"

// Check if the data is in the cache
cacheKey := "streams_sorted_cache"
if data, found := db.Cache.Get(cacheKey); found {
if debug {
log.Printf("[DEBUG] Cache hit for key %s\n", cacheKey)
}
return data, nil
}

if debug {
log.Println("[DEBUG] Cache miss. Retrieving streams from Redis...")
}

keys, err := db.Redis.ZRange(db.Ctx, "streams_sorted", 0, -1).Result()
if err != nil {
return nil, fmt.Errorf("error retrieving streams: %v", err)
@@ -223,6 +262,10 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) {
}
}

if debug {
log.Printf("[DEBUG] Filtered stream keys: %v\n", streamKeys)
}

// Split the stream keys into chunks
chunkSize := 100
var chunks [][]string
@@ -234,28 +277,39 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) {
chunks = append(chunks, streamKeys[i:end])
}

if debug {
log.Printf("[DEBUG] Chunks created: %d chunks\n", len(chunks))
}

// Create channels for work distribution and results collection
workChan := make(chan []string, len(chunks))
resultChan := make(chan []StreamInfo, len(chunks))
errChan := make(chan error, len(chunks))

// Define the number of workers
parserWorkers := os.Getenv("PARSER_WORKERS")
- if parserWorkers != "" {
+ if parserWorkers == "" {
parserWorkers = "5"
}
numWorkers, err := strconv.Atoi(parserWorkers)
if err != nil {
numWorkers = 5
}

if debug {
log.Printf("[DEBUG] Number of workers: %d\n", numWorkers)
}

var wg sync.WaitGroup

// Start the worker pool
for i := 0; i < numWorkers; i++ {
wg.Add(1)
- go func() {
+ go func(workerID int) {
defer wg.Done()
if debug {
log.Printf("[DEBUG] Worker %d started\n", workerID)
}
for chunk := range workChan {
pipe := db.Redis.Pipeline()
cmds := make([]*redis.MapStringStringCmd, len(chunk))
@@ -264,6 +318,10 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) {
cmds[i] = pipe.HGetAll(db.Ctx, key)
}

if debug {
log.Printf("[DEBUG] Executing pipeline for chunk: %v\n", chunk)
}

_, err := pipe.Exec(db.Ctx)
if err != nil {
errChan <- fmt.Errorf("error executing Redis pipeline: %v", err)
@@ -288,6 +346,10 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) {
URLs: map[int]string{},
}

if debug {
log.Printf("[DEBUG] Processing stream: %v\n", stream)
}

urlKeys, err := db.Redis.Keys(db.Ctx, fmt.Sprintf("%s:url:*", chunk[i])).Result()
if err != nil {
errChan <- fmt.Errorf("error finding URLs for stream: %v", err)
@@ -314,7 +376,7 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) {

resultChan <- chunkStreams
}
- }()
+ }(i)
}

// Send work to the workers
@@ -345,6 +407,10 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) {
// Store the result in the cache before returning
db.Cache.Set(cacheKey, streams)

if debug {
log.Println("[DEBUG] Streams retrieved and cached successfully.")
}

return streams, nil
}
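Beyond the logging, this diff also fixes an inverted guard in GetStreams: the old code entered the fallback branch when PARSER_WORKERS was set (!= ""), whereas the corrected check falls back to 5 workers only when the variable is unset, and likewise defaults to 5 when the value does not parse as an integer. A standalone sketch of the corrected resolution:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// numParserWorkers mirrors the corrected logic in GetStreams:
// fall back to 5 workers when PARSER_WORKERS is unset or invalid.
func numParserWorkers() int {
	v := os.Getenv("PARSER_WORKERS")
	if v == "" {
		v = "5"
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		return 5
	}
	return n
}

func main() {
	fmt.Println("workers:", numParserWorkers())
}
```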

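The surrounding worker pool fans the key chunks out to the workers over channels and collects per-chunk results; the other behavioral change above passes the loop index into each goroutine so the new "[DEBUG] Worker %d started" line can identify its worker. A stripped-down sketch of that fan-out/fan-in shape, with the per-chunk Redis pipeline replaced by a placeholder:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	chunks := [][]string{{"stream:a"}, {"stream:b"}, {"stream:c"}}

	workChan := make(chan []string, len(chunks))
	resultChan := make(chan int, len(chunks))

	var wg sync.WaitGroup
	numWorkers := 2

	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func(workerID int) { // workerID feeds the new per-worker debug line
			defer wg.Done()
			for chunk := range workChan {
				// placeholder for the per-chunk Redis pipeline
				resultChan <- len(chunk)
			}
		}(i)
	}

	for _, c := range chunks {
		workChan <- c
	}
	close(workChan)

	wg.Wait()
	close(resultChan)

	total := 0
	for n := range resultChan {
		total += n
	}
	fmt.Println("processed", total, "keys")
}
```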
33 changes: 30 additions & 3 deletions m3u/generate.go
@@ -8,6 +8,7 @@ import (
"m3u-stream-merger/utils"
"net/http"
"net/url"
"os"
"strings"
)

@@ -32,12 +33,22 @@ func GenerateStreamURL(baseUrl string, slug string, sampleUrl string) string {
}

func GenerateM3UContent(w http.ResponseWriter, r *http.Request, db *database.Instance) {
debug := os.Getenv("DEBUG") == "true"

if debug {
log.Println("[DEBUG] Generating M3U content")
}

streams, err := db.GetStreams()
if err != nil {
log.Println(fmt.Errorf("GetStreams error: %v", err))
}

- w.Header().Set("Content-Type", "text/plain") // Set the Content-Type header to M3U
+ if debug {
+ log.Printf("[DEBUG] Retrieved %d streams\n", len(streams))
+ }
+
+ w.Header().Set("Content-Type", "text/plain")
w.Header().Set("Access-Control-Allow-Origin", "*")

baseUrl := ""
@@ -47,6 +58,10 @@ func GenerateM3UContent(w http.ResponseWriter, r *http.Request, db *database.Ins
baseUrl = fmt.Sprintf("https://%s/stream", r.Host)
}

if debug {
log.Printf("[DEBUG] Base URL set to %s\n", baseUrl)
}

_, err = fmt.Fprintf(w, "#EXTM3U\n")
if err != nil {
log.Println(fmt.Errorf("Fprintf error: %v", err))
@@ -57,17 +72,29 @@ func GenerateM3UContent(w http.ResponseWriter, r *http.Request, db *database.Ins
continue
}

// Write #EXTINF line
if debug {
log.Printf("[DEBUG] Processing stream with TVG ID: %s\n", stream.TvgID)
}

_, err := fmt.Fprintf(w, "#EXTINF:-1 channelID=\"x-ID.%s\" tvg-chno=\"%s\" tvg-id=\"%s\" tvg-name=\"%s\" tvg-logo=\"%s\" group-title=\"%s\",%s\n",
stream.TvgID, stream.TvgChNo, stream.TvgID, stream.Title, stream.LogoURL, stream.Group, stream.Title)
if err != nil {
if debug {
log.Printf("[DEBUG] Error writing #EXTINF line for stream %s: %v\n", stream.TvgID, err)
}
continue
}

// Write stream URL
_, err = fmt.Fprintf(w, "%s", GenerateStreamURL(baseUrl, stream.Slug, stream.URLs[0]))
if err != nil {
if debug {
log.Printf("[DEBUG] Error writing stream URL for stream %s: %v\n", stream.TvgID, err)
}
continue
}
}

if debug {
log.Println("[DEBUG] Finished generating M3U content")
}
}
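For reference, each stream that passes the checks above contributes an #EXTINF line and a URL line to the response body. With hypothetical channel values, one entry would look roughly like this (the second line is whatever GenerateStreamURL returns; its body is collapsed in this diff):

```
#EXTM3U
#EXTINF:-1 channelID="x-ID.news1" tvg-chno="1" tvg-id="news1" tvg-name="News One" tvg-logo="https://example.com/logo.png" group-title="News",News One
<stream URL produced by GenerateStreamURL(baseUrl, slug, urls[0])>
```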