12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327 |
- """
- Rough API documentation
- -----------------------
- SHA/getGameConfigs?...
- Game paths (audio/voiceover, screenshots, logs, crash dumps) for all HoYo games
- SHA/getAllGameBasicInfo?...
- Launcher background data for the specified game
- SHA/getGames?...
- Launcher images and links for all HoYo games
- SHA/getGameContent?...
- Event preview data for the specified game
- SGP/getLatestRelease?...
- Launcher update information
- SDA/getPatchBuild (POST)
- List of manifests information (same as getBuild)
- Seriously guys, why don't you provide the chunk URL, diff URL and the two manifests in the same file?
- Functional description
- ----------------------
- 1 ) getBuild
- JSON file that provides information about the available game and voiceover pack files
- Provides manifests and the base URL to download chunks from
- 2 ) manifest
- Provides information for all chunks or ldiff files
- 3a) chunks
- zstd-compressed sections of files for installing from scratch (or new ones)
- 3b) diffs
- hdiff files to patch installed game files
- TODO
- ----
- Low priority
- Parallelization for file downloads and patching
- Apply patches for non-existent files (apparently the official launcher can do that)
- Hints for developers:
- 1. Investigate the JSON files downloaded to `tmp/` -> variable `EXPORT_JSON_FILES`
- """
- from __future__ import annotations
- import argparse
- import hashlib
- import io
- import json
- import pathlib
- import re
- import shutil
- import subprocess
- import sys
- import tempfile
- import time
- import urllib.error
- import urllib.request as request
- from typing import TYPE_CHECKING
- import zstandard
- from google.protobuf.json_format import MessageToJson
- import manifest_pb2
- import manifest_ldiff_pb2
- if TYPE_CHECKING:
- from os import PathLike
# Directory containing this script. The bundled tools and the default
# temporary directory are resolved relative to it.
SCRIPTDIR = pathlib.Path(__file__).resolve().parent

# hpatchz executable, expected in `HDiffPatch/` next to this script.
# Its presence is verified as soon as the module is loaded.
HPATCHZ_APP = SCRIPTDIR.joinpath("HDiffPatch", "hpatchz")
assert HPATCHZ_APP.is_file(), f"{HPATCHZ_APP.resolve()} not found."

# When enabled, decoded manifests are additionally dumped as JSON files
# into the temporary directory (see the developer hints above).
EXPORT_JSON_FILES = True
class Options(argparse.Namespace):
    """Typed option namespace; the class attributes are the default values."""

    # --- Paths -----------------------------------------------------------
    gamedir: pathlib.Path | None = None
    tempdir: pathlib.Path | None = SCRIPTDIR / "tmp"

    # --- Operation selection and behaviour -------------------------------
    # Keep using cached API files instead of refreshing them.
    force_use_cache: bool = False
    # Use the "pre_download" branch instead of "main".
    predownload: bool = False
    # Release type to install; setting it implies `do_install`.
    install_reltype: str | None = None
    do_install: bool = False
    do_update: bool = False
    repair_mode: str | None = None
    # Simulation mode: destructive steps are only logged.
    dry_run: bool = False
    # Log instead of downloading chunk/diff payloads.
    disallow_download: bool = False

    # --- Developer / testing aids ----------------------------------------
    # Skip the sanity checks on the install directory and installed version.
    ignore_conditions: bool = False
    # When set, only files whose name contains this substring are processed.
    TESTING_FILE: str | None = None

    selected_lang_packs: str = ""


# Module-wide options object; `SophonClient.initialize` replaces it.
OPT = Options()
# Voiceover lookup table. It starts out keyed by the friendly pack name and
# is extended below so the short locale code resolves to the same entry.
VOICEOVERS_LUT = {
    "English(US)": {"short": "en-us"},
    "Japanese": {"short": "ja-jp"},
    "Korean": {"short": "ko-kr"},
    "Chinese": {"short": "zh-cn"},
}

# Snapshot the items first: the table is mutated while aliases are added.
for _friendly_name, _entry in list(VOICEOVERS_LUT.items()):
    _entry["friendly"] = _friendly_name
    # Alias: both keys share one dict object.
    VOICEOVERS_LUT[_entry["short"]] = _entry
- def _handle_kwargs(kwargs):
- sys.stdout.write("\33[2K")
def tempdir(*args: str | PathLike[str]) -> pathlib.Path:
    """
    Returns a path below the configured temporary directory (`OPT.tempdir`).
    args: path components to append
    """
    base = OPT.tempdir
    return base.joinpath(*args)
def gamedir(*args: str | PathLike[str]) -> pathlib.Path:
    """
    Returns a path below the configured game directory (`OPT.gamedir`).
    args: path components to append
    """
    base = OPT.gamedir
    return base.joinpath(*args)
def debuglog(*args, **kwargs):
    """
    Prints a DEBUG message wrapped in the ANSI colour code 37.
    Accepts the same arguments as `print`.
    """
    _handle_kwargs(kwargs)
    colour_on, colour_off = "\033[37mDEBUG ", "\033[0m"
    print(colour_on, *args, colour_off, **kwargs)
def infolog(*args, **kwargs):
    """
    Prints an uncoloured INFO message.
    Accepts the same arguments as `print`.
    """
    _handle_kwargs(kwargs)
    prefix = "INFO "
    print(prefix, *args, **kwargs)
def warnlog(*args, **kwargs):
    """
    Prints a WARN message wrapped in the ANSI colour code 36.
    Accepts the same arguments as `print`.
    """
    _handle_kwargs(kwargs)
    colour_on, colour_off = "\033[36mWARN ", "\033[0m"
    print(colour_on, *args, colour_off, **kwargs)
def abortlog(*args, **kwargs):
    """
    Prints an ERROR message (ANSI colour code 31) and terminates the process
    with exit status 1. Accepts the same arguments as `print`.
    Raises: SystemExit
    """
    _handle_kwargs(kwargs)
    print("\033[31mERROR ", *args, "\033[0m", **kwargs)
    # `sys.exit` instead of the builtin `exit`: the latter is injected by the
    # `site` module and is missing with `python -S` or in frozen builds.
    sys.exit(1)
def try_get_file_size(filename: pathlib.Path):
    """
    Size of `filename` in bytes.
    Returns -1 if the file was not found
    """
    try:
        file_stat = filename.stat()
    except FileNotFoundError:
        return -1
    return file_stat.st_size
