# speed.py
  1. import asyncio
  2. import http.cookies
  3. import json
  4. import re
  5. import subprocess
  6. from time import time
  7. from urllib.parse import quote, urljoin
  8. import m3u8
  9. from aiohttp import ClientSession, TCPConnector
  10. from multidict import CIMultiDictProxy
  11. import utils.constants as constants
  12. from utils.config import config
  13. from utils.tools import get_resolution_value
  14. from utils.types import TestResult, ChannelTestResult, TestResultCacheData
  15. http.cookies._is_legal_key = lambda _: True
  16. cache: TestResultCacheData = {}
  17. speed_test_timeout = config.speed_test_timeout
  18. speed_test_filter_host = config.speed_test_filter_host
  19. open_filter_resolution = config.open_filter_resolution
  20. min_resolution_value = config.min_resolution_value
  21. max_resolution_value = config.max_resolution_value
  22. open_supply = config.open_supply
  23. open_filter_speed = config.open_filter_speed
  24. min_speed_value = config.min_speed
  25. m3u8_headers = ['application/x-mpegurl', 'application/vnd.apple.mpegurl', 'audio/mpegurl', 'audio/x-mpegurl']
  26. default_ipv6_delay = 0.1
  27. default_ipv6_resolution = "1920x1080"
  28. default_ipv6_result = {
  29. 'speed': float("inf"),
  30. 'delay': default_ipv6_delay,
  31. 'resolution': default_ipv6_resolution
  32. }
  33. async def get_speed_with_download(url: str, headers: dict = None, session: ClientSession = None,
  34. timeout: int = speed_test_timeout) -> dict[
  35. str, float | None]:
  36. """
  37. Get the speed of the url with a total timeout
  38. """
  39. start_time = time()
  40. delay = -1
  41. total_size = 0
  42. if session is None:
  43. session = ClientSession(connector=TCPConnector(ssl=False), trust_env=True)
  44. created_session = True
  45. else:
  46. created_session = False
  47. try:
  48. async with session.get(url, headers=headers, timeout=timeout) as response:
  49. if response.status != 200:
  50. raise Exception("Invalid response")
  51. delay = int(round((time() - start_time) * 1000))
  52. async for chunk in response.content.iter_any():
  53. if chunk:
  54. total_size += len(chunk)
  55. except:
  56. pass
  57. finally:
  58. total_time = time() - start_time
  59. if created_session:
  60. await session.close()
  61. return {
  62. 'speed': total_size / total_time / 1024 / 1024,
  63. 'delay': delay,
  64. 'size': total_size,
  65. 'time': total_time,
  66. }
  67. async def get_headers(url: str, headers: dict = None, session: ClientSession = None, timeout: int = 5) -> \
  68. CIMultiDictProxy[str] | dict[
  69. any, any]:
  70. """
  71. Get the headers of the url
  72. """
  73. if session is None:
  74. session = ClientSession(connector=TCPConnector(ssl=False), trust_env=True)
  75. created_session = True
  76. else:
  77. created_session = False
  78. res_headers = {}
  79. try:
  80. async with session.head(url, headers=headers, timeout=timeout) as response:
  81. res_headers = response.headers
  82. except:
  83. pass
  84. finally:
  85. if created_session:
  86. await session.close()
  87. return res_headers
  88. async def get_url_content(url: str, headers: dict = None, session: ClientSession = None,
  89. timeout: int = speed_test_timeout) -> str:
  90. """
  91. Get the content of the url
  92. """
  93. if session is None:
  94. session = ClientSession(connector=TCPConnector(ssl=False), trust_env=True)
  95. created_session = True
  96. else:
  97. created_session = False
  98. content = ""
  99. try:
  100. async with session.get(url, headers=headers, timeout=timeout) as response:
  101. if response.status == 200:
  102. content = await response.text()
  103. else:
  104. raise Exception("Invalid response")
  105. except:
  106. pass
  107. finally:
  108. if created_session:
  109. await session.close()
  110. return content
  111. def check_m3u8_valid(headers: CIMultiDictProxy[str] | dict[any, any]) -> bool:
  112. """
  113. Check if the m3u8 url is valid
  114. """
  115. content_type = headers.get('Content-Type', '').lower()
  116. if not content_type:
  117. return False
  118. return any(item in content_type for item in m3u8_headers)
  119. async def get_result(url: str, headers: dict = None, resolution: str = None,
  120. filter_resolution: bool = config.open_filter_resolution,
  121. timeout: int = speed_test_timeout) -> dict[str, float | None]:
  122. """
  123. Get the test result of the url
  124. """
  125. info = {'speed': 0, 'delay': -1, 'resolution': resolution}
  126. location = None
  127. try:
  128. url = quote(url, safe=':/?$&=@[]%').partition('$')[0]
  129. async with ClientSession(connector=TCPConnector(ssl=False), trust_env=True) as session:
  130. res_headers = await get_headers(url, headers, session)
  131. location = res_headers.get('Location')
  132. if location:
  133. info.update(await get_result(location, headers, resolution, filter_resolution, timeout))
  134. else:
  135. url_content = await get_url_content(url, headers, session, timeout)
  136. if url_content:
  137. m3u8_obj = m3u8.loads(url_content)
  138. playlists = m3u8_obj.playlists
  139. segments = m3u8_obj.segments
  140. if playlists:
  141. best_playlist = max(m3u8_obj.playlists, key=lambda p: p.stream_info.bandwidth)
  142. playlist_url = urljoin(url, best_playlist.uri)
  143. playlist_content = await get_url_content(playlist_url, headers, session, timeout)
  144. if playlist_content:
  145. media_playlist = m3u8.loads(playlist_content)
  146. segment_urls = [urljoin(playlist_url, segment.uri) for segment in media_playlist.segments]
  147. else:
  148. segment_urls = [urljoin(url, segment.uri) for segment in segments]
  149. if not segment_urls:
  150. raise Exception("Segment urls not found")
  151. else:
  152. res_info = await get_speed_with_download(url, headers, session, timeout)
  153. info.update({'speed': res_info['speed'], 'delay': res_info['delay']})
  154. raise Exception("No url content, use download with timeout to test")
  155. start_time = time()
  156. tasks = [get_speed_with_download(ts_url, headers, session, timeout) for ts_url in segment_urls[:5]]
  157. results = await asyncio.gather(*tasks, return_exceptions=True)
  158. total_size = sum(result['size'] for result in results if isinstance(result, dict))
  159. total_time = sum(result['time'] for result in results if isinstance(result, dict))
  160. info['speed'] = total_size / total_time / 1024 / 1024 if total_time > 0 else 0
  161. info['delay'] = int(round((time() - start_time) * 1000))
  162. except:
  163. pass
  164. finally:
  165. if not resolution and filter_resolution and not location and info['delay'] != -1:
  166. info['resolution'] = await get_resolution_ffprobe(url, headers, timeout)
  167. return info
  168. async def get_delay_requests(url, timeout=speed_test_timeout, proxy=None):
  169. """
  170. Get the delay of the url by requests
  171. """
  172. async with ClientSession(
  173. connector=TCPConnector(ssl=False), trust_env=True
  174. ) as session:
  175. start = time()
  176. end = None
  177. try:
  178. async with session.get(url, timeout=timeout, proxy=proxy) as response:
  179. if response.status == 404:
  180. return -1
  181. content = await response.read()
  182. if content:
  183. end = time()
  184. else:
  185. return -1
  186. except Exception as e:
  187. return -1
  188. return int(round((end - start) * 1000)) if end else -1
  189. def check_ffmpeg_installed_status():
  190. """
  191. Check ffmpeg is installed
  192. """
  193. status = False
  194. try:
  195. result = subprocess.run(
  196. ["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
  197. )
  198. status = result.returncode == 0
  199. except FileNotFoundError:
  200. status = False
  201. except Exception as e:
  202. print(e)
  203. finally:
  204. return status
  205. async def ffmpeg_url(url, timeout=speed_test_timeout):
  206. """
  207. Get url info by ffmpeg
  208. """
  209. args = ["ffmpeg", "-t", str(timeout), "-stats", "-i", url, "-f", "null", "-"]
  210. proc = None
  211. res = None
  212. try:
  213. proc = await asyncio.create_subprocess_exec(
  214. *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
  215. )
  216. out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout + 2)
  217. if out:
  218. res = out.decode("utf-8")
  219. if err:
  220. res = err.decode("utf-8")
  221. return None
  222. except asyncio.TimeoutError:
  223. if proc:
  224. proc.kill()
  225. return None
  226. except Exception:
  227. if proc:
  228. proc.kill()
  229. return None
  230. finally:
  231. if proc:
  232. await proc.wait()
  233. return res
  234. async def get_resolution_ffprobe(url: str, headers: dict = None, timeout: int = speed_test_timeout) -> str | None:
  235. """
  236. Get the resolution of the url by ffprobe
  237. """
  238. resolution = None
  239. proc = None
  240. try:
  241. probe_args = [
  242. 'ffprobe',
  243. '-v', 'error',
  244. '-headers', ''.join(f'{k}: {v}\r\n' for k, v in headers.items()) if headers else '',
  245. '-select_streams', 'v:0',
  246. '-show_entries', 'stream=width,height',
  247. "-of", 'json',
  248. url
  249. ]
  250. proc = await asyncio.create_subprocess_exec(*probe_args, stdout=asyncio.subprocess.PIPE,
  251. stderr=asyncio.subprocess.PIPE)
  252. out, _ = await asyncio.wait_for(proc.communicate(), timeout)
  253. video_stream = json.loads(out.decode('utf-8'))["streams"][0]
  254. resolution = f"{video_stream['width']}x{video_stream['height']}"
  255. except:
  256. if proc:
  257. proc.kill()
  258. finally:
  259. if proc:
  260. await proc.wait()
  261. return resolution
  262. def get_video_info(video_info):
  263. """
  264. Get the video info
  265. """
  266. frame_size = -1
  267. resolution = None
  268. if video_info is not None:
  269. info_data = video_info.replace(" ", "")
  270. matches = re.findall(r"frame=(\d+)", info_data)
  271. if matches:
  272. frame_size = int(matches[-1])
  273. match = re.search(r"(\d{3,4}x\d{3,4})", video_info)
  274. if match:
  275. resolution = match.group(0)
  276. return frame_size, resolution
  277. async def check_stream_delay(url_info):
  278. """
  279. Check the stream delay
  280. """
  281. try:
  282. url = url_info["url"]
  283. video_info = await ffmpeg_url(url)
  284. if video_info is None:
  285. return -1
  286. frame, resolution = get_video_info(video_info)
  287. if frame is None or frame == -1:
  288. return -1
  289. url_info["resolution"] = resolution
  290. return url_info, frame
  291. except Exception as e:
  292. print(e)
  293. return -1
  294. def get_avg_result(result) -> TestResult:
  295. return {
  296. 'speed': sum(item['speed'] or 0 for item in result) / len(result),
  297. 'delay': max(
  298. int(sum(item['delay'] or -1 for item in result) / len(result)), -1),
  299. 'resolution': max((item['resolution'] for item in result), key=get_resolution_value)
  300. }
  301. def get_speed_result(key: str) -> TestResult:
  302. """
  303. Get the speed result of the url
  304. """
  305. if key in cache:
  306. return get_avg_result(cache[key])
  307. else:
  308. return {'speed': 0, 'delay': -1, 'resolution': 0}
  309. async def get_speed(data, headers=None, ipv6_proxy=None, filter_resolution=open_filter_resolution,
  310. timeout=speed_test_timeout, callback=None) -> TestResult:
  311. """
  312. Get the speed (response time and resolution) of the url
  313. """
  314. url = data['url']
  315. resolution = data['resolution']
  316. result: TestResult = {'speed': 0, 'delay': -1, 'resolution': resolution}
  317. try:
  318. cache_key = data['host'] if speed_test_filter_host else url
  319. if cache_key and cache_key in cache:
  320. result = get_avg_result(cache[cache_key])
  321. else:
  322. if data['ipv_type'] == "ipv6" and ipv6_proxy:
  323. result.update(default_ipv6_result)
  324. elif constants.rt_url_pattern.match(url) is not None:
  325. start_time = time()
  326. if not result['resolution'] and filter_resolution:
  327. result['resolution'] = await get_resolution_ffprobe(url, headers, timeout)
  328. result['delay'] = int(round((time() - start_time) * 1000))
  329. if result['resolution'] is not None:
  330. result['speed'] = float("inf")
  331. else:
  332. result.update(await get_result(url, headers, resolution, filter_resolution, timeout))
  333. if cache_key:
  334. cache.setdefault(cache_key, []).append(result)
  335. finally:
  336. if callback:
  337. callback()
  338. return result
  339. def get_sort_result(
  340. results,
  341. supply=open_supply,
  342. filter_speed=open_filter_speed,
  343. min_speed=min_speed_value,
  344. filter_resolution=open_filter_resolution,
  345. min_resolution=min_resolution_value,
  346. max_resolution=max_resolution_value,
  347. ipv6_support=True
  348. ) -> list[ChannelTestResult]:
  349. """
  350. get the sort result
  351. """
  352. total_result = []
  353. for result in results:
  354. if not ipv6_support and result["ipv_type"] == "ipv6":
  355. result.update(default_ipv6_result)
  356. result_speed, result_delay, resolution = (
  357. result.get("speed") or 0,
  358. result.get("delay"),
  359. result.get("resolution")
  360. )
  361. if result_delay == -1:
  362. continue
  363. if not supply:
  364. if filter_speed and result_speed < min_speed:
  365. continue
  366. if filter_resolution and resolution:
  367. resolution_value = get_resolution_value(resolution)
  368. if resolution_value < min_resolution or resolution_value > max_resolution:
  369. continue
  370. total_result.append(result)
  371. total_result.sort(key=lambda item: item.get("speed") or 0, reverse=True)
  372. return total_result