Skip to content

Commit

Permalink
Merge pull request #91 from sonroyaalmerol/debug-logs
Browse files Browse the repository at this point in the history
Add more debug logs
  • Loading branch information
sonroyaalmerol committed Aug 20, 2024
2 parents 8b7d4bd + 33268ab commit 2e6dc93
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 19 deletions.
74 changes: 70 additions & 4 deletions database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package database
import (
"context"
"fmt"
"log"
"math"
"os"
"strconv"
Expand Down Expand Up @@ -61,6 +62,8 @@ func (db *Instance) ClearDb() error {
}

func (db *Instance) SaveToDb(streams []StreamInfo) error {
var debug = os.Getenv("DEBUG") == "true"

pipeline := db.Redis.Pipeline()

for _, s := range streams {
Expand All @@ -72,32 +75,59 @@ func (db *Instance) SaveToDb(streams []StreamInfo) error {
"logo_url": s.LogoURL,
"group_name": s.Group,
}

if debug {
log.Printf("[DEBUG] Preparing to set data for stream key %s: %v\n", streamKey, streamData)
}

pipeline.HSet(db.Ctx, streamKey, streamData)

for index, u := range s.URLs {
streamURLKey := fmt.Sprintf("stream:%s:url:%d", s.Slug, index)

if debug {
log.Printf("[DEBUG] Preparing to set URL for key %s: %s\n", streamURLKey, u)
}

pipeline.Set(db.Ctx, streamURLKey, u, 0)
}

// Add to the sorted set
sortScore := calculateSortScore(s)

if debug {
log.Printf("[DEBUG] Adding to sorted set with score %f and member %s\n", sortScore, streamKey)
}

pipeline.ZAdd(db.Ctx, "streams_sorted", redis.Z{
Score: sortScore,
Member: streamKey,
})
}

if len(streams) > 0 {
if debug {
log.Println("[DEBUG] Executing pipeline...")
}

_, err := pipeline.Exec(db.Ctx)
if err != nil {
return fmt.Errorf("SaveToDb error: %v", err)
}

if debug {
log.Println("[DEBUG] Pipeline executed successfully.")
}
}

db.Cache.Clear("streams_sorted_cache")

if debug {
log.Println("[DEBUG] Cache cleared.")
}

return nil
}

func (db *Instance) DeleteStreamBySlug(slug string) error {
streamKey := fmt.Sprintf("stream:%s", slug)

Expand Down Expand Up @@ -204,12 +234,21 @@ func (db *Instance) GetStreamBySlug(slug string) (StreamInfo, error) {
}

func (db *Instance) GetStreams() ([]StreamInfo, error) {
var debug = os.Getenv("DEBUG") == "true"

// Check if the data is in the cache
cacheKey := "streams_sorted_cache"
if data, found := db.Cache.Get(cacheKey); found {
if debug {
log.Printf("[DEBUG] Cache hit for key %s\n", cacheKey)
}
return data, nil
}

if debug {
log.Println("[DEBUG] Cache miss. Retrieving streams from Redis...")
}

keys, err := db.Redis.ZRange(db.Ctx, "streams_sorted", 0, -1).Result()
if err != nil {
return nil, fmt.Errorf("error retrieving streams: %v", err)
Expand All @@ -223,6 +262,10 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) {
}
}

if debug {
log.Printf("[DEBUG] Filtered stream keys: %v\n", streamKeys)
}

// Split the stream keys into chunks
chunkSize := 100
var chunks [][]string
Expand All @@ -234,28 +277,39 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) {
chunks = append(chunks, streamKeys[i:end])
}

if debug {
log.Printf("[DEBUG] Chunks created: %d chunks\n", len(chunks))
}

// Create channels for work distribution and results collection
workChan := make(chan []string, len(chunks))
resultChan := make(chan []StreamInfo, len(chunks))
errChan := make(chan error, len(chunks))

// Define the number of workers
parserWorkers := os.Getenv("PARSER_WORKERS")
if parserWorkers != "" {
if parserWorkers == "" {
parserWorkers = "5"
}
numWorkers, err := strconv.Atoi(parserWorkers)
if err != nil {
numWorkers = 5
}

if debug {
log.Printf("[DEBUG] Number of workers: %d\n", numWorkers)
}

var wg sync.WaitGroup

// Start the worker pool
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
go func(workerID int) {
defer wg.Done()
if debug {
log.Printf("[DEBUG] Worker %d started\n", workerID)
}
for chunk := range workChan {
pipe := db.Redis.Pipeline()
cmds := make([]*redis.MapStringStringCmd, len(chunk))
Expand All @@ -264,6 +318,10 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) {
cmds[i] = pipe.HGetAll(db.Ctx, key)
}

if debug {
log.Printf("[DEBUG] Executing pipeline for chunk: %v\n", chunk)
}

_, err := pipe.Exec(db.Ctx)
if err != nil {
errChan <- fmt.Errorf("error executing Redis pipeline: %v", err)
Expand All @@ -288,6 +346,10 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) {
URLs: map[int]string{},
}

if debug {
log.Printf("[DEBUG] Processing stream: %v\n", stream)
}

urlKeys, err := db.Redis.Keys(db.Ctx, fmt.Sprintf("%s:url:*", chunk[i])).Result()
if err != nil {
errChan <- fmt.Errorf("error finding URLs for stream: %v", err)
Expand All @@ -314,7 +376,7 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) {

resultChan <- chunkStreams
}
}()
}(i)
}