def filename_safety_check(filename):
    """
    Checks whether the path is relative AND within this tree
    This ensures that no files are written to unpredictable locations.

    filename: str or path-like, as provided by a (remote) manifest
    Raises: AssertionError if the name is empty, absolute or contains ".."
    """
    text = str(filename)
    # Evaluate with Windows rules, the stricter superset: they recognise
    # "/x", "\\x", "C:x", "C:\\x" and UNC names as anchored paths.
    win_path = pathlib.PureWindowsPath(text)
    unsafe = (
        not text                # empty name (used to raise IndexError)
        or ".." in text         # deliberately conservative: any ".." at all
        or text[0] == '/'
        or bool(win_path.drive)
        or bool(win_path.root)
    )
    if unsafe:
        # Raised explicitly (same exception type as before) so that the check
        # is NOT stripped when Python runs with -O.
        raise AssertionError(f"Security alert! {filename}")
def bytes_to_MiB(n: float):
    """
    Converts a byte count to MiB, rounded half-up to one decimal place.
    """
    # Scale to tenths of a MiB, round, then shift the decimal point back.
    tenths_of_mib = int(n / (1024 * 1024 / 10) + 0.5)
    return tenths_of_mib / 10
def cmp_versions(lhs: list, rhs: list) -> int:
    """
    Compares two version lists of equal length, component by component.
    Returns [1 if lhs > rhs], [-1 if lhs < rhs], [0 if equal]
    """
    assert len(lhs) == len(rhs)
    for left, right in zip(lhs, rhs):
        # The first differing component decides.
        if left < right:
            return -1
        if left > right:
            return 1
    return 0
