123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- import argparse
- import glob
- import json
- import os
- import uuid
- from base64 import b64decode, b64encode
- from multiprocessing import Pool
- from Crypto.Cipher import AES, PKCS1_OAEP
- from Crypto.PublicKey import RSA
- from Crypto.Random import get_random_bytes
# RSA keys consulted by the worker tasks. Both stay ``None`` until the script
# entry point loads a PEM file for --encrypt; the tasks test them for
# truthiness to choose between encrypting/decrypting and plain renaming.
# The annotations are quoted so that they can admit ``None`` without being
# evaluated when the module is imported.
PUBLIC_KEY: "RSA.RsaKey | None" = None
PRIVATE_KEY: "RSA.RsaKey | None" = None

# When true, vprint() forwards its arguments to print(); set from --verbose.
VERBOSE = False
def vprint(*args, **kvargs):
    """Print like the built-in ``print``, but only when ``VERBOSE`` is truthy.

    Positional and keyword arguments are handed to ``print`` unchanged.
    """
    if not VERBOSE:
        return
    print(*args, **kvargs)
def stat2dict(stat: os.stat_result) -> dict:
    """Copy the stat fields that the catalog stores into a plain dict.

    Each key is the ``os.stat_result`` attribute name without its ``st_``
    prefix: mode, size, the three timestamps in both float-seconds and
    integer-nanosecond form, and the owning uid/gid.
    """
    # Order matters only for the serialized layout; it mirrors the catalog.
    fields = (
        "mode",
        "size",
        "ctime",
        "ctime_ns",
        "mtime",
        "mtime_ns",
        "atime",
        "atime_ns",
        "uid",
        "gid",
    )
    return {name: getattr(stat, "st_" + name) for name in fields}
def setfattr(filename: str, attr: dict):
    """Best-effort restore of a file's timestamps, mode and ownership.

    ``attr`` is a dict of the shape produced by ``stat2dict``. The steps run
    in order -- float timestamps, nanosecond timestamps, chmod, chown -- and
    the first one that raises ends the sequence; the exception is passed to
    ``vprint`` instead of propagating.
    """
    try:
        seconds = (attr["atime"], attr["mtime"])
        os.utime(filename, seconds)
        # The nanosecond pass overrides the float one when its keys exist.
        nanoseconds = (attr["atime_ns"], attr["mtime_ns"])
        os.utime(filename, ns=nanoseconds)
        os.chmod(filename, attr["mode"])
        owner, group = attr["uid"], attr["gid"]
        os.chown(filename, owner, group)
    except Exception as err:
        vprint(err)
def obfuscator_task(x) -> tuple[str, dict]:
    """Move one file to a random name in ``directory``, optionally encrypted.

    Args:
        x: ``(directory, filename)`` pair, packed into one argument so the
            function can be handed to ``Pool.map``.

    Returns:
        ``(filename, entry)``. ``entry`` always holds the new ``path`` and
        the original file's ``attrs`` (see ``stat2dict``). When
        ``PUBLIC_KEY`` is set, the content is encrypted with AES in EAX mode
        under a fresh 32-byte session key and the entry additionally carries
        ``session`` (the session key wrapped with RSA-OAEP), ``nonce`` and
        ``tag``, all base64 text.
    """
    directory, filename = x
    path = os.path.join(directory, uuid.uuid4().hex)

    # Stat before the content is read: reading can update the access time,
    # and the recorded atime is what setfattr() restores later.
    fstat = os.stat(filename)
    vprint(f"{filename} -> {path}, mode: {fstat.st_mode}")
    entry = {"path": path, "attrs": stat2dict(fstat)}

    if not PUBLIC_KEY:
        # No key loaded: obfuscation is a plain rename.
        os.rename(filename, path)
        return (filename, entry)

    session = get_random_bytes(32)
    # Wrap the session key before touching any file, so that a failure here
    # cannot leave the original deleted without a usable catalog entry.
    wrapped_session = PKCS1_OAEP.new(PUBLIC_KEY).encrypt(session)

    cipher = AES.new(session, AES.MODE_EAX)
    with open(filename, "rb") as file:
        enc, tag = cipher.encrypt_and_digest(file.read())
    with open(path, "wb") as file:
        file.write(enc)
    os.remove(filename)

    entry["session"] = b64encode(wrapped_session).decode()
    entry["nonce"] = b64encode(cipher.nonce).decode()
    entry["tag"] = b64encode(tag).decode()
    return (filename, entry)
def obfuscate(directory: str, files: list[str]) -> dict:
    """Obfuscate ``files`` in parallel, then prune directories left empty.

    Args:
        directory: Destination for the randomly named files; it is never
            passed to ``os.removedirs`` itself.
        files: Paths of the files to obfuscate.

    Returns:
        The catalog: a dict mapping each original path to the entry returned
        by ``obfuscator_task`` for it.
    """
    # NOTE(review): the workers read PUBLIC_KEY and VERBOSE as module
    # globals. Under the "spawn" start method the assignments made in the
    # ``__main__`` block are presumably not repeated in the workers --
    # confirm which start method the target platforms use.
    jobs = [(directory, f) for f in files]
    with Pool(os.cpu_count()) as pool:
        catalog = dict(pool.map(obfuscator_task, jobs))

    # One attempt per distinct parent directory (first-seen order) instead
    # of one per file; os.removedirs also prunes parents that become empty.
    for path in dict.fromkeys(os.path.dirname(f) for f in files):
        if path == directory or not os.path.exists(path):
            continue
        vprint("rmdir", path)
        try:
            os.removedirs(path)
        except OSError:
            # Still has content (or cannot be removed): leave it in place.
            pass
    return catalog
