Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework cache for M3U playlist handler and adds SAFE_LOGS for easier bug reports #124

Merged
merged 9 commits into from
Aug 26, 2024
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,14 @@ Access the generated M3U playlist at `http://<server ip>:8080/playlist.m3u`.
| BUFFER_MB | Set buffer size in mb. | 0 (no buffer) | Any positive integer |
| INCLUDE_GROUPS_1, INCLUDE_GROUPS_2, INCLUDE_GROUPS_X | Set channel groups to include | all | Comma-separated values |
| TITLE_SUBSTR_FILTER | Sets a regex pattern used to exclude substrings from channel titles | none | Go regexp |
| BASE_URL | Sets the base URL for the stream URLs in the M3U file to be generated. | http/s://<request_hostname> (e.g. <http://192.168.1.10:8080>) | Any string that follows the URL format |
| TZ | Set timezone | Etc/UTC | [TZ Identifiers](https://nodatime.org/TimeZones) |
| SYNC_CRON | Set cron schedule expression of the background updates. | 0 0 * * * | Any valid cron expression |
| SYNC_ON_BOOT | Set if an initial background syncing will be executed on boot | true | true/false |
| CACHE_ON_SYNC | Set if an initial background cache building will be executed after sync | false | true/false |
| CACHE_ON_SYNC | Set if an initial background cache building will be executed after sync. Requires BASE_URL to be set. | false | true/false |
| CLEAR_ON_BOOT | Set if an initial database clearing will be executed on boot | false | true/false |
| DEBUG | Set if verbose logging is enabled | false | true/false |
| SAFE_LOGS | Set if sensitive info is removed from logs. Always enable this if submitting a log publicly. | false | true/false |


## Endpoints
Expand Down
54 changes: 0 additions & 54 deletions database/cache.go

This file was deleted.

56 changes: 12 additions & 44 deletions database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"m3u-stream-merger/utils"
"math"
"os"
"strconv"
Expand All @@ -16,7 +17,6 @@ import (
type Instance struct {
Redis *redis.Client
Ctx context.Context
Cache *Cache
}

func InitializeDb(addr string, password string, db int) (*Instance, error) {
Expand Down Expand Up @@ -47,16 +47,14 @@ func InitializeDb(addr string, password string, db int) (*Instance, error) {
return nil, fmt.Errorf("error connecting to Redis: %v", err)
}

return &Instance{Redis: redisInstance, Ctx: context.Background(), Cache: NewCache()}, nil
return &Instance{Redis: redisInstance, Ctx: context.Background()}, nil
}

func (db *Instance) ClearDb() error {
if err := db.Redis.FlushDB(db.Ctx).Err(); err != nil {
return fmt.Errorf("error clearing Redis: %v", err)
}

db.Cache.Clear("streams_sorted_cache")

return nil
}

Expand All @@ -76,7 +74,7 @@ func (db *Instance) SaveToDb(streams []StreamInfo) error {
}

if debug {
log.Printf("[DEBUG] Preparing to set data for stream key %s: %v\n", streamKey, streamData)
utils.SafeLogPrintf(nil, nil, "[DEBUG] Preparing to set data for stream key %s: %v\n", streamKey, streamData)
}

pipeline.HSet(db.Ctx, streamKey, streamData)
Expand All @@ -85,7 +83,7 @@ func (db *Instance) SaveToDb(streams []StreamInfo) error {
streamURLKey := fmt.Sprintf("stream:%s:url:%d", s.Slug, index)

if debug {
log.Printf("[DEBUG] Preparing to set URL for key %s: %s\n", streamURLKey, u)
utils.SafeLogPrintf(nil, nil, "[DEBUG] Preparing to set URL for key %s: %s\n", streamURLKey, u)
}

pipeline.Set(db.Ctx, streamURLKey, u, 0)
Expand All @@ -95,7 +93,7 @@ func (db *Instance) SaveToDb(streams []StreamInfo) error {
sortScore := calculateSortScore(s)

if debug {
log.Printf("[DEBUG] Adding to sorted set with score %f and member %s\n", sortScore, streamKey)
utils.SafeLogPrintf(nil, nil, "[DEBUG] Adding to sorted set with score %f and member %s\n", sortScore, streamKey)
}

pipeline.ZAdd(db.Ctx, "streams_sorted", redis.Z{
Expand All @@ -119,12 +117,6 @@ func (db *Instance) SaveToDb(streams []StreamInfo) error {
}
}

db.Cache.Clear("streams_sorted_cache")

if debug {
log.Println("[DEBUG] Cache cleared.")
}

return nil
}
func (db *Instance) DeleteStreamBySlug(slug string) error {
Expand Down Expand Up @@ -160,7 +152,6 @@ func (db *Instance) DeleteStreamBySlug(slug string) error {
return fmt.Errorf("error deleting stream from Redis: %v", err)
}

db.Cache.Clear("streams_sorted_cache")
return nil
}

Expand All @@ -169,7 +160,6 @@ func (db *Instance) DeleteStreamURL(s StreamInfo, m3uIndex int) error {
return fmt.Errorf("error deleting stream URL from Redis: %v", err)
}

db.Cache.Clear("streams_sorted_cache")
return nil
}

Expand Down Expand Up @@ -241,37 +231,18 @@ func (db *Instance) GetStreams() <-chan StreamInfo {
go func() {
defer close(streamChan)

// Check if the data is in the cache
cacheKey := "streams_sorted_cache"
if data, found := db.Cache.Get(cacheKey); found {
if debug {
log.Printf("[DEBUG] Cache hit for key %s\n", cacheKey)
}
for _, stream := range data {
streamChan <- stream
}
return
}

if debug {
log.Println("[DEBUG] Cache miss. Retrieving streams from Redis...")
}

keys, err := db.Redis.ZRange(db.Ctx, "streams_sorted", 0, -1).Result()
if err != nil {
log.Printf("error retrieving streams: %v", err)
utils.SafeLogPrintf(nil, nil, "error retrieving streams: %v", err)
return
}

// Store the result in the cache
var streams []StreamInfo

// Filter out URL keys
for _, key := range keys {
if !strings.Contains(key, ":url:") {
streamData, err := db.Redis.HGetAll(db.Ctx, key).Result()
if err != nil {
log.Printf("error retrieving stream data: %v", err)
utils.SafeLogPrintf(nil, nil, "error retrieving stream data: %v", err)
return
}

Expand All @@ -287,40 +258,37 @@ func (db *Instance) GetStreams() <-chan StreamInfo {
}

if debug {
log.Printf("[DEBUG] Processing stream: %v\n", stream)
utils.SafeLogPrintf(nil, nil, "[DEBUG] Processing stream: %v\n", stream)
}

urlKeys, err := db.Redis.Keys(db.Ctx, fmt.Sprintf("%s:url:*", key)).Result()
if err != nil {
log.Printf("error finding URLs for stream: %v", err)
utils.SafeLogPrintf(nil, nil, "error finding URLs for stream: %v", err)
return
}

for _, urlKey := range urlKeys {
urlData, err := db.Redis.Get(db.Ctx, urlKey).Result()
if err != nil {
log.Printf("error getting URL data from Redis: %v", err)
utils.SafeLogPrintf(nil, nil, "error getting URL data from Redis: %v", err)
return
}

m3uIndex, err := strconv.Atoi(extractM3UIndex(urlKey))
if err != nil {
log.Printf("m3u index is not an integer: %v", err)
utils.SafeLogPrintf(nil, nil, "m3u index is not an integer: %v", err)
return
}
stream.URLs[m3uIndex] = urlData
}

// Send the stream to the channel
streams = append(streams, stream)
streamChan <- stream
}
}

db.Cache.Set(cacheKey, streams)

if debug {
log.Println("[DEBUG] Streams retrieved and cached successfully.")
log.Println("[DEBUG] Streams retrieved successfully.")
}
}()

Expand Down
111 changes: 76 additions & 35 deletions m3u/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,17 @@ import (
"net/url"
"os"
"strings"
"sync"
)

// Cache holds the most recently generated M3U playlist text so repeated
// requests can be served without rebuilding the playlist from the database.
//
// The embedded Mutex guards both fields. NOTE(review): embedding sync.Mutex
// exports Lock/Unlock as part of the type's API; an unexported `mu` field
// would keep locking an internal detail — confirm no external callers rely
// on Lock/Unlock before changing.
type Cache struct {
	sync.Mutex
	data         string // last generated playlist content; "" until first generation
	Revalidating bool   // set while a background regeneration is in flight
}

// M3uCache is the process-wide playlist cache shared by the M3U handlers.
var M3uCache = &Cache{}

func getFileExtensionFromUrl(rawUrl string) (string, error) {
u, err := url.Parse(rawUrl)
if err != nil {
Expand All @@ -32,35 +41,34 @@ func GenerateStreamURL(baseUrl string, slug string, sampleUrl string) string {
return fmt.Sprintf("%s/%s.%s\n", baseUrl, utils.GetStreamUrl(slug), ext)
}

func GenerateM3UContent(w http.ResponseWriter, r *http.Request, db *database.Instance) {
func GenerateAndCacheM3UContent(db *database.Instance, r *http.Request) string {
debug := os.Getenv("DEBUG") == "true"

if debug {
log.Println("[DEBUG] Generating M3U content")
log.Println("[DEBUG] Regenerating M3U cache in the background")
}

// Set response headers
w.Header().Set("Content-Type", "text/plain")
w.Header().Set("Access-Control-Allow-Origin", "*")
var content string

baseUrl := ""
if r.TLS == nil {
baseUrl = fmt.Sprintf("http://%s/stream", r.Host)
} else {
baseUrl = fmt.Sprintf("https://%s/stream", r.Host)
baseUrl := "" // Setup base URL logic
if r != nil {
if r.TLS == nil {
baseUrl = fmt.Sprintf("http://%s/stream", r.Host)
} else {
baseUrl = fmt.Sprintf("https://%s/stream", r.Host)
}
}

if debug {
log.Printf("[DEBUG] Base URL set to %s\n", baseUrl)
if customBase, ok := os.LookupEnv("BASE_URL"); ok {
customBase = strings.TrimSuffix(customBase, "/")
baseUrl = fmt.Sprintf("%s/stream", customBase)
}

// Write the M3U header
_, err := fmt.Fprintf(w, "#EXTM3U\n")
if err != nil {
log.Println(fmt.Errorf("Fprintf error: %v", err))
return
if debug {
utils.SafeLogPrintf(r, nil, "[DEBUG] Base URL set to %s\n", baseUrl)
}

content += "#EXTM3U\n"

// Retrieve the streams from the database using channels
streamChan := db.GetStreams()
for stream := range streamChan {
Expand All @@ -69,30 +77,63 @@ func GenerateM3UContent(w http.ResponseWriter, r *http.Request, db *database.Ins
}

if debug {
log.Printf("[DEBUG] Processing stream with TVG ID: %s\n", stream.TvgID)
utils.SafeLogPrintf(nil, nil, "[DEBUG] Processing stream with TVG ID: %s\n", stream.TvgID)
}

// Write the stream info to the response
_, err := fmt.Fprintf(w, "#EXTINF:-1 channelID=\"x-ID.%s\" tvg-chno=\"%s\" tvg-id=\"%s\" tvg-name=\"%s\" tvg-logo=\"%s\" group-title=\"%s\",%s\n",
// Append the stream info to content
content += fmt.Sprintf("#EXTINF:-1 channelID=\"x-ID.%s\" tvg-chno=\"%s\" tvg-id=\"%s\" tvg-name=\"%s\" tvg-logo=\"%s\" group-title=\"%s\",%s\n",
stream.TvgID, stream.TvgChNo, stream.TvgID, stream.Title, stream.LogoURL, stream.Group, stream.Title)
if err != nil {
if debug {
log.Printf("[DEBUG] Error writing #EXTINF line for stream %s: %v\n", stream.TvgID, err)
}
continue
}

// Write the actual stream URL to the response
_, err = fmt.Fprintf(w, "%s", GenerateStreamURL(baseUrl, stream.Slug, stream.URLs[0]))
if err != nil {
if debug {
log.Printf("[DEBUG] Error writing stream URL for stream %s: %v\n", stream.TvgID, err)
}
continue
}
// Append the actual stream URL to content
content += GenerateStreamURL(baseUrl, stream.Slug, stream.URLs[0])
}

if debug {
log.Println("[DEBUG] Finished generating M3U content")
}

// Update cache
M3uCache.Lock()
M3uCache.data = content
M3uCache.Revalidating = false
M3uCache.Unlock()

return content
}

// Handler serves the M3U playlist over HTTP. If a cached playlist exists it
// is returned immediately and a single background regeneration is kicked off
// (stale-while-revalidate); otherwise the playlist is generated synchronously
// and cached before being written to the response.
func Handler(w http.ResponseWriter, r *http.Request, db *database.Instance) {
	debug := os.Getenv("DEBUG") == "true"

	if debug {
		log.Println("[DEBUG] Generating M3U content")
	}

	// Set response headers before any body bytes are written.
	w.Header().Set("Content-Type", "text/plain")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	// Read the cached content and test-and-set the Revalidating flag inside
	// one critical section. Checking Revalidating after releasing the lock
	// (as separate Lock/Unlock pairs) is a race: two concurrent requests
	// could both see it false and both spawn a background regeneration.
	M3uCache.Lock()
	cacheData := M3uCache.data
	startRevalidation := cacheData != "" && !M3uCache.Revalidating
	if startRevalidation {
		M3uCache.Revalidating = true
	}
	M3uCache.Unlock()

	// Serve the stale cache and regenerate in the background.
	if cacheData != "" {
		if debug {
			log.Println("[DEBUG] Serving old cache and regenerating in background")
		}
		_, _ = w.Write([]byte(cacheData))
		if startRevalidation {
			go GenerateAndCacheM3UContent(db, r)
		} else if debug {
			log.Println("[DEBUG] Cache revalidation is already in progress. Skipping.")
		}
		return
	}

	// No valid cache: generate content synchronously and update the cache.
	content := GenerateAndCacheM3UContent(db, r)
	_, _ = w.Write([]byte(content))
}
Loading
Loading