// Send work to the workers
Expand Down Expand Up @@ -345,6 +407,10 @@ func (db *Instance) GetStreams() ([]StreamInfo, error) {
// Store the result in the cache before returning
db.Cache.Set(cacheKey, streams)

if debug {
log.Println("[DEBUG] Streams retrieved and cached successfully.")
}

return streams, nil
}

Expand Down
33 changes: 30 additions & 3 deletions m3u/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"m3u-stream-merger/utils"
"net/http"
"net/url"
"os"
"strings"
)

Expand All @@ -32,12 +33,22 @@ func GenerateStreamURL(baseUrl string, slug string, sampleUrl string) string {
}

func GenerateM3UContent(w http.ResponseWriter, r *http.Request, db *database.Instance) {
debug := os.Getenv("DEBUG") == "true"

if debug {
log.Println("[DEBUG] Generating M3U content")
}

streams, err := db.GetStreams()
if err != nil {
log.Println(fmt.Errorf("GetStreams error: %v", err))
}

w.Header().Set("Content-Type", "text/plain") // Set the Content-Type header to M3U
if debug {
log.Printf("[DEBUG] Retrieved %d streams\n", len(streams))
}

w.Header().Set("Content-Type", "text/plain")
w.Header().Set("Access-Control-Allow-Origin", "*")

baseUrl := ""
Expand All @@ -47,6 +58,10 @@ func GenerateM3UContent(w http.ResponseWriter, r *http.Request, db *database.Ins
baseUrl = fmt.Sprintf("https://%s/stream", r.Host)
}

if debug {
log.Printf("[DEBUG] Base URL set to %s\n", baseUrl)
}

_, err = fmt.Fprintf(w, "#EXTM3U\n")
if err != nil {
log.Println(fmt.Errorf("Fprintf error: %v", err))
Expand All @@ -57,17 +72,29 @@ func GenerateM3UContent(w http.ResponseWriter, r *http.Request, db *database.Ins
continue
}

// Write #EXTINF line
if debug {
log.Printf("[DEBUG] Processing stream with TVG ID: %s\n", stream.TvgID)
}

_, err := fmt.Fprintf(w, "#EXTINF:-1 channelID=\"x-ID.%s\" tvg-chno=\"%s\" tvg-id=\"%s\" tvg-name=\"%s\" tvg-logo=\"%s\" group-title=\"%s\",%s\n",
stream.TvgID, stream.TvgChNo, stream.TvgID, stream.Title, stream.LogoURL, stream.Group, stream.Title)
if err != nil {
if debug {
log.Printf("[DEBUG] Error writing #EXTINF line for stream %s: %v\n", stream.TvgID, err)
}
continue
}

// Write stream URL
_, err = fmt.Fprintf(w, "%s", GenerateStreamURL(baseUrl, stream.Slug, stream.URLs[0]))
if err != nil {
if debug {
log.Printf("[DEBUG] Error writing stream URL for stream %s: %v\n", stream.TvgID, err)
}
continue
}
}

if debug {
log.Println("[DEBUG] Finished generating M3U content")
}
}
Loading

0 comments on commit 2e6dc93

Please sign in to comment.