This repository has been archived on 2024-07-02. You can view files and clone it, but cannot push or open issues or pull requests.
NETFLIX-DL-6.1.0/helpers/pssh_generator.py

133 lines
3.9 KiB
Python
Raw Normal View History

2021-12-27 21:54:34 +00:00
from utils.modules.pymp4.parser import Box
from io import BytesIO
import base64
import requests
import uuid
import binascii
import subprocess
import logging
import json
class pssh_generator(object):
    """Produce a base64 Widevine PSSH string from a KID, a file, a URL or bytes.

    ``init`` is interpreted by whichever method is called afterwards:

    * ``from_kid``  -- a hex key ID (dashes allowed).
    * ``Get_PSSH``  -- a path handed to the ``mp4dumpexe`` executable.
    * ``load``      -- a path to a local init segment.
    * ``loads`` / ``getInitWithRange`` / ``getInitWithRange2`` -- a URL.
    * ``from_str``  -- the raw bytes of an init segment.

    Optional keyword arguments:
        proxies: proxy mapping forwarded to ``requests.get``.
        mp4dumpexe: path of the mp4dump executable used by ``Get_PSSH``.
        timeout: timeout forwarded to ``requests.get``; ``None`` (the
            default) means no timeout, matching the previous behaviour.
    """

    # Widevine DRM system ID as it appears inside a PSSH box.
    _WIDEVINE_SYSTEM_ID_HEX = "edef8ba979d64acea3c827dcd51d21ed"
    # from_kid() hard-codes its size fields for a key ID of this many bytes.
    _KID_SIZE = 16
    # Class-level default so subclasses that skip __init__ still work.
    timeout = None

    def __init__(self, init, **kwargs):
        self.init = init
        self.logger = logging.getLogger(__name__)
        self.proxies = kwargs.get("proxies", None)
        self.mp4dumpexe = kwargs.get("mp4dumpexe", None)
        self.timeout = kwargs.get("timeout", None)

    def from_kid(self):
        """Build a Widevine PSSH box around the key ID in ``self.init``.

        Returns:
            The complete 50-byte PSSH box, base64-encoded, as ``str``.

        Raises:
            ValueError: if ``self.init`` is not valid hex or does not decode
                to exactly 16 bytes (the size fields below are fixed for a
                16-byte KID, so any other length would yield a malformed box).
        """
        kid = bytes.fromhex(self.init.replace("-", ""))
        if len(kid) != self._KID_SIZE:
            raise ValueError(
                "KID must be %d bytes, got %d" % (self._KID_SIZE, len(kid))
            )

        # Box size 0x32 (= 50 bytes total), type "pssh", version/flags 0.
        box = bytearray(b"\x00\x00\x002pssh\x00\x00\x00\x00")
        box.extend(bytes.fromhex(self._WIDEVINE_SYSTEM_ID_HEX))
        # Payload size 0x12 (= 18), then protobuf tag 0x12 (field 2,
        # length-delimited) and length 0x10 (= 16) preceding the key ID.
        box.extend(b"\x00\x00\x00\x12\x12\x10")
        box.extend(kid)
        return base64.b64encode(box).decode()

    def Get_PSSH(self):
        """Extract the Widevine PSSH payload of ``self.init`` via mp4dump.

        Runs ``self.mp4dumpexe`` with JSON output and looks for the first
        ``pssh`` child of a ``moov`` atom whose system ID is Widevine's.

        Returns:
            The base64-encoded ``data`` field of that atom, or ``None``
            (after logging an error) when no such atom exists.

        Raises:
            subprocess.CalledProcessError: if the executable exits non-zero.
        """
        # mp4dump prints byte arrays as "[aa bb cc ...]".
        wv_system_id = "[ed ef 8b a9 79 d6 4a ce a3 c8 27 dc d5 1d 21 ed]"
        output = subprocess.check_output(
            [self.mp4dumpexe, "--format", "json", "--verbosity", "1", self.init]
        )
        for atom in json.loads(output):
            if atom.get("name") != "moov":
                continue
            # A moov atom without children simply has nothing to offer.
            for child in atom.get("children", []):
                if child.get("name") != "pssh":
                    continue
                if child.get("system_id") != wv_system_id:
                    continue
                # Strip the surrounding brackets and the spaces between bytes.
                hex_payload = child["data"][1:-1].replace(" ", "")
                payload = binascii.unhexlify(hex_payload)
                return base64.b64encode(payload).decode("utf-8")

        self.logger.error("Error while generate pssh from file.")
        return None

    def get_moov_pssh(self, moov):
        """Return the base64 Widevine init data found in an MP4 stream.

        Parses boxes from ``moov`` (a binary stream) one after another and
        returns ``base64.b64encode(init_data)`` (``bytes``) of the first
        Widevine ``pssh`` child of a ``moov`` box.

        NOTE(review): the loop has no explicit exit for "not found"; it
        presumably ends with ``Box.parse_stream`` raising once the stream is
        exhausted -- every caller in this class catches that exception.
        """
        widevine_id = uuid.UUID("edef8ba9-79d6-4ace-a3c8-27dcd51d21ed")
        while True:
            box = Box.parse_stream(moov)
            if box.type != b"moov":
                continue
            for child in box.children:
                if child.type == b"pssh" and child.system_ID == widevine_id:
                    return base64.b64encode(child.init_data)

    def build_init_segment_mp4(self, bytes_):
        """Return the Widevine PSSH of the init segment ``bytes_`` as ``str``."""
        encoded = self.get_moov_pssh(BytesIO(bytes_))
        return encoded.decode("utf-8")

    def _pssh_or_none(self, payload, log):
        """Parse ``payload``; on any failure report it via ``log``, return None."""
        try:
            return self.build_init_segment_mp4(payload)
        except Exception as e:
            log("Error: %s", e)
            return None

    def _pssh_from_url(self, log, headers=None):
        """GET ``self.init`` and parse the body; failures are sent to ``log``.

        Connection-level exceptions from ``requests.get`` propagate, as they
        always did. An HTTP error status is reported as such and yields
        ``None`` instead of handing an error page to the MP4 parser.
        """
        response = requests.get(
            url=self.init,
            proxies=self.proxies,
            headers=headers,
            timeout=self.timeout,
        )
        try:
            response.raise_for_status()
        except requests.HTTPError as e:
            log("Error: %s", e)
            return None
        return self._pssh_or_none(response.content, log)

    def getInitWithRange2(self, headers):
        """Fetch ``self.init`` with caller-supplied ``headers``; PSSH or None."""
        return self._pssh_from_url(self.logger.info, headers=headers)

    def getInitWithRange(self, start: int, end: int):
        """Fetch bytes ``start``-``end`` of ``self.init``; PSSH or None."""
        byte_range = {"Range": "bytes={}-{}".format(start, end)}
        return self._pssh_from_url(self.logger.info, headers=byte_range)

    def loads(self):
        """Fetch the whole resource at ``self.init``; PSSH or None."""
        return self._pssh_from_url(self.logger.error)

    def load(self):
        """Read the local file ``self.init``; PSSH or None.

        Raises:
            OSError: if the file cannot be opened or read.
        """
        with open(self.init, "rb") as f:
            initbytes = f.read()
        return self._pssh_or_none(initbytes, self.logger.error)

    def from_str(self):
        """Treat ``self.init`` itself as init-segment bytes; PSSH or None."""
        return self._pssh_or_none(self.init, self.logger.info)