#!/usr/bin/env python # coding: utf-8 from __future__ import unicode_literals # Allow direct execution import os import sys import unittest sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import contextlib import gzip import io import ssl import tempfile import threading import zlib # avoid deprecated alias assertRaisesRegexp if hasattr(unittest.TestCase, 'assertRaisesRegex'): unittest.TestCase.assertRaisesRegexp = unittest.TestCase.assertRaisesRegex try: import brotli except ImportError: brotli = None try: from urllib.request import pathname2url except ImportError: from urllib import pathname2url from youtube_dl.compat import ( compat_http_cookiejar_Cookie, compat_http_server, compat_str as str, compat_urllib_error, compat_urllib_HTTPError, compat_urllib_parse, compat_urllib_request, ) from youtube_dl.utils import ( sanitized_Request, update_Request, urlencode_postdata, ) from test.helper import ( expectedFailureIf, FakeYDL, FakeLogger, http_server_port, ) from youtube_dl import YoutubeDL TEST_DIR = os.path.dirname(os.path.abspath(__file__)) class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler): protocol_version = 'HTTP/1.1' # work-around old/new -style class inheritance def super(self, meth_name, *args, **kwargs): from types import MethodType try: super() fn = lambda s, m, *a, **k: getattr(super(), m)(*a, **k) except TypeError: fn = lambda s, m, *a, **k: getattr(compat_http_server.BaseHTTPRequestHandler, m)(s, *a, **k) self.super = MethodType(fn, self) return self.super(meth_name, *args, **kwargs) def log_message(self, format, *args): pass def _headers(self): payload = str(self.headers).encode('utf-8') self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_header('Content-Length', str(len(payload))) self.end_headers() self.wfile.write(payload) def _redirect(self): self.send_response(int(self.path[len('/redirect_'):])) self.send_header('Location', '/method') self.send_header('Content-Length', '0') self.end_headers() def _method(self, method, payload=None): self.send_response(200) self.send_header('Content-Length', str(len(payload or ''))) self.send_header('Method', method) self.end_headers() if payload: self.wfile.write(payload) def _status(self, status): payload = '<html>{0} NOT FOUND</html>'.format(status).encode('utf-8') self.send_response(int(status)) self.send_header('Content-Type', 'text/html; charset=utf-8') self.send_header('Content-Length', str(len(payload))) self.end_headers() self.wfile.write(payload) def _read_data(self): if 'Content-Length' in self.headers: return self.rfile.read(int(self.headers['Content-Length'])) def _test_url(self, path, host='127.0.0.1', scheme='http', port=None): return '{0}://{1}:{2}/{3}'.format( scheme, host, port if port is not None else http_server_port(self.server), path) def do_POST(self): data = self._read_data() if self.path.startswith('/redirect_'): self._redirect() elif self.path.startswith('/method'): self._method('POST', data) elif self.path.startswith('/headers'): self._headers() else: self._status(404) def do_HEAD(self): if self.path.startswith('/redirect_'): self._redirect() elif self.path.startswith('/method'): self._method('HEAD') else: self._status(404) def do_PUT(self): data = self._read_data() if self.path.startswith('/redirect_'): self._redirect() elif self.path.startswith('/method'): self._method('PUT', data) else: self._status(404) def do_GET(self): def respond(payload=b'<html><video src="/vid.mp4" /></html>', payload_type='text/html; charset=utf-8', payload_encoding=None, resp_code=200): self.send_response(resp_code) self.send_header('Content-Type', payload_type) if payload_encoding: self.send_header('Content-Encoding', payload_encoding) self.send_header('Content-Length', str(len(payload))) # required for persistent connections self.end_headers() self.wfile.write(payload) def gzip_compress(p): buf = io.BytesIO() with contextlib.closing(gzip.GzipFile(fileobj=buf, mode='wb')) as f: f.write(p) return buf.getvalue() if self.path == '/video.html': respond() elif self.path == '/vid.mp4': respond(b'\x00\x00\x00\x00\x20\x66\x74[video]', 'video/mp4') elif self.path == '/302': if sys.version_info[0] == 3: # XXX: Python 3 http server does not allow non-ASCII header values self.send_response(404) self.end_headers() return new_url = self._test_url('中文.html') self.send_response(302) self.send_header(b'Location', new_url.encode('utf-8')) self.end_headers() elif self.path == '/%E4%B8%AD%E6%96%87.html': respond() elif self.path == '/%c7%9f': respond() elif self.path == '/redirect_dotsegments': self.send_response(301) # redirect to /headers but with dot segments before self.send_header('Location', '/a/b/./../../headers') self.send_header('Content-Length', '0') self.end_headers() elif self.path.startswith('/redirect_'): self._redirect() elif self.path.startswith('/method'): self._method('GET') elif self.path.startswith('/headers'): self._headers() elif self.path.startswith('/308-to-headers'): self.send_response(308) self.send_header('Location', '/headers') self.send_header('Content-Length', '0') self.end_headers() elif self.path == '/trailing_garbage': payload = b'<html><video src="/vid.mp4" /></html>' compressed = gzip_compress(payload) + b'trailing garbage' respond(compressed, payload_encoding='gzip') elif self.path == '/302-non-ascii-redirect': new_url = self._test_url('中文.html') # actually respond with permanent redirect self.send_response(301) self.send_header('Location', new_url) self.send_header('Content-Length', '0') self.end_headers() elif self.path == '/content-encoding': encodings = self.headers.get('ytdl-encoding', '') payload = b'<html><video src="/vid.mp4" /></html>' for encoding in filter(None, (e.strip() for e in encodings.split(','))): if encoding == 'br' and brotli: payload = brotli.compress(payload) elif encoding == 'gzip': payload = gzip_compress(payload) elif encoding == 'deflate': payload = zlib.compress(payload) elif encoding == 'unsupported': payload = b'raw' break else: self._status(415) return respond(payload, payload_encoding=encodings) else: self._status(404) def send_header(self, keyword, value): """ Forcibly allow HTTP server to send non percent-encoded non-ASCII characters in headers. This is against what is defined in RFC 3986: but we need to test that we support this since some sites incorrectly do this. """ if keyword.lower() == 'connection': return self.super('send_header', keyword, value) if not hasattr(self, '_headers_buffer'): self._headers_buffer = [] self._headers_buffer.append('{0}: {1}\r\n'.format(keyword, value).encode('utf-8')) def end_headers(self): if hasattr(self, '_headers_buffer'): self.wfile.write(b''.join(self._headers_buffer)) self._headers_buffer = [] self.super('end_headers') class TestHTTP(unittest.TestCase): # when does it make sense to check the SSL certificate? _check_cert = ( sys.version_info >= (3, 2) or (sys.version_info[0] == 2 and sys.version_info[1:] >= (7, 19))) def setUp(self): # HTTP server self.http_httpd = compat_http_server.HTTPServer( ('127.0.0.1', 0), HTTPTestRequestHandler) self.http_port = http_server_port(self.http_httpd) self.http_server_thread = threading.Thread(target=self.http_httpd.serve_forever) self.http_server_thread.daemon = True self.http_server_thread.start() try: from http.server import ThreadingHTTPServer except ImportError: try: from socketserver import ThreadingMixIn except ImportError: from SocketServer import ThreadingMixIn class ThreadingHTTPServer(ThreadingMixIn, compat_http_server.HTTPServer): pass # HTTPS server certfn = os.path.join(TEST_DIR, 'testcert.pem') self.https_httpd = ThreadingHTTPServer( ('127.0.0.1', 0), HTTPTestRequestHandler) try: sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) sslctx.verify_mode = ssl.CERT_NONE sslctx.check_hostname = False sslctx.load_cert_chain(certfn, None) self.https_httpd.socket = sslctx.wrap_socket( self.https_httpd.socket, server_side=True) except AttributeError: self.https_httpd.socket = ssl.wrap_socket( self.https_httpd.socket, certfile=certfn, server_side=True) self.https_port = http_server_port(self.https_httpd) self.https_server_thread = threading.Thread(target=self.https_httpd.serve_forever) self.https_server_thread.daemon = True self.https_server_thread.start() def tearDown(self): def closer(svr): def _closer(): svr.shutdown() svr.server_close() return _closer shutdown_thread = threading.Thread(target=closer(self.http_httpd)) shutdown_thread.start() self.http_server_thread.join(2.0) shutdown_thread = threading.Thread(target=closer(self.https_httpd)) shutdown_thread.start() self.https_server_thread.join(2.0) def _test_url(self, path, host='127.0.0.1', scheme='http', port=None): return '{0}://{1}:{2}/{3}'.format( scheme, host, port if port is not None else self.https_port if scheme == 'https' else self.http_port, path) @unittest.skipUnless(_check_cert, 'No support for certificate check in SSL') def test_nocheckcertificate(self): with FakeYDL({'logger': FakeLogger()}) as ydl: with self.assertRaises(compat_urllib_error.URLError): ydl.urlopen(sanitized_Request(self._test_url('headers', scheme='https'))) with FakeYDL({'logger': FakeLogger(), 'nocheckcertificate': True}) as ydl: r = ydl.urlopen(sanitized_Request(self._test_url('headers', scheme='https'))) self.assertEqual(r.getcode(), 200) r.close() def test_percent_encode(self): with FakeYDL() as ydl: # Unicode characters should be encoded with uppercase percent-encoding res = ydl.urlopen(sanitized_Request(self._test_url('中文.html'))) self.assertEqual(res.getcode(), 200) res.close() # don't normalize existing percent encodings res = ydl.urlopen(sanitized_Request(self._test_url('%c7%9f'))) self.assertEqual(res.getcode(), 200) res.close() def test_unicode_path_redirection(self): with FakeYDL() as ydl: r = ydl.urlopen(sanitized_Request(self._test_url('302-non-ascii-redirect'))) self.assertEqual(r.url, self._test_url('%E4%B8%AD%E6%96%87.html')) r.close() def test_redirect(self): with FakeYDL() as ydl: def do_req(redirect_status, method, check_no_content=False): data = b'testdata' if method in ('POST', 'PUT') else None res = ydl.urlopen(sanitized_Request( self._test_url('redirect_{0}'.format(redirect_status)), method=method, data=data)) if check_no_content: self.assertNotIn('Content-Type', res.headers) return res.read().decode('utf-8'), res.headers.get('method', '') # A 303 must either use GET or HEAD for subsequent request self.assertEqual(do_req(303, 'POST'), ('', 'GET')) self.assertEqual(do_req(303, 'HEAD'), ('', 'HEAD')) self.assertEqual(do_req(303, 'PUT'), ('', 'GET')) # 301 and 302 turn POST only into a GET, with no Content-Type self.assertEqual(do_req(301, 'POST', True), ('', 'GET')) self.assertEqual(do_req(301, 'HEAD'), ('', 'HEAD')) self.assertEqual(do_req(302, 'POST', True), ('', 'GET')) self.assertEqual(do_req(302, 'HEAD'), ('', 'HEAD')) self.assertEqual(do_req(301, 'PUT'), ('testdata', 'PUT')) self.assertEqual(do_req(302, 'PUT'), ('testdata', 'PUT')) # 307 and 308 should not change method for m in ('POST', 'PUT'): self.assertEqual(do_req(307, m), ('testdata', m)) self.assertEqual(do_req(308, m), ('testdata', m)) self.assertEqual(do_req(307, 'HEAD'), ('', 'HEAD')) self.assertEqual(do_req(308, 'HEAD'), ('', 'HEAD')) # These should not redirect and instead raise an HTTPError for code in (300, 304, 305, 306): with self.assertRaises(compat_urllib_HTTPError): do_req(code, 'GET') # Jython 2.7.1 times out for some reason @expectedFailureIf(sys.platform.startswith('java') and sys.version_info < (2, 7, 2)) def test_content_type(self): # https://github.com/yt-dlp/yt-dlp/commit/379a4f161d4ad3e40932dcf5aca6e6fb9715ab28 with FakeYDL({'nocheckcertificate': True}) as ydl: # method should be auto-detected as POST r = sanitized_Request(self._test_url('headers', scheme='https'), data=urlencode_postdata({'test': 'test'})) headers = ydl.urlopen(r).read().decode('utf-8') self.assertIn('Content-Type: application/x-www-form-urlencoded', headers) # test http r = sanitized_Request(self._test_url('headers'), data=urlencode_postdata({'test': 'test'})) headers = ydl.urlopen(r).read().decode('utf-8') self.assertIn('Content-Type: application/x-www-form-urlencoded', headers) def test_update_req(self): req = sanitized_Request('http://example.com') assert req.data is None assert req.get_method() == 'GET' assert not req.has_header('Content-Type') # Test that zero-byte payloads will be sent req = update_Request(req, data=b'') assert req.data == b'' assert req.get_method() == 'POST' # yt-dl expects data to be encoded and Content-Type to be added by sender # assert req.get_header('Content-Type') == 'application/x-www-form-urlencoded' def test_cookiejar(self): with FakeYDL() as ydl: ydl.cookiejar.set_cookie(compat_http_cookiejar_Cookie( 0, 'test', 'ytdl', None, False, '127.0.0.1', True, False, '/headers', True, False, None, False, None, None, {})) data = ydl.urlopen(sanitized_Request( self._test_url('headers'))).read().decode('utf-8') self.assertIn('Cookie: test=ytdl', data) def test_passed_cookie_header(self): # We should accept a Cookie header being passed as in normal headers and handle it appropriately. with FakeYDL() as ydl: # Specified Cookie header should be used res = ydl.urlopen(sanitized_Request( self._test_url('headers'), headers={'Cookie': 'test=test'})).read().decode('utf-8') self.assertIn('Cookie: test=test', res) # Specified Cookie header should be removed on any redirect res = ydl.urlopen(sanitized_Request( self._test_url('308-to-headers'), headers={'Cookie': 'test=test'})).read().decode('utf-8') self.assertNotIn('Cookie: test=test', res) # Specified Cookie header should override global cookiejar for that request ydl.cookiejar.set_cookie(compat_http_cookiejar_Cookie( 0, 'test', 'ytdlp', None, False, '127.0.0.1', True, False, '/headers', True, False, None, False, None, None, {})) data = ydl.urlopen(sanitized_Request( self._test_url('headers'), headers={'Cookie': 'test=test'})).read().decode('utf-8') self.assertNotIn('Cookie: test=ytdlp', data) self.assertIn('Cookie: test=test', data) def test_no_compression_compat_header(self): with FakeYDL() as ydl: data = ydl.urlopen( sanitized_Request( self._test_url('headers'), headers={'Youtubedl-no-compression': True})).read() self.assertIn(b'Accept-Encoding: identity', data) self.assertNotIn(b'youtubedl-no-compression', data.lower()) def test_gzip_trailing_garbage(self): # https://github.com/ytdl-org/youtube-dl/commit/aa3e950764337ef9800c936f4de89b31c00dfcf5 # https://github.com/ytdl-org/youtube-dl/commit/6f2ec15cee79d35dba065677cad9da7491ec6e6f with FakeYDL() as ydl: data = ydl.urlopen(sanitized_Request(self._test_url('trailing_garbage'))).read().decode('utf-8') self.assertEqual(data, '<html><video src="/vid.mp4" /></html>') def __test_compression(self, encoding): with FakeYDL() as ydl: res = ydl.urlopen( sanitized_Request( self._test_url('content-encoding'), headers={'ytdl-encoding': encoding})) # decoded encodings are removed: only check for valid decompressed data self.assertEqual(res.read(), b'<html><video src="/vid.mp4" /></html>') @unittest.skipUnless(brotli, 'brotli support is not installed') def test_brotli(self): self.__test_compression('br') def test_deflate(self): self.__test_compression('deflate') def test_gzip(self): self.__test_compression('gzip') def test_multiple_encodings(self): # https://www.rfc-editor.org/rfc/rfc9110.html#section-8.4 for pair in ('gzip,deflate', 'deflate, gzip', 'gzip, gzip', 'deflate, deflate'): self.__test_compression(pair) def test_unsupported_encoding(self): # it should return the raw content with FakeYDL() as ydl: res = ydl.urlopen( sanitized_Request( self._test_url('content-encoding'), headers={'ytdl-encoding': 'unsupported'})) self.assertEqual(res.headers.get('Content-Encoding'), 'unsupported') self.assertEqual(res.read(), b'raw') def test_remove_dot_segments(self): with FakeYDL() as ydl: res = ydl.urlopen(sanitized_Request(self._test_url('a/b/./../../headers'))) self.assertEqual(compat_urllib_parse.urlparse(res.geturl()).path, '/headers') res = ydl.urlopen(sanitized_Request(self._test_url('redirect_dotsegments'))) self.assertEqual(compat_urllib_parse.urlparse(res.geturl()).path, '/headers') def _build_proxy_handler(name): class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler): proxy_name = name def log_message(self, format, *args): pass def do_GET(self): self.send_response(200) self.send_header('Content-Type', 'text/plain; charset=utf-8') self.end_headers() self.wfile.write('{0}: {1}'.format(self.proxy_name, self.path).encode('utf-8')) return HTTPTestRequestHandler class TestProxy(unittest.TestCase): def setUp(self): self.proxy = compat_http_server.HTTPServer( ('127.0.0.1', 0), _build_proxy_handler('normal')) self.port = http_server_port(self.proxy) self.proxy_thread = threading.Thread(target=self.proxy.serve_forever) self.proxy_thread.daemon = True self.proxy_thread.start() self.geo_proxy = compat_http_server.HTTPServer( ('127.0.0.1', 0), _build_proxy_handler('geo')) self.geo_port = http_server_port(self.geo_proxy) self.geo_proxy_thread = threading.Thread(target=self.geo_proxy.serve_forever) self.geo_proxy_thread.daemon = True self.geo_proxy_thread.start() def tearDown(self): def closer(svr): def _closer(): svr.shutdown() svr.server_close() return _closer shutdown_thread = threading.Thread(target=closer(self.proxy)) shutdown_thread.start() self.proxy_thread.join(2.0) shutdown_thread = threading.Thread(target=closer(self.geo_proxy)) shutdown_thread.start() self.geo_proxy_thread.join(2.0) def _test_proxy(self, host='127.0.0.1', port=None): return '{0}:{1}'.format( host, port if port is not None else self.port) def test_proxy(self): geo_proxy = self._test_proxy(port=self.geo_port) ydl = YoutubeDL({ 'proxy': self._test_proxy(), 'geo_verification_proxy': geo_proxy, }) url = 'http://foo.com/bar' response = ydl.urlopen(url).read().decode('utf-8') self.assertEqual(response, 'normal: {0}'.format(url)) req = compat_urllib_request.Request(url) req.add_header('Ytdl-request-proxy', geo_proxy) response = ydl.urlopen(req).read().decode('utf-8') self.assertEqual(response, 'geo: {0}'.format(url)) def test_proxy_with_idn(self): ydl = YoutubeDL({ 'proxy': self._test_proxy(), }) url = 'http://中文.tw/' response = ydl.urlopen(url).read().decode('utf-8') # b'xn--fiq228c' is '中文'.encode('idna') self.assertEqual(response, 'normal: http://xn--fiq228c.tw/') class TestFileURL(unittest.TestCase): # See https://github.com/ytdl-org/youtube-dl/issues/8227 def test_file_urls(self): tf = tempfile.NamedTemporaryFile(delete=False) tf.write(b'foobar') tf.close() url = compat_urllib_parse.urljoin('file://', pathname2url(tf.name)) with FakeYDL() as ydl: self.assertRaisesRegexp( compat_urllib_error.URLError, 'file:// scheme is explicitly disabled in youtube-dl for security reasons', ydl.urlopen, url) # not yet implemented """ with FakeYDL({'enable_file_urls': True}) as ydl: res = ydl.urlopen(url) self.assertEqual(res.read(), b'foobar') res.close() """ os.unlink(tf.name) if __name__ == '__main__': unittest.main()