def hpatchz_patch_file(oldfile: pathlib.Path, dstfile: pathlib.Path, patchfile: pathlib.Path,
                       p_offset: int, p_len: int, timeout: int = 50):
    """
    Patches a file using the external hpatchz tool.
    One ldiff file may contain multiple patches, thus the offset

    oldfile:   file to patch (read only)
    dstfile:   where the patched result is written
    patchfile: ldiff container holding the patch
    p_offset:  byte offset of the patch within `patchfile`
    p_len:     byte length of the patch
    timeout:   seconds to wait for hpatchz

    Returns `True` on success, `False` on timeout
    On any hpatchz failure the partial `dstfile` is removed and the process
    is terminated through `abortlog`.
    """
    # hpatchz expects a standalone patch file: copy the requested byte range
    # into a temporary file. Both handles are closed deterministically; the
    # temporary file must stay alive until hpatchz has finished.
    with tempfile.NamedTemporaryFile("wb") as pfile_out:
        with patchfile.open("rb") as pfile_in:
            pfile_in.seek(p_offset)
            pfile_out.write(pfile_in.read(p_len))
        pfile_out.flush()

        proc = subprocess.Popen(
            [HPATCHZ_APP, "-f", oldfile, pfile_out.name, dstfile],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            text=True
        )
        try:
            _, perr = proc.communicate(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.terminate()
            proc.communicate()  # reap the process and drain its pipes
            dstfile.unlink(missing_ok=True)
            warnlog(f"hpatchz timeout ({timeout} s) reached on file '{dstfile.name}'.")
            return False

    retcode = proc.returncode
    if retcode != 0 or perr != "":
        dstfile.unlink(missing_ok=True)
        abortlog(f"Failed to patch file '{oldfile.name}' using '{patchfile.name}':"
                 + f"\n\t Exit code: {retcode}"
                 + "\n\t Message: " + perr)

    # Error Messages And Their Meaning
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # oldFile dataSize <integer> != diffFile saved oldDataSize <integer> ERROR!
    #     Wrong diff file; it does not match the file "signature"
    # open oldFile for read ERROR!
    #     Missing source file
    # patch run ERROR!
    #     Patch file has an unexpected length
    return True
class DownloadInfo:
    """
    State of one download kind (chunks or diffs), filled in step by step.
    All fields default to `None` until the corresponding step has run.
    """
    # Log prefix, e.g. "[chunks]"
    name: str | None = None
    # Parsed getBuild / getPatchBuild JSON document
    getBuild_json = None
    # Decoded protobuf manifest of the selected category
    manifest: manifest_pb2.Manifest | manifest_ldiff_pb2.DiffManifest | None = None
    # Entry of `getBuild_json["data"]["manifests"]` that was selected.
    # Previously annotation-only, i.e. reading it before the category was
    # selected raised AttributeError.
    category_json: dict | None = None
class SophonClient:
    # Installed game version "x.y.z"; assigned during initialization.
    installed_ver: str | None
    # Release type: "os", "cn" or "bb"
    rel_type: str | None = None
    # Name (not path) of the "*_Data" directory inside the game directory
    gamedatadir: str | None = None
    # API branch: "main" or "pre_download"; assigned by `initialize`.
    branch: str
    # Branch entry of getGameBranches (package_id, password, tag, ...)
    branches_json = None

    # NOTE(review): the objects below are created once at class level and are
    # therefore shared by all instances of this class.
    di_chunks = DownloadInfo()
    di_diffs = DownloadInfo()
    # File names that must be fetched in full via chunks
    new_files_to_download = set()
    # Names of the ldiff files that were downloaded or found
    ldiff_files_to_remove = set()
def initialize(self, opts: Options):
    """
    Installs `opts` as the module-wide options and prepares the client for
    exactly one operation (install, update or repair).
    """
    global OPT
    OPT = opts

    self.di_chunks.name = "[chunks]"
    self.di_diffs.name = "[diffs]"

    # Specifying a release type to install implies the install operation.
    if OPT.install_reltype:
        OPT.do_install = True
        self.rel_type = OPT.install_reltype

    requested_ops = OPT.do_install + OPT.do_update + isinstance(OPT.repair_mode, str)
    if requested_ops > 1:
        abortlog("Do either install, update or repair!")
    assert OPT.gamedir is not None, "Game directory not specified."

    if OPT.predownload:
        self.branch = "pre_download"
    else:
        self.branch = "main"
    infolog(f"Selected branch '{self.branch}'")

    OPT.tempdir.mkdir(exist_ok=True)
    if OPT.dry_run:
        infolog("Simulation mode is enabled.")

    # Operation-specific preparation
    if OPT.do_install:
        self._initialize_install()
    if OPT.do_update or OPT.repair_mode:
        self._initialize_update()

    if not OPT.gamedir.is_dir():
        abortlog("Game directory does not exist.")
def _initialize_install(self):
    """
    Prepares a fresh installation: creates the game directory and writes an
    initial config.ini (game_version=0.0.0) for the selected release type.
    Raises: AssertionError for a non-empty target directory (unless
            `OPT.ignore_conditions`) or an unknown release type.
    """
    self.installed_ver = None
    OPT.gamedir.mkdir(exist_ok=True)
    if not OPT.ignore_conditions:
        # At most one pre-existing entry is tolerated.
        assert len(list(OPT.gamedir.glob("*"))) < 2, "The specified install path is not empty"

    templates = {
        "os": "[General]\r\nchannel=1\r\ncps=mihoyo\r\ngame_version=0.0.0\r\nsdk_version=\r\nsub_channel=0\r\n",
        "cn": "[General]\r\nchannel=1\r\ncps=mihoyo\r\ngame_version=0.0.0\r\nsdk_version=\r\nsub_channel=1\r\n",
        "bb": "[General]\r\nchannel=14\r\ncps=bilibili\r\ngame_version=0.0.0\r\nsdk_version=\r\nsub_channel=0\r\n"
    }
    # `.get` so that an unknown type reaches the intended assertion message
    # instead of failing earlier with a bare KeyError.
    template = templates.get(self.rel_type)
    assert template, "Unknown reltype"

    # newline="": the template already contains CRLF; disable the newline
    # translation so it is written verbatim on every platform.
    with gamedir("config.ini").open("w", newline="") as fh:
        fh.write(template)
    infolog("Created config.ini")
def _get_gamedatadir(self):
    """
    Stores the name of the first "*_Data" entry found in the game directory
    in `self.gamedatadir`.
    """
    candidates = OPT.gamedir.glob("*_Data")
    data_path = next(candidates, None)
    assert data_path, "Cannot determine game data dir"
    # Only the name is kept; use `gamedir(self.gamedatadir, ...)` for paths.
    self.gamedatadir = data_path.name
def _initialize_update(self):
    """
    Find out what kind of installation we need to update

    Detects the release type from the game executable and reads the
    installed version from `globalgamemanagers` (anchor 1), then lets
    `check_config_ini` reconcile it with config.ini (anchor 2).
    """
    self._get_gamedatadir()

    if gamedir("GenshinImpact.exe").is_file():
        self.rel_type = "os"
    elif gamedir("YuanShen.exe").is_file():
        # `self.gamedatadir` is a plain directory *name* (str), hence the
        # path must be composed with `gamedir(...)`. Calling `.joinpath` on
        # it raised AttributeError for every YuanShen.exe installation.
        if gamedir(self.gamedatadir, "Plugins", "PCGameSDK.dll").is_file():
            self.rel_type = "bb"
        else:
            self.rel_type = "cn"

    if not isinstance(self.rel_type, str):
        abortlog("Failed to detect release type. "
                 + f"Game executable in '{OPT.gamedir}' could not be found.")
    infolog(f"Release type: {self.rel_type}")

    if not OPT.ignore_conditions:
        fullname = gamedir(self.gamedatadir, "globalgamemanagers")
        assert fullname.is_file(), "Game install is incomplete!"
        contents = fullname.read_bytes()
        # NUL-delimited "<x.y.z>_<digits>_<digits>" inside the binary file
        ver = re.findall(br"\0(\d+\.\d+\.\d+)_\d+_\d+\0", contents)
        assert len(ver) == 1, "Broken script or corrupted game installation"
        self.installed_ver = ver[0].decode("utf-8")
        infolog(f"Installed game version: {self.installed_ver} (anchor 1: globalgamemanagers)")
    else:
        # NOTE(review): hard-coded stand-in version when the checks are
        # skipped; config.ini may still lower it below.
        self.installed_ver = "5.5.0"

    self.check_config_ini()
def check_config_ini(self):
    """
    Internal function. Picks the version as follows: min(config.ini, globalgamemanagers)
    A missing or unparsable config.ini only produces a warning.
    """
    config_path = gamedir("config.ini")
    if not config_path.is_file():
        warnlog("config.ini not found")
        return

    matches = re.findall(r"game_version=(\d+\.\d+\.\d+)", config_path.read_text())
    if len(matches) != 1:
        warnlog("config.ini is incomplete or corrupt")
        return
    config_version = matches[0]
    infolog(f"Installed game version: {config_version} (anchor 2: config.ini)")

    def as_numbers(version_text):
        return [int(part) for part in version_text.split(".")]

    if cmp_versions(as_numbers(config_version), as_numbers(self.installed_ver)) == 1:
        # Keep the (older) version that was detected before.
        warnlog("Potential issue: config.ini documents a more recent version!")
    else:
        # config.ini is older or equal: it wins.
        self.installed_ver = config_version
def get_voiceover_packs(self):
    """
    Returns a set of the installed packs: { "en-us", "ja-jp", "ko-kr", "zh-cn" }

    Reads `Persistent/audio_lang_14` in the game data directory. Unknown
    entries are skipped with a warning; packs with fewer than 10 files in
    their AudioAssets directory are skipped as "installed in-game".
    """
    fullname = gamedir(self.gamedatadir, "Persistent/audio_lang_14")
    packs = set()
    # Context manager: the handle used to be left open until collected.
    with fullname.open("r") as fh:
        for line in fh:
            line = line.strip()
            if line == "":
                continue
            if line not in VOICEOVERS_LUT:
                warnlog("Unknown voiceover pack in 'audio_lang_14': " + line)
                continue
            mediapath = gamedir(self.gamedatadir, "StreamingAssets/AudioAssets", line)
            num_files = len(list(mediapath.glob("*.*")))
            if num_files < 10:
                infolog(f"Skipping voiceover pack '{line}': Pack was installed in-game.")
                continue
            packs.add(VOICEOVERS_LUT[line]["short"])
    debuglog("Found voiceover packs:", ", ".join(packs))
    return packs
def update_voiceover_meta_file(self):
    """
    [Install only] Auto-detect installed language packs and update audio_lang_14
    Packs are recognised by their "Audio_<name>_pkg_version" file in the
    game directory.
    """
    self._get_gamedatadir()

    detected = set()
    for entry in OPT.gamedir.glob("*"):
        hit = re.match(r"^Audio_(.+)_pkg_version$", entry.name)
        if hit is None:
            continue
        longname = hit.group(1)
        if longname in VOICEOVERS_LUT:
            detected.add(longname)
        else:
            warnlog(f"Unknown voiceover pack '{entry.name}'")
    languages = sorted(detected)

    langfile = gamedir(self.gamedatadir, "Persistent/audio_lang_14")
    lang_str = ", ".join(languages)
    if OPT.dry_run:
        infolog(f"[update lang file: {lang_str}]")
        return

    langfile.parent.mkdir(parents=True, exist_ok=True)
    # newline="\r\n": every "\n" written below ends up as CRLF in the file.
    with langfile.open("w", newline="\r\n") as fh:
        for lang in languages:
            fh.write(lang + "\n")
    infolog(f"Wrote the lang file to contain '{lang_str}'")
def cleanup_temp(self):
    """
    Removes all temporary files
    Aborts when deleting the temp directory would also delete the game or
    this script.
    """
    temp_path = OPT.tempdir.resolve()
    game_path = OPT.gamedir.resolve()

    # `a in b` is not defined for two Path objects (TypeError); containment
    # is expressed through `Path.parents` instead.
    if temp_path == game_path or game_path in temp_path.parents:
        abortlog("Temp is within the game directory.")
    if temp_path in game_path.parents:
        abortlog("Temp is a parent of the game directory.")
    if temp_path == SCRIPTDIR or temp_path in SCRIPTDIR.parents:
        abortlog("Temp is a parent of this script.")

    # NOTE(review): kill switch kept from the original code; the removal
    # below stays disabled until this is lifted on purpose.
    assert False

    if OPT.dry_run:
        infolog(f"[Delete temp dir '{OPT.tempdir}']")  # was the undefined name `info`
        return
    shutil.rmtree(OPT.tempdir)
def load_cached_api_file(self, fname, url, POST_data = None):
    """
    Cached file download. For JSON (API) files only!
    fname: file name without path prefix
    url: str or function ptr to retrieve the URL
    POST_data: request body; anything other than `None` makes it a POST
    Returns: path of the cached file inside the temp directory
    """
    fullname = tempdir(fname)
    do_download = True
    if fullname.is_file():
        # Refresh cached files that are older than one day ...
        do_download = time.time() - fullname.stat().st_mtime > (24 * 3600)
        # ... unless the cache is forced. This can only apply when there is
        # a cached file to begin with.
        if OPT.force_use_cache:
            do_download = False

    if not do_download:
        debuglog(f"Loaded existing file '{fname}'")
        return fullname

    # Resolve the URL lazily: composing it may itself need API requests.
    if callable(url):
        url = url()
    if POST_data is not None:
        req = request.Request(url, data=POST_data)
        # Close the HTTP response deterministically.
        with request.urlopen(req) as resp, fullname.open("wb") as fh:
            fh.write(resp.read())
    else:
        request.urlretrieve(url, fullname)
    debuglog(f"Downloaded new file '{fname}'")
    return fullname
def load_or_download_json(self, fname, url):
    """
    Loads a (cached) API JSON document and unwraps its envelope.
    Returns: the "data" member
    Raises: AssertionError if "retcode" is not 0
    """
    cached_path = self.load_cached_api_file(fname, url)
    with cached_path.open("rb") as fh:
        document = json.load(fh)

    ret = document["retcode"]
    if ret != 0:
        raise AssertionError(f"Failed to retrieve '{fname}': " +
            f"server returned status code {ret} ({document['message']})")
    return document["data"]
def retrieve_API_keys(self):
    """
    Retrieves passkeys for authentication to download URLs
    Depends on "initialize_*".
    The branch entry is stored in `self.branches_json`.
    """
    assert isinstance(self.rel_type, str), "Missing initialize"

    if self.rel_type == "os":
        base_url = "https://sg-hyp-api.hoy" + "overse.com/hyp/hyp-connect/api"
    else:
        base_url = "https://hyp-api.mih" + "oyo.com/hyp/hyp-connect/api"
        warnlog("CN/BB is yet not tested!")

    # release type -> (game_ids, launcher_id)
    api_ids = {
        "os": ("gopR6Cufr3", "VYTpXlbWo8"),
        "cn": ("1Z8W5NHUQb", "jGHBHlcOq1"),
        "bb": ("T2S0Gz4Dr2", "umfgRO5gh5"),
    }
    assert self.rel_type in api_ids, "unhandled rel_type"
    game_ids, launcher_id = api_ids.get(self.rel_type, (None, None))
    tail = f"game_ids[]={game_ids}&launcher_id={launcher_id}"

    if not self.branches_json:
        js = self.load_or_download_json("getGameBranches.json", f"{base_url}/getGameBranches?{tail}")
        # May be null in the response, e.g. when no pre-download is offered.
        self.branches_json = js["game_branches"][0][self.branch]
        assert self.branches_json is not None, \
            "Cannot find API keys for the selected branch. Maybe retry without pre-download?"

    ver = self.branches_json["tag"]
    infolog(f"Sophon provides game version {ver}")

    # Disabled requests, kept as a reference for further API endpoints.
    if False:
        self.load_cached_api_file("getGameConfigs.json", f"{base_url}/getGameConfigs?{tail}")
    if False:
        channel = 1
        sub_channel = 0
        self.load_cached_api_file("getGameChannelSDKs.json",
            f"{base_url}/getGameChannelSDKs?channel={channel}&{tail}&sub_channel={sub_channel}")
def make_getBuild_url(self, api_file):
    """
    Compose the URL for the main JSON file for chunk-based downloads (sophon)
    api_file: 'getPatchBuild' or 'getBuild'
    Returns: URL
    """
    if not self.branches_json:
        self.retrieve_API_keys()

    # Host per release type; updates and installs use different hosts.
    if OPT.do_update:
        assert self.rel_type != "cn", "TODO"
        hosts = {
            "os": "sg-downloader-api.ho" + "yoverse.com",
        }
    else:
        hosts = {
            "os": "sg-public-api.ho" + "yoverse.com",
            "cn": "api-takumi.mih" + "oyo.com",
        }
    url = hosts.get(self.rel_type)
    assert not (url is None), f"Unhandled release type {self.rel_type}"

    keys = self.branches_json
    url = (
        "https://" + url + "/downloader/sophon_chunk/api/" + api_file
        + "?branch=" + keys["branch"]
        + "&package_id=" + keys["package_id"]
        + "&password=" + keys["password"]
    )
    infolog("Created " + api_file + " JSON URL")
    return url
def get_getBuild_json(self, is_new_file: bool):
    """
    Returns the main JSON for manifest and chunk/diff information
    is_new_file:
        True: For new files manifest
        False: For patch files manifest
    """
    if is_new_file:
        api, post_data = "getBuild", None
    else:
        # A non-None body turns the request into a POST.
        api, post_data = "getPatchBuild", []

    # The URL is passed as a callable: it is only composed on a cache miss.
    cached_path = self.load_cached_api_file(
        f"{api}.json", lambda: self.make_getBuild_url(api), post_data)
    with cached_path.open("rb") as fh:
        contents = json.load(fh)
    debuglog(f"Loaded {api} JSON")
    return contents
def _select_category(self, dlinfo: DownloadInfo, cat_name):
    """
    Retrieves the manifest to download the specified category
    Fills in the 'DownloadInfo' values

    dlinfo:   `self.di_chunks` or `self.di_diffs` (`getBuild_json` loaded)
    cat_name: value of "matching_field"; anything but "main" falls back to
              a substring match that must be unambiguous
    Raises: AssertionError if no category matches
    """
    jd = dlinfo.getBuild_json["data"]
    infolog(dlinfo.name, f"Server provides game version {jd['tag']}")

    manifests = jd["manifests"]
    fuzzy_str = ""
    # 1) exact match
    category = next((m for m in manifests if m["matching_field"] == cat_name), None)
    # 2) substring match
    if not category and not cat_name == "main":
        fuzzy_str = " (fuzzy match)"
        candidates = [m for m in manifests if cat_name in m["matching_field"]]
        if len(candidates) > 1:
            abortlog(f"Ambigous category '{cat_name}'")
        # Only adopt the name when something was found; subscripting `None`
        # used to raise TypeError before the assertion below could report it.
        if candidates:
            category = candidates[0]
            cat_name = category["matching_field"]
    assert not (category is None), f"Cannot find the specified field '{cat_name}'"
    debuglog(dlinfo.name, f"Found category '{cat_name}'" + fuzzy_str)
    dlinfo.category_json = category

    # Download and decompress the manifest (zstd-compressed protobuf)
    fname_raw = category["manifest"]["id"]
    url = category["manifest_download"]["url_prefix"] + "/" + fname_raw
    zstd_path = self.load_cached_api_file(fname_raw + ".zstd", url)
    with zstd_path.open('br') as zfh:
        raw = zstandard.ZstdDecompressor().stream_reader(zfh).read()

    # The protobuf schema depends on which DownloadInfo is being filled.
    if dlinfo is self.di_diffs:
        pb = manifest_ldiff_pb2.DiffManifest()
    elif dlinfo is self.di_chunks:
        pb = manifest_pb2.Manifest()
    else:
        assert False, "unknown instance"
    pb.ParseFromString(raw)
    nfiles = len(pb.files)
    debuglog(dlinfo.name, f"Decompressed manifest protobuf ({nfiles} files)")

    if EXPORT_JSON_FILES:
        # Developer aid: human-readable copy, written once per manifest id
        json_fname = tempdir(fname_raw + ".json")
        if not json_fname.is_file():
            with json_fname.open("w+") as jfh:
                json.dump(json.loads(MessageToJson(pb)), jfh)
            infolog(dlinfo.name, "Exported protobuf to JSON file")

    dlinfo.manifest = pb
def load_manifest(self, cat_name):
    """
    Retrieve information about available patches/chunks for each game version
    cat_name: "game", "en-us", "zh-cn", "ja-jp", "ko-kr"
    """
    chunks, diffs = self.di_chunks, self.di_diffs
    updating = OPT.do_update

    if not chunks.getBuild_json:
        chunks.getBuild_json = self.get_getBuild_json(True)

    if updating:
        # Nothing to do when the server offers the installed version.
        if self.installed_ver == chunks.getBuild_json["data"]["tag"]:
            abortlog("There is no update available.")

    self._select_category(chunks, cat_name)

    if updating:
        # Updates additionally need the diff manifest.
        if not diffs.getBuild_json:
            diffs.getBuild_json = self.get_getBuild_json(False)
        self._select_category(diffs, cat_name)
def ldiff_manifest_required(self):
    """
    Guard: asserts that the diff manifest has been loaded.
    """
    loaded = self.di_diffs.manifest
    assert isinstance(loaded, manifest_ldiff_pb2.DiffManifest), \
        "ldiff manifest is missing or invalid"
def find_chunks_by_file_name(self, file_name):
    """
    Helper function. Searches a specific file name in the manifest
    Returns: FileInfo or None
    """
    assert isinstance(file_name, str)
    found = next(
        (entry for entry in self.di_chunks.manifest.files if entry.filename == file_name),
        None
    )
    if found is not None:
        return found
    warnlog(f"Cannot find chunks for file: {file_name}")
    return None
def get_chunk_download_size(self, filter_by_new: bool) -> int:
    """
    Returns the sum of the chunk sizes
    filter_by_new: only count files listed in `self.new_files_to_download`
    """
    def is_selected(entry):
        return (not filter_by_new) or (entry.filename in self.new_files_to_download)

    return sum(
        chunk.compressed_size
        for entry in self.di_chunks.manifest.files
        if is_selected(entry)
        for chunk in entry.chunks
    )
def _download_file_resume(self, url: str, dstfile: pathlib.Path, dstsize: int):
    """
    Helper function.
    Downloads a file. Resumes interrupted downloads.

    url:     download URL
    dstfile: destination; an existing smaller file is treated as a partial
             download and continued
    dstsize: expected final size in bytes
    """
    filesize = try_get_file_size(dstfile)
    if filesize == dstsize:
        # Already complete
        return
    if filesize > dstsize:
        # Larger than expected: start over
        if OPT.dry_run:
            warnlog(f"[remove corrupted file '{dstfile.name}'")
        else:
            warnlog(f"Removing corrupted file: {dstfile.name}")
            dstfile.unlink()
        filesize = 0

    req = request.Request(url)
    if filesize > 0:
        # Ask for the missing tail only
        req.add_header("Range", f"bytes={filesize}-")
    try:
        resp = request.urlopen(req)
    except urllib.error.HTTPError as e:
        # 416 (range not satisfiable) is treated as "nothing left to fetch"
        if e.code != 416:
            abortlog(f"Cannot download: {e}")
        return

    with resp:
        # Only append when the server honoured the range request (206).
        # A plain 200 carries the whole file, which must not be appended to
        # the partial data.
        resumed = filesize > 0 and resp.status == 206
        with dstfile.open("ab" if resumed else "wb") as fh:
            while True:
                data = resp.read(10 * 1024 * 1024)
                if not data:
                    break
                fh.write(data)
def download_game_file(self, file_info: manifest_pb2.FileInfo):
    """
    Downloads the chunks and patches a file
    file_info: FileInfo, one of the manifest.files[] objects
    Returns `True` if the file is (now) present.

    The file is assembled in the temp directory, verified against the md5
    of the manifest and then moved into the game directory (skipped in
    simulation mode).
    """
    if file_info.flags == 64:
        # Directory entry: nothing to download
        infolog(f"Skipping directory entry: {file_info.filename}")
        return False
    assert (file_info.flags == 0), f"Unknown flags {file_info.flags} for '{file_info.filename}'"

    if OPT.TESTING_FILE and OPT.TESTING_FILE not in file_info.filename:
        return True

    filename_safety_check(file_info.filename)
    filename = pathlib.Path(file_info.filename)

    if try_get_file_size(gamedir(filename)) == file_info.size:
        # Already installed (size check only)
        return True

    chunk_url_prefix = self.di_chunks.category_json["chunk_download"]["url_prefix"]
    size_mib = bytes_to_MiB(file_info.size)
    infolog(f"Downloading '{filename.name}', {size_mib} MiB, {len(file_info.chunks)} chunks")
    if OPT.disallow_download:
        warnlog(f"NOT downloading chunks for {filename.name}")
        return False  # used to fall through with `None`

    # Assemble the file from its chunks, unless a complete copy from an
    # earlier run is still present in the temp directory.
    dstfile = tempdir(filename.name)
    if try_get_file_size(dstfile) != file_info.size:
        bytes_written = 0
        # Context manager: the handle must be flushed and closed before the
        # checksum below reads the file back.
        with dstfile.open("wb") as fh:
            for chunk in file_info.chunks:
                cfname = tempdir(chunk.chunk_id)
                if chunk.offset != bytes_written:
                    warnlog("\t Unexpected offset. Seek may fail.")
                self._download_file_resume(chunk_url_prefix + "/" + chunk.chunk_id, cfname, chunk.compressed_size)
                with cfname.open("rb") as zfh:
                    data = zstandard.ZstdDecompressor().stream_reader(zfh).read()
                fh.seek(chunk.offset)
                fh.write(data)
                bytes_written += len(data)
                debuglog(f"\t Progress: {(bytes_written * 100 / file_info.size):2.0f} % | "
                         + f" {bytes_to_MiB(bytes_written)} / {size_mib} MiB", end="\r")
        print("")  # terminate the "\r" progress line

    # Verify; hash in blocks so large files are not loaded into memory at once
    digest = hashlib.md5()
    with dstfile.open("rb") as fh:
        while True:
            block = fh.read(4 * 1024 * 1024)
            if not block:
                break
            digest.update(block)
    if file_info.md5 == digest.hexdigest():
        infolog("\t File is correct (md5 check)")
    else:
        dstfile.unlink()
        abortlog(f"\t File is corrupt after download: {filename.name}. Please retry.")

    # The compressed chunks are no longer needed
    for chunk in file_info.chunks:
        tempdir(chunk.chunk_id).unlink(missing_ok=True)

    if OPT.dry_run:
        infolog(f"[move new '{filename.name}' -> game dir]")
        return True

    gamefile = gamedir(filename).resolve()
    gamefile.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(dstfile, gamefile)
    return True
def update_config_ini_version(self):
    """
    Quick file sanity check + file update after install or update

    Compares the sizes of all manifest files with the game directory (and,
    for updates, that the files to delete are gone), then writes the new
    version into config.ini. Problems abort, or only warn in simulation mode.
    """
    if OPT.do_install + OPT.do_update != 1:
        abortlog("Invalid operation")

    confname = gamedir("config.ini")
    # newline="": keep the file's line endings (CRLF) untouched on rewrite
    with confname.open("r", newline="") as fh:
        contents = fh.read()
    ver = re.findall(r"game_version=(\d+\.\d+\.\d+)", contents)
    if len(ver) != 1:
        warnlog("config.ini is incomplete or corrupt")
        return

    infolog("Checking game file integrity (quick) ...")
    error_fn = warnlog if OPT.dry_run else abortlog

    if OPT.do_install:
        for v in self.di_chunks.manifest.files:
            if v.flags == 64:
                continue  # directory entry
            if try_get_file_size(gamedir(v.filename)) != v.size:
                error_fn(f"File missing or invalid size: {v.filename}")

    if OPT.do_update:
        self.ldiff_manifest_required()
        for v in self.di_diffs.manifest.files:
            if try_get_file_size(gamedir(v.filename)) != v.size:
                error_fn(f"File missing or invalid size: {v.filename}")

        # Files scheduled for deletion (relative to the version we came
        # from) must no longer exist.
        deletelist = []
        for v in self.di_diffs.manifest.files_delete:
            if v.key == self.installed_ver:
                deletelist = v.info.list
        for v in deletelist:
            if gamedir(v.filename).is_file():
                error_fn(f"Old file still exists: {v.filename}")

    self.installed_ver = self.di_chunks.getBuild_json["data"]["tag"]
    # Replace the value of the `game_version` key only; a bare replace of
    # the version string would also rewrite any other identical occurrence.
    contents = contents.replace(f"game_version={ver[0]}",
                                f"game_version={self.installed_ver}")
    if OPT.dry_run:
        infolog(f"[update config.ini to {self.installed_ver}]")
        return

    with confname.open("w", newline="") as fh:
        fh.write(contents)
    infolog(f"Updated config.ini to {self.installed_ver}")
def get_ldiff_patchinfo(self, v) -> manifest_ldiff_pb2.PatchInfo:
    """
    Helper function. Find the patch file for our installed binary
    May return `None` if the file has not been changed.
    v: manifest file entry; its `patches` are keyed by source game version
    """
    matching = (patch.info for patch in v.patches if patch.key == self.installed_ver)
    return next(matching, None)
def _download_ldiff_file(self, ldiff_dir: pathlib.Path, v: manifest_ldiff_pb2.DiffFileInfo):
    """
    Helper function to download one diff file.
    Returns the patch file name on success, else None.
    Files that cannot be patched are added to `self.new_files_to_download`.
    """
    if len(v.patches) == 0:
        # No patches at all: the file must be fetched via chunks
        self.new_files_to_download.add(v.filename)
        return None

    if OPT.TESTING_FILE:
        if OPT.TESTING_FILE not in v.filename:
            return None
        print("ENTER TO DOWNLOAD: ", v.filename)
        input()

    filename_safety_check(v.filename)

    pinfo = self.get_ldiff_patchinfo(v)
    if not pinfo:
        # No patch for the installed version
        return None

    # Is the installed file a valid patch source?
    gamefile = gamedir(v.filename)
    gamefilesize = try_get_file_size(gamefile)
    if gamefilesize != pinfo.original_size:
        md5 = None
        if gamefilesize == v.size:
            # Size of the patched result: maybe it is already up-to-date
            md5 = hashlib.md5(gamefile.read_bytes()).hexdigest()
            if md5 == v.hash:
                debuglog(f"File '{gamefile.name}' is already up-to-date. Skipping.")
                return None
        if gamefilesize == -1:
            infolog(f"Cannot find file '{gamefile.name}'. Adding to chunk download queue.")
            self.new_files_to_download.add(v.filename)
            return None
        if not md5:
            md5 = hashlib.md5(gamefile.read_bytes()).hexdigest()
        warnlog(f"md5 hash mismatch in '{gamefile.name}'. is={md5}, should={pinfo.original_hash} or {v.hash}")
        self.new_files_to_download.add(v.filename)
        return None

    ldiffname = ldiff_dir.joinpath(pinfo.patch_id)
    if try_get_file_size(ldiffname) == pinfo.patch_size:
        debuglog(f"Diff '{ldiffname.name}' is already present. Skipping download.")
        return ldiffname.name

    size_mib = bytes_to_MiB(pinfo.patch_size)
    infolog(f"Downloading diff for '{v.filename}', {size_mib} MiB\n"
        f"\t -> {pinfo.patch_id}"
    )
    if OPT.disallow_download:
        warnlog(f"NOT downloading diff for {ldiffname.name}")
        return None

    # Download to a "_tmp" sibling; it gets its final name once complete.
    partial = pathlib.Path(f"{ldiffname}_tmp")
    url_prefix = self.di_diffs.category_json["diff_download"]["url_prefix"]
    self._download_file_resume(url_prefix + "/" + pinfo.patch_id, partial, pinfo.patch_size)
    debuglog("Download done")

    assert partial.stat().st_size == pinfo.patch_size, "Corrupted patch download"
    shutil.move(partial, ldiffname)
    return ldiffname.name
def _apply_ldiff_file(self, ldiff_dir: pathlib.Path, v: manifest_ldiff_pb2.DiffFileInfo):
    """
    Helper function to apply one diff file.
    The game file is patched into the temp directory, verified (size + md5)
    and then moved over the original (skipped in simulation mode).
    """
    assert (not OPT.predownload or OPT.TESTING_FILE), "Not allowed for pre-downloads."
    assert v.filename not in self.new_files_to_download, "invalid script usage"
    if OPT.TESTING_FILE and OPT.TESTING_FILE not in v.filename:
        return
    filename_safety_check(v.filename)

    pinfo = self.get_ldiff_patchinfo(v)
    if not pinfo:
        return

    gamefile = gamedir(v.filename)
    dstfile = tempdir(pathlib.Path(v.filename).name)
    dstfile.unlink(missing_ok=True)  # drop leftovers of an earlier attempt

    ldiffname = ldiff_dir.joinpath(pinfo.patch_id)
    if not ldiffname.is_file():
        if OPT.disallow_download:
            return
        abortlog(f"Diff file {ldiffname.name} is missing. Please redownload.")

    done = hpatchz_patch_file(gamefile, dstfile, ldiffname, pinfo.patch_offset, pinfo.patch_length)
    if not done:
        # Timeout: retry once with a much longer time limit
        done = hpatchz_patch_file(gamefile, dstfile, ldiffname, pinfo.patch_offset, pinfo.patch_length, 300)
    if not done:
        # Fail with a clear message instead of a FileNotFoundError from the
        # `stat()` below.
        abortlog(f"Failed to patch file '{gamefile.name}': hpatchz timed out repeatedly.")

    assert dstfile.stat().st_size == v.size
    md5 = hashlib.md5(dstfile.read_bytes()).hexdigest()
    if md5 != v.hash:
        warnlog(f"Checksum failed on file {v.filename}. Corrupt?")
        # Queue the file for a full (chunk) download and discard the bad
        # result. It must not be moved into the game directory: its size is
        # the expected one, so size-based checks would accept the corrupt file.
        self.new_files_to_download.add(v.filename)
        dstfile.unlink(missing_ok=True)
        return
    infolog(f"Patched file {v.filename}")

    if OPT.dry_run:
        infolog(f"[move patched '{dstfile.name}' -> game dir]")
        return
    shutil.move(dstfile, gamefile)
def apply_or_prepare_ldiff_files(self):
    """
    Downloads the ldiff files and patches the destination file if not predownload.
    Requires self.load_manifest(CATEGORY)
    """
    self.ldiff_manifest_required()
    assert len(self.new_files_to_download) == 0, "List must be empty!"
    ldiff_dir = gamedir("ldiff")
    ldiff_dir.mkdir(exist_ok=True)

    # Estimate the download size. One ldiff file may carry the patches of
    # several game files, thus count each ldiff file once, with the size of
    # the whole file (`patch_size`, the size used for the download itself).
    # `patch_length` is only the length of a single patch inside it.
    counted_ids = set()
    download_size_total = 0
    for v in self.di_diffs.manifest.files:
        pinfo = self.get_ldiff_patchinfo(v)
        if not pinfo or pinfo.patch_id in counted_ids:
            continue
        counted_ids.add(pinfo.patch_id)
        download_size_total += pinfo.patch_size
    infolog(f"Downloading ldiff files (up to {bytes_to_MiB(download_size_total)} MiB) ...")

    files_total = len(self.di_diffs.manifest.files)
    files_done = 0
    # Patches are applied right away unless this is a pre-download
    # (the condition used to be inverted).
    what_txt = "" if OPT.predownload else " and patched"

    for v in self.di_diffs.manifest.files:
        downloaded = self._download_ldiff_file(ldiff_dir, v)
        if downloaded:
            self.ldiff_files_to_remove.add(downloaded)
            if not OPT.predownload:
                self._apply_ldiff_file(ldiff_dir, v)
            elif OPT.TESTING_FILE and (OPT.TESTING_FILE in v.filename):
                # Developer aid: try the patch during a pre-download
                warnlog(f"ENTER TO APPLY PATCH (will create backup file): ", OPT.TESTING_FILE)
                input()
                gamefile = gamedir(v.filename)
                shutil.copy2(gamefile, f"{gamefile}.bak")
                self._apply_ldiff_file(ldiff_dir, v)

        files_done += 1
        relname = pathlib.Path(v.filename).name
        infolog(f"Progress: {files_done} / {files_total} files | Downloaded: {relname}", end="\r")
        if files_done % 100 == 0:
            # Keep every 100th progress line visible
            print("")

    infolog("\nFiles downloaded" + what_txt + ".")
def process_deletefiles(self):
    """
    [Update only] Remove old files

    Uses the deletion list that the ldiff manifest provides for the
    installed version. With `OPT.dry_run` the files are only listed.
    """
    self.ldiff_manifest_required()
    assert not OPT.predownload, "Not allowed for pre-downloads."

    # Find the list recorded for the installed version. If several entries
    # match, the last one is used.
    obsolete = []
    for entry in self.di_diffs.manifest.files_delete:
        if entry.key == self.installed_ver:
            obsolete = entry.info.list
    infolog(f"Deleting {len(obsolete)} old files ...")

    for item in obsolete:
        filename_safety_check(item.filename)
        target = gamedir(item.filename)
        if not target.is_file():
            # Already gone, nothing to do.
            continue

        if OPT.dry_run:
            infolog(f"[delete old file {item.filename}]")
        else:
            infolog(f"Deleted old file: {item.filename}")
            target.unlink()
def diff_download_new_files(self):
    """
    [Update/repair only] Downloads files that were added in the new version.

    Downloads every file of the chunk manifest whose name is listed in
    `self.new_files_to_download` and empties that list afterwards.
    For predownloads the list is emptied without downloading anything.
    """
    if OPT.predownload:
        infolog("New files download is DISABLED for predownloads!")
        self.new_files_to_download.clear()
        return

    size_estimate = self.get_chunk_download_size(True)
    infolog(f"Downloading newly added files (up to {bytes_to_MiB(size_estimate)} MiB) ...")

    total = len(self.new_files_to_download)
    # Lazily select the manifest entries that are queued for download.
    pending = (
        entry for entry in self.di_chunks.manifest.files
        if entry.filename in self.new_files_to_download
    )
    for finished, entry in enumerate(pending, start=1):
        self.download_game_file(entry)
        basename = pathlib.Path(entry.filename).name
        infolog(f"Progress: {finished} / {total} files | Downloaded: {basename}",
                end="\r")
        # The progress line is rewritten in place; start a new line every
        # 100 files.
        if finished % 100 == 0:
            print("")
    infolog("Download complete.")
    self.new_files_to_download.clear()
def remove_ldiff_files(self):
    """
    [Update only] Removes all downloaded ldiff files

    Only the names collected in `self.ldiff_files_to_remove` are considered.
    With `OPT.dry_run` the files are listed (and counted) but kept.
    """
    self.ldiff_manifest_required()
    assert not OPT.predownload, "Not allowed for pre-downloads."

    ldiff_dir = gamedir("ldiff")
    if not ldiff_dir.is_dir():
        warnlog(f"Directory {ldiff_dir} not found. Cannot cleanup.")
        return

    count = 0
    for name in self.ldiff_files_to_remove:
        candidate: pathlib.Path = ldiff_dir.joinpath(name)
        if candidate.is_file():
            count += 1
            if OPT.dry_run:
                infolog(f"[remove now unused ldiff '{name}']")
            else:
                candidate.unlink()
    infolog(f"Cleaned up {count} now unused ldiff files.")
def repair_by_category(self, cat_name: str):
    """
    Use sophon chunks to restore missing or incorrect files

    Loads the manifest of the category `cat_name`, compares every file of
    the chunk manifest with the installed one and downloads the files that
    differ. Files are always compared by size; the MD5 checksum is only
    compared when `OPT.repair_mode` is "reliable".
    """
    assert not OPT.predownload, "Not allowed for pre-downloads."

    self.load_manifest(cat_name)
    if self.installed_ver != self.di_chunks.getBuild_json["data"]["tag"]:
        abortlog("The installed version is outdated. Run an update first.")
    self.new_files_to_download.clear()
    reliable_checking = (OPT.repair_mode == "reliable")
    infolog(f"Repair started. Mode: {reliable_checking}")
    files_checked = 0
    files_total = len(self.di_chunks.manifest.files)
    for v in self.di_chunks.manifest.files:
        files_checked += 1
        debuglog(f"\t Progress: {(files_checked * 100 / files_total):2.0f} % | "
            + f" {files_checked} / {files_total} files checked", end="\r")
        reason = None
        gamefile = gamedir(v.filename)
        gamefilesize = try_get_file_size(gamefile)
        if gamefilesize != v.size:
            reason = f"size mismatch. is={gamefilesize}, should={v.size}"
        elif reliable_checking:
            # Hash in chunks so that large files are never fully held in
            # memory.
            digest = hashlib.md5()
            with gamefile.open("rb") as fh:
                for chunk in iter(lambda: fh.read(1024 * 1024), b""):
                    digest.update(chunk)
            md5 = digest.hexdigest()
            if md5 != v.md5:
                reason = f"md5 mismatch. is={md5}, should={v.md5}"
        if not reason:
            continue
        # Finish the in-place ("\r") progress line before logging.
        print("")
        infolog(f"Need to repair file '{v.filename}': " + reason)
        self.new_files_to_download.add(v.filename)
    print("")
    self.diff_download_new_files()
|