def deobfuscator_task(x):
    """Restore one catalog entry to its original path and attributes.

    Args:
        x: ``(directory, old_path, entry)`` packed into one argument so the
            function can be handed to ``Pool.map``. ``directory`` is
            unpacked but not used.

    Returns:
        ``old_path``, the restored file's path.

    Raises:
        ValueError: from ``decrypt_and_verify`` when the stored tag does not
            match the content (per the PyCryptodome documentation). The
            obfuscated source file is left in place in that case.
    """
    _directory, old_path, entry = x
    vprint(f"{entry['path']} -> {old_path}")
    os.makedirs(os.path.dirname(old_path), exist_ok=True)

    if PRIVATE_KEY:
        session = PKCS1_OAEP.new(PRIVATE_KEY).decrypt(b64decode(entry["session"]))
        cipher = AES.new(session, AES.MODE_EAX, nonce=b64decode(entry["nonce"]))
        with open(entry["path"], "rb") as source:
            plain = cipher.decrypt_and_verify(
                source.read(), b64decode(entry["tag"])
            )
        # Create the output only after the tag has verified, so a failed
        # check does not leave a truncated file at the original path.
        with open(old_path, "wb") as output:
            output.write(plain)
        os.remove(entry["path"])
    else:
        # No key loaded: the obfuscated file is the original, just renamed.
        os.rename(entry["path"], old_path)

    setfattr(old_path, entry["attrs"])
    return old_path
def deobfuscate(directory: str, catalog: dict):
    """Restore every catalog entry using a pool of worker processes.

    Each ``(original path, entry)`` item of ``catalog`` is turned into a
    ``(directory, original path, entry)`` job for ``deobfuscator_task``.
    The pool is sized with ``os.cpu_count()``; nothing is returned.
    """
    jobs = [(directory, *item) for item in catalog.items()]
    with Pool(os.cpu_count()) as workers:
        workers.map(deobfuscator_task, jobs)
def get_args():
    """Parse the command line from ``sys.argv``.

    Returns:
        An ``argparse.Namespace`` with ``dir`` (str) and the booleans
        ``decode``, ``encrypt``, ``megafile`` and ``verbose``.

        ``decode`` is declared with ``store_false``: it is ``True`` by
        default and becomes ``False`` when -d/--decode is given, so a true
        value means "obfuscate" and a false value means "restore".
    """
    p = argparse.ArgumentParser(
        description="Obfuscate the files of a directory under random names, "
        "or restore them from the catalog written earlier."
    )
    p.add_argument(
        "dir",
        help="directory to obfuscate, or to restore when --decode is given",
    )
    p.add_argument(
        "-d",
        "--decode",
        action="store_false",
        help="restore the original files from <dir>.catalog instead of "
        "obfuscating",
    )
    p.add_argument(
        "-e",
        "--encrypt",
        action="store_true",
        help="encrypt file contents using public.pem when obfuscating; "
        "decrypt using private.pem when restoring",
    )
    p.add_argument(
        "-m",
        "--megafile",
        action="store_true",
        help="also pack the obfuscated files into <dir>.megafile, or unpack "
        "them from it when restoring",
    )
    p.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="print each operation as it is performed",
    )
    return p.parse_args()
if __name__ == "__main__":
    # Script entry point: obfuscate args.dir, or restore it from the catalog.
    args = get_args()
    VERBOSE = args.verbose
    catalog_path = args.dir + ".catalog"
    megafile_path = args.dir + ".megafile"

    # args.decode is a store_false flag: False means -d/--decode was given.
    if not args.decode:
        # Restore mode.
        with open(catalog_path, "r") as catalog_file:
            catalog = json.load(catalog_file)
        if args.megafile:
            # Recreate each obfuscated file from consecutive megafile slices
            # whose lengths come from the recorded "size" attribute.
            os.makedirs(args.dir, exist_ok=True)
            with open(megafile_path, "rb") as mega:
                for entry in catalog.values():
                    with open(entry["path"], "wb") as part:
                        vprint("from megafile:", entry["path"])
                        part.write(mega.read(entry["attrs"]["size"]))
        if args.encrypt:
            with open("private.pem", "rb") as pem:
                PRIVATE_KEY = RSA.import_key(pem.read())
        deobfuscate(args.dir, catalog)
    else:
        # Obfuscate mode: collect every regular file below args.dir.
        old_files = []
        for dirname, _, filenames in os.walk(args.dir):
            candidates = (os.path.join(dirname, name) for name in filenames)
            old_files.extend(c for c in candidates if os.path.isfile(c))
        if args.encrypt:
            with open("public.pem", "rb") as pem:
                PUBLIC_KEY = RSA.import_key(pem.read())
        catalog = obfuscate(args.dir, old_files)
        with open(catalog_path, "w") as catalog_file:
            json.dump(catalog, catalog_file)
        if args.megafile:
            # Concatenate the obfuscated files in catalog order.
            with open(megafile_path, "wb") as mega:
                for entry in catalog.values():
                    with open(entry["path"], "rb") as part:
                        vprint("to megafile:", entry["path"])
                        mega.write(part.read())
|