# -*- coding: utf-8 -*- # Module: AppleTV+ Downloader # Created on: 20-12-2020 # Authors: JUNi # Version: 1.0 import sys, os, time import urllib.parse import re, io, base64, requests, math import subprocess, pathlib, glob import ffmpy, json, html, http import shutil from unidecode import unidecode from m3u8 import parse as m3u8parser from os.path import isfile, join import pywidevine.clients.appletv.client as APTVCLient import pywidevine.clients.appletv.config as APTVConfig from pywidevine.clients.appletv.downloader import WvDownloader from pywidevine.clients.appletv.config import WvDownloaderConfig from pywidevine.muxer.muxer import Muxer currentFile = 'appletv' realPath = os.path.realpath(currentFile) dirPath = os.path.dirname(realPath) session = requests.Session() def main(args): if not os.path.exists(dirPath + '/KEYS'): os.makedirs(dirPath + '/KEYS') else: keys_file = dirPath + '/KEYS/APPLETVPLUS.txt' try: keys_file_apple = open(keys_file, 'r', encoding='utf8') keys_file_txt = keys_file_apple.readlines() except Exception: with open(keys_file, 'a', encoding='utf8') as (file): file.write('##### One KEY per line. #####\n') keys_file_apple = open(keys_file, 'r', encoding='utf8') keys_file_txt = keys_file_apple.readlines() def replace_words(x): x = re.sub(r'[]¡!"#$%\'()*+,:;<=>¿?@\\^_`{|}~[-]', '', x) x = x.replace('\\', '').replace('/', ' & ') return unidecode(x) def ReplaceCodeLanguages(X): X = X.lower() X = X.replace('_subtitle_dialog_0', '').replace('_narrative_dialog_0', '').replace('_caption_dialog_0', '').replace('_dialog_0', '').replace('_descriptive_0', '_descriptive').replace('_descriptive', '_descriptive').replace('_sdh', '-sdh').replace('es-es', 'es').replace('en-es', 'es').replace('kn-in', 'kn').replace('gu-in', 'gu').replace('ja-jp', 'ja').replace('mni-in', 'mni').replace('si-in', 'si').replace('as-in', 'as').replace('ml-in', 'ml').replace('sv-se', 'sv').replace('hy-hy', 'hy').replace('sv-sv', 'sv').replace('da-da', 'da').replace('fi-fi', 'fi').replace('nb-nb', 'nb').replace('is-is', 'is').replace('uk-uk', 'uk').replace('hu-hu', 'hu').replace('bg-bg', 'bg').replace('hr-hr', 'hr').replace('lt-lt', 'lt').replace('et-et', 'et').replace('el-el', 'el').replace('he-he', 'he').replace('ar-ar', 'ar').replace('fa-fa', 'fa').replace('ro-ro', 'ro').replace('sr-sr', 'sr').replace('cs-cs', 'cs').replace('sk-sk', 'sk').replace('mk-mk', 'mk').replace('hi-hi', 'hi').replace('bn-bn', 'bn').replace('ur-ur', 'ur').replace('pa-pa', 'pa').replace('ta-ta', 'ta').replace('te-te', 'te').replace('mr-mr', 'mr').replace('kn-kn', 'kn').replace('gu-gu', 'gu').replace('ml-ml', 'ml').replace('si-si', 'si').replace('as-as', 'as').replace('mni-mni', 'mni').replace('tl-tl', 'tl').replace('id-id', 'id').replace('ms-ms', 'ms').replace('vi-vi', 'vi').replace('th-th', 'th').replace('km-km', 'km').replace('ko-ko', 'ko').replace('zh-zh', 'zh').replace('ja-ja', 'ja').replace('ru-ru', 'ru').replace('tr-tr', 'tr').replace('it-it', 'it').replace('es-mx', 'es-la').replace('ar-sa', 'ar').replace('zh-cn', 'zh').replace('nl-nl', 'nl').replace('pl-pl', 'pl').replace('pt-pt', 'pt').replace('hi-in', 'hi').replace('mr-in', 'mr').replace('bn-in', 'bn').replace('te-in', 'te').replace('cmn-hans', 'zh-hans').replace('cmn-hant', 'zh-hant').replace('ko-kr', 'ko').replace('en-au', 'en').replace('es-419', 'es-la').replace('es-us', 'es-la').replace('en-us', 'en').replace('en-gb', 'en').replace('fr-fr', 'fr').replace('de-de', 'de').replace('las-419', 'es-la').replace('ar-ae', 'ar').replace('da-dk', 'da').replace('yue-hant', 'yue').replace('bn-in', 'bn').replace('ur-in', 'ur').replace('ta-in', 'ta').replace('sl-si', 'sl').replace('cs-cz', 'cs').replace('hi-jp', 'hi').replace('-001', '').replace('en-US', 'en').replace('deu', 'de').replace('eng', 'en').replace('ca-es', 'cat').replace('fil-ph', 'fil').replace('en-ca', 'en').replace('eu-es', 'eu').replace('ar-eg', 'ar').replace('he-il', 'he').replace('el-gr', 'he').replace('nb-no', 'nb').replace('es-ar', 'es-la').replace('en-ph', 'en').replace('sq-al', 'sq').replace('bs-ba', 'bs').replace('pt-BR', 'pt-br') return X def ReplaceSubs1(X): pattern1 = re.compile('(?!|||<\\/i>|<\\/b>|<\\/u>)(<)(?:[A-Za-z0-9_ -=]*)(>)') pattern2 = re.compile('(?!<\\/i>|<\\/b>|<\\/u>)(<\\/)(?:[A-Za-z0-9_ -=]*)(>)') X = X.replace('‏', '').replace('{\\an1}', '').replace('{\\an2}', '').replace('{\\an3}', '').replace('{\\an4}', '').replace('{\\an5}', '').replace('{\\an6}', '').replace('{\\an7}', '').replace('{\\an8}', '').replace('{\\an9}', '').replace('‎', '') X = pattern1.sub('', X) X = pattern2.sub('', X) return X def mediainfo_(file): mediainfo_output = subprocess.Popen([APTVConfig.MEDIAINFO, '--Output=JSON', '-f', file], stdout=(subprocess.PIPE)) mediainfo_json = json.load(mediainfo_output.stdout) return mediainfo_json def alphanumericSort(l): def convert(text): if text.isdigit(): return int(text) else: return text def alphanum_key(key): return [convert(c) for c in re.split('([0-9]+)', key)] return sorted(l, key=alphanum_key) def convert_size(size_bytes): if size_bytes == 0: return '0bps' else: s = round(size_bytes / 1000, 0) return '%ikbps' % s def get_size(size): power = 1024 n = 0 Dic_powerN = {0:'', 1:'K', 2:'M', 3:'G', 4:'T'} while size > power: size /= power n += 1 return str(round(size, 2)) + Dic_powerN[n] + 'B' def getKeyId(name): mp4dump = subprocess.Popen([APTVConfig.MP4DUMP, name], stdout=(subprocess.PIPE)) mp4dump = str(mp4dump.stdout.read()) A = find_str(mp4dump, 'default_KID') KEY_ID_ORI = '' KEY_ID_ORI = mp4dump[A:A + 63].replace('default_KID = ', '').replace('[', '').replace(']', '').replace(' ', '') if KEY_ID_ORI == '' or KEY_ID_ORI == "'": KEY_ID_ORI = 'nothing' return KEY_ID_ORI def find_str(s, char): index = 0 if char in s: c = char[0] for ch in s: if ch == c: if s[index:index + len(char)] == char: return index index += 1 return -1 AUTH_HEADERS, COOKIES = APTVCLient.get_auth_headers(args.url_season) def get_pssh(x): urlm3u8_request = requests.get(x).text m3u8_json = m3u8parser(urlm3u8_request) pssh = '' for typekey in m3u8_json['keys']: try: if typekey['keyformat'] == 'urn:uuid:edef8ba9-79d6-4ace-a3c8-27dcd51d21ed': pssh = typekey['uri'].replace('data:text/plain;base64,', '') except Exception: continue return pssh def download_subs(filename, sub_url): m3u8_json = m3u8parser(requests.get(sub_url).text) urls = [] for segment in m3u8_json['segments']: urls.append(segment['uri']) print('\n' + filename) aria2c_infile = "" num_segments = len(urls) digits = math.floor(math.log10(num_segments)) + 1 for (i, url) in enumerate(urls): aria2c_infile += f"{url}\n" aria2c_infile += f"\tout={filename}.{i:0{digits}d}.webvtt\n" aria2c_infile += f"\tdir={filename}\n" subprocess.run([APTVConfig.ARIA2C, "--allow-overwrite=true", "-q", "--file-allocation=none", "--console-log-level=warn", "--download-result=hide", "--summary-interval=0", "-x16", "-j16", "-s1", "-i-"], input=aria2c_infile.encode("utf-8")) source_files = pathlib.Path(filename).rglob(r'./*.webvtt') with open(filename + '.webvtt', mode='wb') as (destination): for vtt in source_files: with open(vtt, mode='rb') as (source): shutil.copyfileobj(source, destination) if os.path.exists(filename): shutil.rmtree(filename) print('Converting subtitles...') for f in glob.glob(f'{filename}*.webvtt'): with open(f, 'r+', encoding='utf-8-sig') as (x): old = x.read().replace('STYLE\n::cue() {\nfont-family: Arial, Helvetica, sans-serif;\n}', '').replace('WEBVTT', '').replace('X-TIMESTAMP-MAP=LOCAL:00:00:00.000,MPEGTS:9000', '').replace('X-TIMESTAMP-MAP=MPEGTS:900000,LOCAL:00:00:00.000', '').replace('\n\n\n', '\n') with open(f, 'w+', encoding='utf-8-sig') as (x): x.write(ReplaceSubs1(old)) SubtitleEdit_process = subprocess.Popen([APTVConfig.SUBTITLE_EDIT, '/convert', filename + ".webvtt", "srt"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).wait() for f in glob.glob(f'{filename}*.webvtt'): os.remove(f) print('Done!') def m3u8_parsing(m3u8_url): master_base_url = re.split('(/)(?i)', m3u8_url) del master_base_url[-1] master_base_url = ''.join(master_base_url) urlm3u8_master_request = requests.get(m3u8_url, headers=AUTH_HEADERS["wvHeaders"]).text m3u8_master_json = m3u8parser(urlm3u8_master_request) audio_List = [] audio_dict = {} subs_list = [] for media in m3u8_master_json['media']: if media['type'] == 'AUDIO': isAD = False try: if 'public.accessibility.' in media['characteristics']: isAD = True except Exception: isAD = False group_id = media['group_id'].replace('HE-stereo', 'aac').replace('stereo', 'aac') m = re.search(r"&g=(.+?)&", media['uri']) mx = re.search(r"audio-(.+?)(-|_)", group_id) Bitrate = m[1] Codec = mx[1] if int(Bitrate) > 2000: Bitrate = Bitrate[1:] if isAD: LanguageID = ReplaceCodeLanguages(media['language']) + '-ad' else: LanguageID = ReplaceCodeLanguages(media['language']) if isAD: trackType = 'NARRATION' else: trackType = 'PRIMARY' if 'https://' not in media['uri']: full_url = master_base_url + media['uri'] Full_URL_Type = False else: full_url = media['uri'] Full_URL_Type = True audio_dict = { 'trackType':trackType, 'Language':media['name'], 'LanguageID':LanguageID, 'Profile':group_id, 'Codec':Codec, 'Channels':media['channels'].replace('"', '').replace('/JOC', ''), 'isAD':isAD, 'url':full_url, 'Bitrate':Bitrate} if args.audiocodec: if audio_dict['Codec'] == str(args.audiocodec): if '-ap' in audio_dict['Profile']: audio_List.append(audio_dict) else: audio_List.append(audio_dict) if media['type'] == 'SUBTITLES': isCC = False try: if 'public.accessibility.' in media['characteristics']: isCC = True except Exception: isCC = False if '_NORMAL_' in media['uri']: isCC = False isNormal = False if isCC: LanguageID = ReplaceCodeLanguages(media['language']) + '-sdh' trackType = 'SDH' else: trackType = 'NORMAL' isNormal = True LanguageID = ReplaceCodeLanguages(media['language']) isForced = False if media['forced'] == 'YES': isForced = True isNormal = False trackType = 'FORCED' media['name'] = media['name'].replace('--forced--', '-forced') LanguageID = ReplaceCodeLanguages(media['language']) + '-forced' if 'https://' not in media['uri']: full_url = master_base_url + media['uri'] Full_URL_Type = False else: full_url = media['uri'] Full_URL_Type = True subs_dict = { 'trackType':trackType, 'Language':media['name'], 'LanguageID':LanguageID, 'Profile':media['group_id'], 'isCC':isCC, 'isForced':isForced, 'isNormal':isNormal, 'url':full_url} subs_list.append(subs_dict) video_list = [] for playlist in m3u8_master_json['playlists']: def get_height(width, height): if width == '1920': return '1920', '1080' if width == '1356': return '1280', '720' elif width == '864': return '640', '480' elif width == '3840': return '4096', '2160' else: return width, height video_range = playlist['stream_info']['video_range'].replace('"', '') resolution = re.split('x', playlist['stream_info']['resolution']) width, height = get_height(resolution[0], resolution[-1]) codecs = None codecs = re.search(r',(avc.+?|dvh1.+?|hvc1.+?)$', playlist['stream_info']['codecs']) if codecs == None: codecs = re.search(r'(avc.+?|dvh1.+?|hvc1.+?),', playlist['stream_info']['codecs']) if 'https://' not in playlist['uri']: full_url = master_base_url + playlist['uri'] Full_URL_Type = False else: full_url = playlist['uri'] Full_URL_Type = True video_dict = { 'Video_Range':video_range, 'Width':width, 'Height':height, 'Codec':codecs[1], 'frame_rate':playlist['stream_info']['frame_rate'], 'url':full_url, 'Bitrate':playlist['stream_info']['bandwidth'], 'Average_Bandwidth':playlist['stream_info']['average_bandwidth']} video_list.append(video_dict) if args.videocodec: if args.videocodec == 'h264': codec_s = 'avc1' if args.videocodec == 'hevc': codec_s = 'hvc1' if args.videocodec == 'hdr': codec_s = 'dvh1' video_list_tmp = [] for x in video_list: if codec_s in x['Codec']: video_list_tmp.append(x) video_list = video_list_tmp # AUDIO LIST # audio_List = sorted(audio_List, key=(lambda k: (str(k['Profile']), str(k['LanguageID']))), reverse=True) if args.only_2ch_audio: audioList_new = [] for x in audio_List: if int(x['Channels']) == 2: audioList_new.append(x) audio_List = audioList_new BitrateList = [] AudioLanguageList = [] for x in audio_List: BitrateList.append(str(x['Profile'])) AudioLanguageList.append(x['LanguageID']) BitrateList = alphanumericSort(list(set(BitrateList))) AudioLanguageList = alphanumericSort(list(set(AudioLanguageList))) audioList_new = [] audio_Dict_new = {} for y in AudioLanguageList: counter = 0 for x in audio_List: if x['LanguageID'] == y and counter == 0: audio_Dict_new = { 'trackType':x['trackType'], 'Language':x['Language'], 'LanguageID':x['LanguageID'], 'Profile':x['Profile'], 'Codec':x['Codec'], 'Channels':x['Channels'], 'isAD':x['isAD'], 'url':x['url'], 'Bitrate':x['Bitrate']} audioList_new.append(audio_Dict_new) counter = counter + 1 audio_List = audioList_new audio_List = sorted(audio_List, key=(lambda k: (str(k['Profile']), str(k['LanguageID'])))) if args.desc_audio: c = 0 while c != len(audio_List): if not audio_List[c]['isAD']: audio_List.remove(audio_List[c]) else: c += 1 else: c = 0 while c != len(audio_List): if audio_List[c]['isAD']: audio_List.remove(audio_List[c]) else: c += 1 audioList_new = [] if args.audiolang: for x in audio_List: langAbbrev = x['LanguageID'] if langAbbrev in list(args.audiolang): audioList_new.append(x) audio_List = audioList_new audio_List = audio_List # SUBTITLE LIST # SubsLanguageList = [] for x in subs_list: SubsLanguageList.append(x['LanguageID']) SubsLanguageList = alphanumericSort(list(set(SubsLanguageList))) subList_new = [] for y in SubsLanguageList: counter = 0 for x in subs_list: if x['LanguageID'] == y and counter == 0: subs_Dict_new = { 'trackType':x['trackType'], 'Language':x['Language'], 'LanguageID':x['LanguageID'], 'Profile':x['Profile'], 'isCC':x['isCC'], 'isForced':x['isForced'], 'isNormal':x['isNormal'], 'url':x['url']} subList_new.append(subs_Dict_new) counter = counter + 1 subs_list = subList_new subs_list_new = [] subs_for_list_new = [] for subs in subs_list: isForced = subs['isForced'] if isForced: subs_for_list_new.append(subs) else: subs_list_new.append(subs) subs_for_list = [] for subs in subs_for_list_new: lang = subs['LanguageID'] if args.forcedlang: if lang in args.forcedlang: subs_for_list.append(subs) else: subs_for_list.append(subs) subs_list = [] for subs in subs_list_new: lang = subs['LanguageID'] if args.sublang: if lang in args.sublang: subs_list.append(subs) else: subs_list.append(subs) subs_list_new = [] subs_list_new = subs_list + subs_for_list subs_list = subs_list_new # VIDEO LIST # video_list = sorted(video_list, key=(lambda k: int(k['Bitrate']))) video_List_temp = video_list height_all = [] for x in video_list: height_all.append(x['Height']) height_all_ord = alphanumericSort(list(set(height_all))) while args.customquality != [] and int(video_list[(-1)]['Height']) > int(args.customquality[0]): video_list.pop(-1) video_list_tmp = [] video_list_tmp.append(video_list[(-1)]) video_list = video_list_tmp return audio_List, video_list, subs_list def m3u8_parser_alt(m3u8_url): master_base_url = re.split('(/)(?i)', m3u8_url) del master_base_url[-1] master_base_url = ''.join(master_base_url) m3u8_master_json = m3u8parser(requests.get(m3u8_url, headers=AUTH_HEADERS["wvHeaders"]).text) for media in m3u8_master_json['media']: audio_List = [] if media['type'] == 'AUDIO': isAD = False try: if 'public.accessibility.' in media['characteristics']: isAD = True except Exception: isAD = False group_id = media['group_id'].replace('HE-stereo', 'aac').replace('stereo', 'aac') m = re.search(r"audio_gr(.+?)_", media['uri']) mx = re.search(r"audio-(.+?)(-|_)", group_id) Bitrate = m[1] Codec = mx[1] if int(Bitrate) > 2000: Bitrate = Bitrate[1:] if isAD: LanguageID = ReplaceCodeLanguages(media['language']) + '-ad' else: LanguageID = ReplaceCodeLanguages(media['language']) if isAD: trackType = 'NARRATION' else: trackType = 'PRIMARY' if 'https://' not in media['uri']: full_url = master_base_url + media['uri'] Full_URL_Type = False else: full_url = media['uri'] Full_URL_Type = True audio_dict = { 'trackType':trackType, 'Language':media['name'], 'LanguageID':LanguageID, 'Profile':group_id, 'Codec':Codec, 'Channels':media['channels'].replace('"', '').replace('/JOC', ''), 'isAD':isAD, 'url':full_url, 'Bitrate':Bitrate} audio_List.append(audio_dict) subs_list = [] if media['type'] == 'SUBTITLES': isCC = False try: if 'public.accessibility.' in media['characteristics']: isCC = True except Exception: isCC = False if '_NORMAL_' in media['uri']: isCC = False isNormal = False if isCC: LanguageID = ReplaceCodeLanguages(media['language']) + '-sdh' trackType = 'SDH' else: trackType = 'NORMAL' isNormal = True LanguageID = ReplaceCodeLanguages(media['language']) isForced = False if media['forced'] == 'YES': isForced = True isNormal = False trackType = 'FORCED' media['name'] = media['name'].replace('--forced--', '-forced') LanguageID = ReplaceCodeLanguages(media['language']) + '-forced' if 'https://' not in media['uri']: full_url = master_base_url + media['uri'] Full_URL_Type = False else: full_url = media['uri'] Full_URL_Type = True subs_dict = { 'trackType':trackType, 'Language':media['name'], 'LanguageID':LanguageID, 'Profile':media['group_id'], 'isCC':isCC, 'isForced':isForced, 'isNormal':isNormal, 'url':full_url} subs_list.append(subs_dict) video_list = [] for playlist in m3u8_master_json['playlists']: def get_height(width, height): if width == '1920': return '1920', '1080' if width == '1356': return '1280', '720' elif width == '864': return '640', '480' elif width == '3840': return '4096', '2160' else: return width, height video_range = playlist['stream_info']['video_range'].replace('"', '') resolution = re.split('x', playlist['stream_info']['resolution']) width, height = get_height(resolution[0], resolution[-1]) codecs = None codecs = re.search(r',(avc.+?|dvh1.+?|hvc1.+?)$', playlist['stream_info']['codecs']) if codecs == None: codecs = re.search(r'(avc.+?|dvh1.+?|hvc1.+?),', playlist['stream_info']['codecs']) if 'https://' not in playlist['uri']: full_url = master_base_url + playlist['uri'] Full_URL_Type = False else: full_url = playlist['uri'] Full_URL_Type = True video_dict = { 'Video_Range':video_range, 'Width':width, 'Height':height, 'Codec':codecs[1], 'frame_rate':playlist['stream_info']['frame_rate'], 'url':full_url, 'Bitrate':playlist['stream_info']['bandwidth'], 'Average_Bandwidth':playlist['stream_info']['average_bandwidth']} video_list.append(video_dict) if args.videocodec: if args.videocodec == 'h264': codec_s = 'avc1' if args.videocodec == 'hevc': codec_s = 'hvc1' if args.videocodec == 'hdr': codec_s = 'dvh1' video_list_tmp = [] for x in video_list: if codec_s in x['Codec']: video_list_tmp.append(x) video_list = video_list_tmp # AUDIO LIST # audio_List = sorted(audio_List, key=(lambda k: (str(k['Profile']), str(k['LanguageID']))), reverse=True) if args.only_2ch_audio: audioList_new = [] for x in audio_List: if int(x['Channels']) == 2: audioList_new.append(x) audio_List = audioList_new BitrateList = [] AudioLanguageList = [] for x in audio_List: BitrateList.append(str(x['Profile'])) AudioLanguageList.append(x['LanguageID']) BitrateList = alphanumericSort(list(set(BitrateList))) AudioLanguageList = alphanumericSort(list(set(AudioLanguageList))) audioList_new = [] audio_Dict_new = {} for y in AudioLanguageList: counter = 0 for x in audio_List: if x['LanguageID'] == y and counter == 0: audio_Dict_new = { 'trackType':x['trackType'], 'Language':x['Language'], 'LanguageID':x['LanguageID'], 'Profile':x['Profile'], 'Codec':x['Codec'], 'Channels':x['Channels'], 'isAD':x['isAD'], 'url':x['url'], 'Bitrate':x['Bitrate']} audioList_new.append(audio_Dict_new) counter = counter + 1 audio_List = audioList_new audio_List = sorted(audio_List, key=(lambda k: (str(k['Profile']), str(k['LanguageID'])))) if args.desc_audio: c = 0 while c != len(audio_List): if not audio_List[c]['isAD']: audio_List.remove(audio_List[c]) else: c += 1 else: c = 0 while c != len(audio_List): if audio_List[c]['isAD']: audio_List.remove(audio_List[c]) else: c += 1 audioList_new = [] if args.audiolang: for x in audio_List: langAbbrev = x['LanguageID'] if langAbbrev in list(args.audiolang): audioList_new.append(x) audio_List = audioList_new audio_List = audio_List # SUBTITLE LIST # SubsLanguageList = [] for x in subs_list: SubsLanguageList.append(x['LanguageID']) SubsLanguageList = alphanumericSort(list(set(SubsLanguageList))) subList_new = [] for y in SubsLanguageList: counter = 0 for x in subs_list: if x['LanguageID'] == y and counter == 0: subs_Dict_new = { 'trackType':x['trackType'], 'Language':x['Language'], 'LanguageID':x['LanguageID'], 'Profile':x['Profile'], 'isCC':x['isCC'], 'isForced':x['isForced'], 'isNormal':x['isNormal'], 'url':x['url']} subList_new.append(subs_Dict_new) counter = counter + 1 subs_list = subList_new subs_list_new = [] subs_for_list_new = [] for subs in subs_list: isForced = subs['isForced'] if isForced: subs_for_list_new.append(subs) else: subs_list_new.append(subs) subs_for_list = [] for subs in subs_for_list_new: lang = subs['LanguageID'] if args.forcedlang: if lang in args.forcedlang: subs_for_list.append(subs) else: subs_for_list.append(subs) subs_list = [] for subs in subs_list_new: lang = subs['LanguageID'] if args.sublang: if lang in args.sublang: subs_list.append(subs) else: subs_list.append(subs) subs_list_new = [] subs_list_new = subs_list + subs_for_list subs_list = subs_list_new # VIDEO LIST # video_list = sorted(video_list, key=(lambda k: int(k['Bitrate']))) video_List_temp = video_list height_all = [] for x in video_list: height_all.append(x['Height']) height_all_ord = alphanumericSort(list(set(height_all))) while args.customquality != [] and int(video_list[(-1)]['Height']) > int(args.customquality[0]): video_list.pop(-1) video_list_tmp = [] video_list_tmp.append(video_list[(-1)]) video_list = video_list_tmp return audio_List, video_list, subs_list def get_episodes(ep_str, num_eps): eps = ep_str.split(',') eps_final = [] for ep in eps: if '-' in ep: (start, end) = ep.split('-') start = int(start) end = int(end or num_eps) eps_final += list(range(start, end + 1)) else: eps_final.append(int(ep)) return eps_final def get_seasons(series_id): APTVConfig.APTV_PARAMS["skip"] = "0" APTVConfig.APTV_PARAMS["count"] = "100" resp = requests.get(url=APTVConfig.APTV_ENDPOINTS["SERIES"] % (series_id), params=APTVConfig.APTV_PARAMS, cookies=COOKIES).json() if not args.season: args.season = 'all' seasons = [] if args.season: if args.season == 'all': seasons = 'all' elif ',' in args.season: seasons = [int(x) for x in args.season.split(',')] elif '-' in args.season: (start, end) = args.season.split('-') seasons = list(range(int(start), int(end) + 1)) else: seasons = [int(args.season)] if 'Episode' in resp['data']['episodes'][0]['type']: if seasons == 'all': seasons_list = [x['seasonNumber'] for x in resp['data']['episodes']] seasons = sorted(set(seasons_list)) for season_num in seasons: for x in resp['data']['episodes']: episodes_list = [] episodes_dict = {} if x['seasonNumber'] == int(season_num): episodes_dict = { 'id':x['id'], 'episode_num':x['episodeNumber']} episodes_list.append(episodes_dict) if args.episodeStart: eps = get_episodes(args.episodeStart, len(episodes_list)) episodes_list = [x for x in episodes_list if x['episode_num'] in eps] for episode in episodes_list: get_metadata(content_id=episode['id']) global folderdownloader if args.output: if not os.path.exists(args.output): os.makedirs(args.output) os.chdir(args.output) if ":" in str(args.output): folderdownloader = str(args.output).replace('/','\\').replace('.\\','\\') else: folderdownloader = dirPath + str(args.output).replace('/','\\').replace('.\\','\\') else: folderdownloader = dirPath.replace('/','\\').replace('.\\','\\') def get_metadata(content_id): resp = requests.get(url=APTVConfig.APTV_ENDPOINTS["PRODUCT"] % (content_id), params=APTVConfig.APTV_PARAMS, cookies=COOKIES)#.json() if int(resp.status_code) == 404: print(resp.text) exit() resp = resp.json() if 'Episode' in resp['data']['content']['type']: aptv_type = "show" seriesTitles = resp['data']['content']['showTitle'] seasonNumber = resp['data']['content']['seasonNumber'] episodeNumber = resp['data']['content']['episodeNumber'] episodeTitle = resp['data']['content']['title'] if 'Movie' in resp['data']['content']['type']: aptv_type = "movie" seriesTitles = resp['data']['content']['title'] if aptv_type=="movie": seriesName = replace_words(seriesTitles) folderName = None if aptv_type=="show": seriesName = f'{replace_words(seriesTitles)} S{seasonNumber:02}E{episodeNumber:02} - {replace_words(episodeTitle)}' folderName = f'{replace_words(seriesTitles)} S{seasonNumber:02}' start_process(seriesName, folderName, aptv_type, get_drm_info(content_id, seriesName)) def start_process(seriesName, folderName, aptv_type, drm_info): try: audio_List, video_list, subs_list = m3u8_parsing(drm_info["manifest"]) except (KeyError, TypeError): audio_List, video_list, subs_list = m3u8_parser_alt(drm_info["manifest"]) video_bandwidth = dict(video_list[(-1)])['Average_Bandwidth'] video_height = str(dict(video_list[(-1)])['Height']) video_width = str(dict(video_list[(-1)])['Width']) video_codec = str(dict(video_list[(-1)])['Codec']) if not args.license: if not args.novideo: print('\nVIDEO - Bitrate: ' + convert_size(int(video_bandwidth)) + ' - Profile: ' + video_codec.split('=')[0] + ' - Dimensions: ' + video_width + 'x' + video_height) print() if not args.noaudio: if audio_List != []: for x in audio_List: audio_bandwidth = x['Bitrate'] audio_lang = x['LanguageID'] audio_codec = x['Codec'] audio_channels = x['Channels'] print('AUDIO - Bitrate: ' + audio_bandwidth + 'kbps - Profile: ' + audio_codec + ' - Channels: ' + audio_channels + ' - Language: ' + audio_lang) print() if not args.nosubs: if subs_list != []: for z in subs_list: trackType = z['trackType'] subs_lang = z['LanguageID'] print('SUBTITLE - Profile: '+ trackType +' - Language: ' + subs_lang) print() print('Name: ' + seriesName) if args.info: pass else: if args.license: keys_video = [] keys_audio = [] try: keys_video = get_keys(get_pssh(video_list[(-1)]['url']), drm_info["license_params"]) except KeyError: print('License request failed, using keys from txt') keys_video = keys_file_txt else: with open(keys_file, "a", encoding="utf8") as file: file.write(seriesName + " (video)" + "\n") print("\n" + seriesName + " (video) " ) for key in keys_video: with open(keys_file, "a", encoding="utf8") as file: file.write(key + "\n") print(key) try: keys_audio = get_keys(get_pssh(audio_List[(-1)]['url']), drm_info["license_params"]) except KeyError: print('License request failed, using keys from txt') keys_audio = keys_file_txt else: with open(keys_file, "a", encoding="utf8") as file: file.write(seriesName + " (audio)" + "\n") print("\n" + seriesName + " (audio)") for key in keys_audio: with open(keys_file, "a", encoding="utf8") as file: file.write(key + "\n") print(key) else: print("\nGetting KEYS...") keys_video = [] keys_audio = [] try: keys_video = get_keys(get_pssh(video_list[(-1)]['url']), drm_info["license_params"]) except KeyError: print('License request failed, using keys from txt') keys_video = keys_file_txt else: with open(keys_file, "a", encoding="utf8") as file: file.write(seriesName + "\n") for key in keys_video: with open(keys_file, "a", encoding="utf8") as file: file.write(key + "\n") try: keys_audio = get_keys(get_pssh(audio_List[(-1)]['url']), drm_info["license_params"]) except KeyError: print('License request failed, using keys from txt') keys_audio = keys_file_txt else: with open(keys_file, "a", encoding="utf8") as file: file.write(seriesName + "\n") for key in keys_audio: with open(keys_file, "a", encoding="utf8") as file: file.write(key + "\n") print("Done!") if aptv_type == 'show': CurrentName = seriesName CurrentHeigh = str(video_height) VideoOutputName = folderdownloader + '\\' + str(folderName) + str(CurrentName) + ' [' + str(CurrentHeigh) + 'p].mkv' else: CurrentName = seriesName CurrentHeigh = str(video_height) VideoOutputName = folderdownloader + str(CurrentName) + ' [' + str(CurrentHeigh) + 'p].mkv' if not os.path.isfile(VideoOutputName): if not args.novideo: inputVideo = CurrentName + ' [' + str(CurrentHeigh) + 'p].mp4' if os.path.isfile(inputVideo): print('\n' + inputVideo + '\nFile has already been successfully downloaded previously.\n') else: for m3u8_video in video_list: video_m3u8_url = m3u8_video['url'] wvdl_cfg = WvDownloaderConfig(video_m3u8_url, inputVideo) wvdownloader = WvDownloader(wvdl_cfg) url_list = wvdownloader.run() CorrectDecryptVideo = False counter = 0 for url_dic in url_list: url_id = url_dic['id'] count_id = str(counter).zfill(4) inputVideoAria = CurrentName + f' [{CurrentHeigh}p]_{count_id}_{url_id}.mp4' CorrectDecryptVideo = DecryptVideo(inputVideo=inputVideoAria, keys_video=keys_video) counter = counter + 1 counter = 0 for url_dic in url_list: url_id = url_dic['id'] url_list = url_dic['url_list'] count_id = str(counter).zfill(4) inputVideoAria = CurrentName + f' [{CurrentHeigh}p]_{count_id}_{url_id}.mp4' os.rename(inputVideoAria, inputVideo) if not args.noaudio: for x in audio_List: audio_m3u8_url = x['url'] langAbbrev = x['LanguageID'] inputAudio = CurrentName + ' ' + '(' + langAbbrev + ').mp4' inputAudiomka = CurrentName + ' ' + '(' + langAbbrev + ').mka' if os.path.isfile(inputAudio) or os.path.isfile(inputAudiomka): print('\n' + inputAudio + '\nFile has already been successfully downloaded previously.\n') else: wvdl_cfg = WvDownloaderConfig(audio_m3u8_url, inputAudio) wvdownloader = WvDownloader(wvdl_cfg) url_list = wvdownloader.run() CorrectDecryptAudio = False counter = 0 for url_dic in url_list: url_id = url_dic['id'] count_id = str(counter).zfill(4) inputAudioAria = CurrentName + f' ({langAbbrev})_{count_id}_{url_id}.mp4' CorrectDecryptAudio = DecryptAudio(inputAudio=inputAudioAria, keys_audio=keys_audio) counter = counter + 1 inputAudioAria = inputAudioAria.replace('.mp4', '.mka') os.rename(inputAudioAria, inputAudiomka) if not args.nosubs: if subs_list != []: for z in subs_list: sub_m3u8_url = z['url'] langAbbrev = z['LanguageID'] inputSubtitle = CurrentName + ' ' + '(' + langAbbrev + ')' if os.path.isfile(inputSubtitle + '.webvtt') or os.path.isfile(inputSubtitle + '.srt'): print('\n' + inputSubtitle + '\nFile has already been successfully downloaded previously.\n') else: download_subs(inputSubtitle, sub_m3u8_url) if not args.novideo or (not args.noaudio): if not args.nomux: if not args.novideo: if not args.noaudio: if CorrectDecryptVideo == True: if CorrectDecryptAudio == True: print('\nMuxing...') do_muxer(CurrentName, folderName, aptv_type, CurrentHeigh, audio_List, subs_list) if args.tag: inputName = CurrentName + ' [' + CurrentHeigh + 'p].mkv' release_group(base_filename=inputName, default_filename=CurrentName, folder_name=folderName, type=aptv_type, video_height=CurrentHeigh) if not args.keep: for f in os.listdir(): if re.fullmatch(re.escape(CurrentName) + r'.*\.(mp4|m4a|h264|h265|eac3|ac3|srt|txt|avs|lwi|mka)', f): os.remove(f) print("Done!") else: print("\nFile '" + str(VideoOutputName) + "' already exists.") def do_muxer(filename, foldername, contentype, video_height, audio_list, subs_list): inputVideo = filename + ' [' + video_height + 'p].mp4' if isfile(inputVideo): video_filename = inputVideo if contentype == 'show': muxed_filename = join(foldername, inputVideo[:-4]) else: muxed_filename = inputVideo[:-4] mkvmerge_command = [ APTVConfig.MKVMERGE, '-o', muxed_filename + '.mkv', ] mkvmerge_command += [ '--language', '0:und', '(', video_filename, ')', ] AudioExtensionsList = [ ".mka", ".eac3", ".m4a", ".dts", ".mp3", ".aac" ] for audio_track in audio_list: for audio_extension in AudioExtensionsList: audio_filename = filename + ' ' + '(' + audio_track["LanguageID"] + ')' + audio_extension if isfile(audio_filename): lang = '0:{}'.format(audio_track["LanguageID"]) lang_name = '0:{}'.format(audio_track["Language"]) if lang == '0:es-la': default = 'yes' else: default = 'no' mkvmerge_command += ["--language", lang, "--track-name", lang_name, "--default-track", f"0:{default}", "(", audio_filename, ")"] for subtitle_track in subs_list: subs_filename = filename + ' ' + '(' + subtitle_track["LanguageID"] + ').srt' if isfile(subs_filename): lang = '0:{}'.format(subtitle_track["LanguageID"]) lang_name = '0:{}'.format(subtitle_track["Language"]) if '-' in lang: newlang = lang.split('-')[0] lang = newlang forced = "yes" if "forced" in lang else "no" mkvmerge_command = mkvmerge_command + ["--language", lang, #"--sync", #"0:-9800", "--sub-charset", "0:UTF-8", "--track-name", lang_name, "--default-track", "0:no", "--forced-track", f"0:{forced}", "(", subs_filename, ")"] mkvmerge_process = subprocess.run(mkvmerge_command) def get_drm_info(id, titleName): resp = requests.get(url=APTVConfig.APTV_ENDPOINTS["CONTENT_DATA"] % (id), params=APTVConfig.APTV_PARAMS, headers=AUTH_HEADERS["wvHeaders"], cookies=COOKIES).json()['data']['content'] if "isComingSoon" in resp["playables"][0]: print('\n' + titleName) print("Release Date " + resp["playables"][0]["comingSoonDate"]) exit() hlsUrl = [] license_params = [] for x in resp["playables"]: if 'assets' in x: hlsUrl = x['assets']['hlsUrl'] license_params = x['assets']['fpsKeyServerQueryParameters'] return {"manifest": hlsUrl, "license_params": license_params} def release_group(base_filename, default_filename, folder_name, type, video_height): if type=='show': video_mkv = os.path.join(folder_name, base_filename) else: video_mkv = base_filename mediainfo = mediainfo_(video_mkv) for v in mediainfo['media']['track']: # mediainfo do video if v['@type'] == 'Video': video_format = v['Format'] video_codec = '' if video_format == "AVC": video_codec = 'H.264' elif video_format == "HEVC": video_codec = 'H.265' for m in mediainfo['media']['track']: # mediainfo do audio if m['@type'] == 'Audio': codec_name = m['Format'] channels_number = m['Channels'] audio_codec = '' audio_channels = '' if codec_name == "AAC": audio_codec = 'AAC' elif codec_name == "AC-3": audio_codec = "DD" elif codec_name == "E-AC-3": audio_codec = "DDP" elif codec_name == "E-AC-3 JOC": audio_codec = "Atmos" if channels_number == "2": audio_channels = "2.0" elif channels_number == "6": audio_channels = "5.1" audio_ = audio_codec + audio_channels # renomear arquivo default_filename = default_filename.replace('&', '.and.') default_filename = re.sub(r'[]!"#$%\'()*+,:;<=>?@\\^_`{|}~[-]', '', default_filename) default_filename = default_filename.replace(' ', '.') default_filename = re.sub(r'\.{2,}', '.', default_filename) output_name = '{}.{}p.ATVP.WEB-DL.{}.{}-{}'.format(default_filename, video_height, audio_, video_codec, args.tag) if type=='show': outputName = os.path.join(folder_name, output_name + '.mkv') else: outputName = output_name + '.mkv' os.rename(video_mkv, outputName) print("{} -> {}".format(base_filename, output_name)) def DecryptVideo(inputVideo, keys_video): key_video_id_original = getKeyId(inputVideo) inputVideo = inputVideo outputVideoTemp = inputVideo.replace('.mp4', '_dec.mp4') outputVideo = inputVideo if key_video_id_original != 'nothing': command_line = [APTVConfig.SHAKA_PACKAGER] command_line.append("-quiet") command_line.append("in={input},stream={stream},output={output},drm_label={drm_label}".format( input=inputVideo, stream="video", output=outputVideoTemp, drm_label="UHD1")) command_line.append("--enable_raw_key_decryption") for key in keys_video: print('\nDecrypting video...') #print('Using KEY: ' + key) command_line.append("--keys") command_line.append("label=UHD1:key_id={key_id}:key={key}".format(key_id="00000000000000000000000000000000", key=key.split(":")[1])) wvdecrypt_process = subprocess.run(command_line) print('\nRemuxing video...') ff = ffmpy.FFmpeg(executable=APTVConfig.FFMPEG, inputs={outputVideoTemp: None}, outputs={outputVideo: '-c copy'}, global_options='-y -hide_banner -loglevel quiet') ff.run() time.sleep(0.05) os.remove(outputVideoTemp) print('Done!') return True def DecryptAudio(inputAudio, keys_audio): key_audio_id_original = getKeyId(inputAudio) outputAudioTemp = inputAudio.replace(".mp4", "_dec.mp4") if key_audio_id_original != 'nothing': command_line = [APTVConfig.SHAKA_PACKAGER] command_line.append("-quiet") command_line.append("in={input},stream={stream},output={output},drm_label={drm_label}".format( input=inputAudio, stream="audio", output=outputAudioTemp, drm_label="HD")) command_line.append("--enable_raw_key_decryption") for key in keys_audio: print('\nDecrypting audio...') #print('Using KEY: ' + key) command_line.append("--keys") command_line.append("label=UHD1:key_id={key_id}:key={key}".format(key_id="00000000000000000000000000000000", key=key.split(":")[1])) wvdecrypt_process = subprocess.run(command_line) time.sleep(0.05) os.remove(inputAudio) print('\nDemuxing audio...') outputAudio = outputAudioTemp.replace('_dec.mp4', '.mka') print('{} -> {}'.format(outputAudioTemp, outputAudio)) ff = ffmpy.FFmpeg(executable=APTVConfig.FFMPEG, inputs={outputAudioTemp: None}, outputs={outputAudio: '-c copy'}, global_options='-y -hide_banner -loglevel warning') ff.run() time.sleep(0.05) os.remove(outputAudioTemp) print('Done!') return True elif key_audio_id_original == 'nothing': return True from pywidevine.decrypt.wvdecryptcustom import WvDecrypt from pywidevine.cdm import cdm, deviceconfig def get_keys(pssh, content_info): cert_b64 = base64.b64encode(requests.post(url=APTVConfig.APTV_ENDPOINTS["WV_CERT"], headers=AUTH_HEADERS["wvHeaders"]).content) wvdecrypt = WvDecrypt(init_data_b64=bytes(pssh.encode()), cert_data_b64=None, device=deviceconfig.device_asus_x00dd) licese_payload = {} licese_payload["challenge"] = base64.b64encode(wvdecrypt.get_challenge()).decode('utf-8') licese_payload["extra-server-parameters"] = content_info licese_payload["key-system"] = "com.widevine.alpha" licese_payload["license-action"] = "start" licese_payload["uri"] = 'data:text/plain;base64,%s' % (pssh) license_rep = requests.post(url=APTVConfig.APTV_ENDPOINTS["WV_LICENSE"], json=licese_payload, headers=AUTH_HEADERS["wvHeaders"]) license_b64 = license_rep.json()['license'] wvdecrypt.update_license(license_b64) status, keys = wvdecrypt.start_process() return keys if not args.url_season: url_season = input('Enter the AppleTV+ url (with https): ') else: url_season = str(args.url_season) m = re.search(r'(umc.cmc.)(.+?)$', url_season) if '/show/' in url_season: get_seasons(m[2]) elif '/movie/' in url_season: get_metadata(m[1] + m[2])