wip2
This commit is contained in:
@@ -28,13 +28,20 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
// Checker выполняет проверку плейлистов и каналов.
|
||||
type Checker struct {
|
||||
tagBlocks []tagfile.TagBlock
|
||||
ctx = context.Background()
|
||||
)
|
||||
}
|
||||
|
||||
// NewChecker создаёт новый экземпляр Checker.
|
||||
func NewChecker(tagsPath string) *Checker {
|
||||
return &Checker{
|
||||
tagBlocks: tagfile.Init(tagsPath),
|
||||
}
|
||||
}
|
||||
|
||||
// PrepareListsToCheck готовит список плейлистов для проверки
|
||||
func PrepareListsToCheck(files []string, urls []string, codes []string) []playlist.Playlist {
|
||||
func (c *Checker) PrepareListsToCheck(files []string, urls []string, codes []string) []playlist.Playlist {
|
||||
var lists []playlist.Playlist
|
||||
|
||||
if len(files) > 0 {
|
||||
@@ -51,7 +58,11 @@ func PrepareListsToCheck(files []string, urls []string, codes []string) []playli
|
||||
|
||||
if len(urls) > 0 {
|
||||
for _, url := range urls {
|
||||
pls, _ := playlist.MakeFromUrl(url)
|
||||
pls, err := playlist.MakeFromUrl(url)
|
||||
if err != nil {
|
||||
log.Printf("Warning: %s, skipping\n", err)
|
||||
continue
|
||||
}
|
||||
lists = append(lists, pls)
|
||||
}
|
||||
}
|
||||
@@ -74,7 +85,7 @@ func PrepareListsToCheck(files []string, urls []string, codes []string) []playli
|
||||
}
|
||||
} else {
|
||||
if app.Config.Cache.IsActive {
|
||||
cachedLists := getCachedPlaylists()
|
||||
cachedLists := c.getCachedPlaylists()
|
||||
for key := range ini.Lists {
|
||||
if _, ok := cachedLists[key]; ok {
|
||||
continue
|
||||
@@ -97,20 +108,30 @@ func PrepareListsToCheck(files []string, urls []string, codes []string) []playli
|
||||
}
|
||||
|
||||
// getCachedPlaylists возвращает из кеша проверенные ранее плейлисты
|
||||
func getCachedPlaylists() map[string]playlist.Playlist {
|
||||
func (c *Checker) getCachedPlaylists() map[string]playlist.Playlist {
|
||||
result := make(map[string]playlist.Playlist)
|
||||
keys := app.Cache.Keys(ctx, "*")
|
||||
for _, key := range keys.Val() {
|
||||
value := app.Cache.Get(ctx, key).Val()
|
||||
ctx := context.Background()
|
||||
iter := app.Cache.Scan(ctx, 0, "*", 100).Iterator()
|
||||
for iter.Next(ctx) {
|
||||
key := iter.Val()
|
||||
value, err := app.Cache.Get(ctx, key).Result()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
var pls playlist.Playlist
|
||||
_ = json.Unmarshal([]byte(value), &pls)
|
||||
if err := json.Unmarshal([]byte(value), &pls); err != nil {
|
||||
continue
|
||||
}
|
||||
result[pls.Code] = pls
|
||||
}
|
||||
if err := iter.Err(); err != nil {
|
||||
log.Printf("Error scanning cache: %s", err)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// CheckPlaylists проверяет плейлисты и возвращает их же с результатами проверки
|
||||
func CheckPlaylists(lists []playlist.Playlist) (int, int) {
|
||||
func (c *Checker) CheckPlaylists(ctx context.Context, lists []playlist.Playlist) (int, int) {
|
||||
count := len(lists)
|
||||
if count == 0 {
|
||||
log.Println("There are no playlists to check")
|
||||
@@ -119,7 +140,6 @@ func CheckPlaylists(lists []playlist.Playlist) (int, int) {
|
||||
|
||||
log.Printf("%d playlists will be checked\n", len(lists))
|
||||
step, onlineCount, offlineCount := 0, 0, 0
|
||||
tagBlocks = tagfile.Init(app.Args.TagsPath)
|
||||
|
||||
for idx := range lists {
|
||||
pls := lists[idx]
|
||||
@@ -146,7 +166,7 @@ func CheckPlaylists(lists []playlist.Playlist) (int, int) {
|
||||
if err != nil {
|
||||
log.Printf("Cannot read playlist [%s]: %s\n", pls.Url, err)
|
||||
offlineCount++
|
||||
cachePlaylist(pls)
|
||||
c.cachePlaylist(pls)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -156,15 +176,15 @@ func CheckPlaylists(lists []playlist.Playlist) (int, int) {
|
||||
pls = pls.Parse()
|
||||
|
||||
log.Printf("Parsed, checking channels (%d)...\n", len(pls.Channels))
|
||||
pls = CheckChannels(pls)
|
||||
pls = c.CheckChannels(ctx, pls)
|
||||
lists[idx] = pls
|
||||
cachePlaylist(pls)
|
||||
c.cachePlaylist(pls)
|
||||
}
|
||||
|
||||
return onlineCount, offlineCount
|
||||
}
|
||||
|
||||
func cachePlaylist(pls playlist.Playlist) {
|
||||
func (c *Checker) cachePlaylist(pls playlist.Playlist) {
|
||||
if !app.Config.Cache.IsActive {
|
||||
return
|
||||
}
|
||||
@@ -172,19 +192,27 @@ func cachePlaylist(pls playlist.Playlist) {
|
||||
jsonBytes, err := json.Marshal(pls)
|
||||
if err != nil {
|
||||
log.Printf("Error while saving playlist to cache: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
key := pls.Code
|
||||
if key == "" {
|
||||
key = "raw:" + utils.Md5str(pls.Url)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
ttl := time.Duration(app.Config.Cache.Ttl) * time.Second
|
||||
written := app.Cache.Set(ctx, pls.Code, string(jsonBytes), ttl)
|
||||
written := app.Cache.Set(ctx, key, string(jsonBytes), ttl)
|
||||
if written.Err() != nil {
|
||||
log.Printf("Error while saving playlist to cache: %s", err)
|
||||
log.Printf("Error while saving playlist to cache: %s", written.Err())
|
||||
return
|
||||
}
|
||||
|
||||
log.Println("Cached sucessfully")
|
||||
}
|
||||
|
||||
// CheckChannels проверяет каналы и возвращает их же с результатами проверки
|
||||
func CheckChannels(pls playlist.Playlist) playlist.Playlist {
|
||||
func (c *Checker) CheckChannels(ctx context.Context, pls playlist.Playlist) playlist.Playlist {
|
||||
type errorData struct {
|
||||
tvChannel playlist.Channel
|
||||
err error
|
||||
@@ -200,7 +228,13 @@ func CheckChannels(pls playlist.Playlist) playlist.Playlist {
|
||||
pls.OfflineCount = 0
|
||||
|
||||
timeout, routines := calcParameters(count)
|
||||
httpClient := http.Client{Timeout: timeout}
|
||||
tr := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
}
|
||||
httpClient := http.Client{
|
||||
Timeout: timeout,
|
||||
Transport: tr,
|
||||
}
|
||||
chSemaphores := make(chan struct{}, routines)
|
||||
chOnline := make(chan playlist.Channel, len(pls.Channels))
|
||||
chOffline := make(chan playlist.Channel, len(pls.Channels))
|
||||
@@ -211,16 +245,18 @@ func CheckChannels(pls playlist.Playlist) playlist.Playlist {
|
||||
for _, tvChannel := range pls.Channels {
|
||||
wg.Add(1)
|
||||
go func(tvChannel playlist.Channel) {
|
||||
chSemaphores <- struct{}{}
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Printf("Panic while checking channel '%s': %v", tvChannel.Title, r)
|
||||
}
|
||||
<-chSemaphores
|
||||
wg.Done()
|
||||
}()
|
||||
chSemaphores <- struct{}{}
|
||||
|
||||
tvChannel.Tags = getTagsForChannel(tvChannel)
|
||||
tvChannel.Tags = c.getTagsForChannel(tvChannel)
|
||||
|
||||
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
|
||||
req, err := http.NewRequest("GET", tvChannel.URL, nil)
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", tvChannel.URL, nil)
|
||||
if err != nil {
|
||||
data := errorData{tvChannel: tvChannel, err: err}
|
||||
chError <- data
|
||||
@@ -241,7 +277,13 @@ func CheckChannels(pls playlist.Playlist) playlist.Playlist {
|
||||
tvChannel.IsOnline = tvChannel.Status < http.StatusBadRequest
|
||||
tvChannel.ContentType = resp.Header.Get("Content-Type")
|
||||
chunk := io.LimitReader(resp.Body, 512) // just for sure
|
||||
bodyBytes, _ := io.ReadAll(chunk)
|
||||
bodyBytes, err := io.ReadAll(chunk)
|
||||
if err != nil {
|
||||
_ = resp.Body.Close()
|
||||
data := errorData{tvChannel: tvChannel, err: err}
|
||||
chError <- data
|
||||
return
|
||||
}
|
||||
bodyString := string(bodyBytes)
|
||||
_ = resp.Body.Close()
|
||||
contentType := http.DetectContentType(bodyBytes)
|
||||
@@ -325,6 +367,10 @@ func CheckChannels(pls playlist.Playlist) playlist.Playlist {
|
||||
|
||||
// calcParameters вычисляет оптимальное количество горутин и таймаут запроса
|
||||
func calcParameters(count int) (time.Duration, int) {
|
||||
if count <= 0 {
|
||||
return 10 * time.Second, 1
|
||||
}
|
||||
|
||||
routines := count
|
||||
if routines > 3000 {
|
||||
routines = 3000
|
||||
@@ -360,10 +406,10 @@ func calcParameters(count int) (time.Duration, int) {
|
||||
}
|
||||
|
||||
// getTagsForChannel ищет и возвращает теги для канала
|
||||
func getTagsForChannel(tvChannel playlist.Channel) []string {
|
||||
func (c *Checker) getTagsForChannel(tvChannel playlist.Channel) []string {
|
||||
var foundTags []string
|
||||
|
||||
for _, block := range tagBlocks {
|
||||
for _, block := range c.tagBlocks {
|
||||
tags := block.GetTags(tvChannel)
|
||||
if tags != nil {
|
||||
foundTags = append(foundTags, tags...)
|
||||
|
||||
Reference in New Issue
Block a user