speed.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. import asyncio
  2. import http.cookies
  3. import json
  4. import re
  5. import subprocess
  6. from logging import INFO
  7. from time import time
  8. from urllib.parse import quote, urljoin
  9. import m3u8
  10. from aiohttp import ClientSession, TCPConnector
  11. from multidict import CIMultiDictProxy
  12. import utils.constants as constants
  13. from utils.config import config
  14. from utils.tools import get_resolution_value, get_logger
  15. from utils.types import TestResult, ChannelTestResult, TestResultCacheData
  16. http.cookies._is_legal_key = lambda _: True
  17. cache: TestResultCacheData = {}
  18. speed_test_timeout = config.speed_test_timeout
  19. speed_test_filter_host = config.speed_test_filter_host
  20. open_filter_resolution = config.open_filter_resolution
  21. min_resolution_value = config.min_resolution_value
  22. max_resolution_value = config.max_resolution_value
  23. open_supply = config.open_supply
  24. open_filter_speed = config.open_filter_speed
  25. min_speed_value = config.min_speed
  26. m3u8_headers = ['application/x-mpegurl', 'application/vnd.apple.mpegurl', 'audio/mpegurl', 'audio/x-mpegurl']
  27. default_ipv6_delay = 0.1
  28. default_ipv6_resolution = "1920x1080"
  29. default_ipv6_result = {
  30. 'speed': float("inf"),
  31. 'delay': default_ipv6_delay,
  32. 'resolution': default_ipv6_resolution
  33. }
  34. logger = get_logger(constants.speed_test_log_path, level=INFO, init=True)
  35. async def get_speed_with_download(url: str, headers: dict = None, session: ClientSession = None,
  36. timeout: int = speed_test_timeout) -> dict[
  37. str, float | None]:
  38. """
  39. Get the speed of the url with a total timeout
  40. """
  41. start_time = time()
  42. delay = -1
  43. total_size = 0
  44. if session is None:
  45. session = ClientSession(connector=TCPConnector(ssl=False), trust_env=True)
  46. created_session = True
  47. else:
  48. created_session = False
  49. try:
  50. async with session.get(url, headers=headers, timeout=timeout) as response:
  51. if response.status != 200:
  52. raise Exception("Invalid response")
  53. delay = int(round((time() - start_time) * 1000))
  54. async for chunk in response.content.iter_any():
  55. if chunk:
  56. total_size += len(chunk)
  57. except:
  58. pass
  59. finally:
  60. total_time = time() - start_time
  61. if created_session:
  62. await session.close()
  63. return {
  64. 'speed': total_size / total_time / 1024 / 1024,
  65. 'delay': delay,
  66. 'size': total_size,
  67. 'time': total_time,
  68. }
  69. async def get_headers(url: str, headers: dict = None, session: ClientSession = None, timeout: int = 5) -> \
  70. CIMultiDictProxy[str] | dict[
  71. any, any]:
  72. """
  73. Get the headers of the url
  74. """
  75. if session is None:
  76. session = ClientSession(connector=TCPConnector(ssl=False), trust_env=True)
  77. created_session = True
  78. else:
  79. created_session = False
  80. res_headers = {}
  81. try:
  82. async with session.head(url, headers=headers, timeout=timeout) as response:
  83. res_headers = response.headers
  84. except:
  85. pass
  86. finally:
  87. if created_session:
  88. await session.close()
  89. return res_headers
  90. async def get_url_content(url: str, headers: dict = None, session: ClientSession = None,
  91. timeout: int = speed_test_timeout) -> str:
  92. """
  93. Get the content of the url
  94. """
  95. if session is None:
  96. session = ClientSession(connector=TCPConnector(ssl=False), trust_env=True)
  97. created_session = True
  98. else:
  99. created_session = False
  100. content = ""
  101. try:
  102. async with session.get(url, headers=headers, timeout=timeout) as response:
  103. if response.status == 200:
  104. content = await response.text()
  105. else:
  106. raise Exception("Invalid response")
  107. except:
  108. pass
  109. finally:
  110. if created_session:
  111. await session.close()
  112. return content
  113. def check_m3u8_valid(headers: CIMultiDictProxy[str] | dict[any, any]) -> bool:
  114. """
  115. Check if the m3u8 url is valid
  116. """
  117. content_type = headers.get('Content-Type', '').lower()
  118. if not content_type:
  119. return False
  120. return any(item in content_type for item in m3u8_headers)
  121. async def get_result(url: str, headers: dict = None, resolution: str = None,
  122. filter_resolution: bool = config.open_filter_resolution,
  123. timeout: int = speed_test_timeout) -> dict[str, float | None]:
  124. """
  125. Get the test result of the url
  126. """
  127. info = {'speed': 0, 'delay': -1, 'resolution': resolution}
  128. location = None
  129. try:
  130. url = quote(url, safe=':/?$&=@[]%').partition('$')[0]
  131. async with ClientSession(connector=TCPConnector(ssl=False), trust_env=True) as session:
  132. res_headers = await get_headers(url, headers, session)
  133. location = res_headers.get('Location')
  134. if location:
  135. info.update(await get_result(location, headers, resolution, filter_resolution, timeout))
  136. else:
  137. url_content = await get_url_content(url, headers, session, timeout)
  138. if url_content:
  139. m3u8_obj = m3u8.loads(url_content)
  140. playlists = m3u8_obj.playlists
  141. segments = m3u8_obj.segments
  142. if playlists:
  143. best_playlist = max(m3u8_obj.playlists, key=lambda p: p.stream_info.bandwidth)
  144. playlist_url = urljoin(url, best_playlist.uri)
  145. playlist_content = await get_url_content(playlist_url, headers, session, timeout)
  146. if playlist_content:
  147. media_playlist = m3u8.loads(playlist_content)
  148. segment_urls = [urljoin(playlist_url, segment.uri) for segment in media_playlist.segments]
  149. else:
  150. segment_urls = [urljoin(url, segment.uri) for segment in segments]
  151. if not segment_urls:
  152. raise Exception("Segment urls not found")
  153. else:
  154. res_info = await get_speed_with_download(url, headers, session, timeout)
  155. info.update({'speed': res_info['speed'], 'delay': res_info['delay']})
  156. raise Exception("No url content, use download with timeout to test")
  157. start_time = time()
  158. tasks = [get_speed_with_download(ts_url, headers, session, timeout) for ts_url in segment_urls[:5]]
  159. results = await asyncio.gather(*tasks, return_exceptions=True)
  160. total_size = sum(result['size'] for result in results if isinstance(result, dict))
  161. total_time = sum(result['time'] for result in results if isinstance(result, dict))
  162. info['speed'] = total_size / total_time / 1024 / 1024 if total_time > 0 else 0
  163. info['delay'] = int(round((time() - start_time) * 1000))
  164. except:
  165. pass
  166. finally:
  167. if not resolution and filter_resolution and not location and info['delay'] != -1:
  168. info['resolution'] = await get_resolution_ffprobe(url, headers, timeout)
  169. return info
  170. async def get_delay_requests(url, timeout=speed_test_timeout, proxy=None):
  171. """
  172. Get the delay of the url by requests
  173. """
  174. async with ClientSession(
  175. connector=TCPConnector(ssl=False), trust_env=True
  176. ) as session:
  177. start = time()
  178. end = None
  179. try:
  180. async with session.get(url, timeout=timeout, proxy=proxy) as response:
  181. if response.status == 404:
  182. return -1
  183. content = await response.read()
  184. if content:
  185. end = time()
  186. else:
  187. return -1
  188. except Exception as e:
  189. return -1
  190. return int(round((end - start) * 1000)) if end else -1
  191. def check_ffmpeg_installed_status():
  192. """
  193. Check ffmpeg is installed
  194. """
  195. status = False
  196. try:
  197. result = subprocess.run(
  198. ["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
  199. )
  200. status = result.returncode == 0
  201. except FileNotFoundError:
  202. status = False
  203. except Exception as e:
  204. print(e)
  205. finally:
  206. return status
  207. async def ffmpeg_url(url, timeout=speed_test_timeout):
  208. """
  209. Get url info by ffmpeg
  210. """
  211. args = ["ffmpeg", "-t", str(timeout), "-stats", "-i", url, "-f", "null", "-"]
  212. proc = None
  213. res = None
  214. try:
  215. proc = await asyncio.create_subprocess_exec(
  216. *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
  217. )
  218. out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout + 2)
  219. if out:
  220. res = out.decode("utf-8")
  221. if err:
  222. res = err.decode("utf-8")
  223. return None
  224. except asyncio.TimeoutError:
  225. if proc:
  226. proc.kill()
  227. return None
  228. except Exception:
  229. if proc:
  230. proc.kill()
  231. return None
  232. finally:
  233. if proc:
  234. await proc.wait()
  235. return res
  236. async def get_resolution_ffprobe(url: str, headers: dict = None, timeout: int = speed_test_timeout) -> str | None:
  237. """
  238. Get the resolution of the url by ffprobe
  239. """
  240. resolution = None
  241. proc = None
  242. try:
  243. probe_args = [
  244. 'ffprobe',
  245. '-v', 'error',
  246. '-headers', ''.join(f'{k}: {v}\r\n' for k, v in headers.items()) if headers else '',
  247. '-select_streams', 'v:0',
  248. '-show_entries', 'stream=width,height',
  249. "-of", 'json',
  250. url
  251. ]
  252. proc = await asyncio.create_subprocess_exec(*probe_args, stdout=asyncio.subprocess.PIPE,
  253. stderr=asyncio.subprocess.PIPE)
  254. out, _ = await asyncio.wait_for(proc.communicate(), timeout)
  255. video_stream = json.loads(out.decode('utf-8'))["streams"][0]
  256. resolution = f"{video_stream['width']}x{video_stream['height']}"
  257. except:
  258. if proc:
  259. proc.kill()
  260. finally:
  261. if proc:
  262. await proc.wait()
  263. return resolution
  264. def get_video_info(video_info):
  265. """
  266. Get the video info
  267. """
  268. frame_size = -1
  269. resolution = None
  270. if video_info is not None:
  271. info_data = video_info.replace(" ", "")
  272. matches = re.findall(r"frame=(\d+)", info_data)
  273. if matches:
  274. frame_size = int(matches[-1])
  275. match = re.search(r"(\d{3,4}x\d{3,4})", video_info)
  276. if match:
  277. resolution = match.group(0)
  278. return frame_size, resolution
  279. async def check_stream_delay(url_info):
  280. """
  281. Check the stream delay
  282. """
  283. try:
  284. url = url_info["url"]
  285. video_info = await ffmpeg_url(url)
  286. if video_info is None:
  287. return -1
  288. frame, resolution = get_video_info(video_info)
  289. if frame is None or frame == -1:
  290. return -1
  291. url_info["resolution"] = resolution
  292. return url_info, frame
  293. except Exception as e:
  294. print(e)
  295. return -1
  296. def get_avg_result(result) -> TestResult:
  297. return {
  298. 'speed': sum(item['speed'] or 0 for item in result) / len(result),
  299. 'delay': max(
  300. int(sum(item['delay'] or -1 for item in result) / len(result)), -1),
  301. 'resolution': max((item['resolution'] for item in result), key=get_resolution_value)
  302. }
  303. def get_speed_result(key: str) -> TestResult:
  304. """
  305. Get the speed result of the url
  306. """
  307. if key in cache:
  308. return get_avg_result(cache[key])
  309. else:
  310. return {'speed': 0, 'delay': -1, 'resolution': 0}
  311. async def get_speed(data, headers=None, ipv6_proxy=None, filter_resolution=open_filter_resolution,
  312. timeout=speed_test_timeout, callback=None) -> TestResult:
  313. """
  314. Get the speed (response time and resolution) of the url
  315. """
  316. url = data['url']
  317. resolution = data['resolution']
  318. result: TestResult = {'speed': 0, 'delay': -1, 'resolution': resolution}
  319. try:
  320. cache_key = data['host'] if speed_test_filter_host else url
  321. if cache_key and cache_key in cache:
  322. result = get_avg_result(cache[cache_key])
  323. else:
  324. if data['ipv_type'] == "ipv6" and ipv6_proxy:
  325. result.update(default_ipv6_result)
  326. elif constants.rt_url_pattern.match(url) is not None:
  327. start_time = time()
  328. if not result['resolution'] and filter_resolution:
  329. result['resolution'] = await get_resolution_ffprobe(url, headers, timeout)
  330. result['delay'] = int(round((time() - start_time) * 1000))
  331. if result['resolution'] is not None:
  332. result['speed'] = float("inf")
  333. else:
  334. result.update(await get_result(url, headers, resolution, filter_resolution, timeout))
  335. if cache_key:
  336. cache.setdefault(cache_key, []).append(result)
  337. finally:
  338. if callback:
  339. callback()
  340. logger.info(
  341. f"Name: {data.get('name')}, URL: {data.get('url')}, IPv_Type: {data.get("ipv_type")}, Location: {data.get('location')}, ISP: {data.get('isp')}, Date: {data["date"]}, Delay: {result.get('delay') or -1} ms, Speed: {result.get('speed') or 0:.2f} M/s, Resolution: {result.get('resolution')}"
  342. )
  343. return result
  344. def get_sort_result(
  345. results,
  346. supply=open_supply,
  347. filter_speed=open_filter_speed,
  348. min_speed=min_speed_value,
  349. filter_resolution=open_filter_resolution,
  350. min_resolution=min_resolution_value,
  351. max_resolution=max_resolution_value,
  352. ipv6_support=True
  353. ) -> list[ChannelTestResult]:
  354. """
  355. get the sort result
  356. """
  357. total_result = []
  358. for result in results:
  359. if not ipv6_support and result["ipv_type"] == "ipv6":
  360. result.update(default_ipv6_result)
  361. result_speed, result_delay, resolution = (
  362. result.get("speed") or 0,
  363. result.get("delay"),
  364. result.get("resolution")
  365. )
  366. if result_delay == -1:
  367. continue
  368. if not supply:
  369. if filter_speed and result_speed < min_speed:
  370. continue
  371. if filter_resolution and resolution:
  372. resolution_value = get_resolution_value(resolution)
  373. if resolution_value < min_resolution or resolution_value > max_resolution:
  374. continue
  375. total_result.append(result)
  376. total_result.sort(key=lambda item: item.get("speed") or 0, reverse=True)
  377. return total_result