obfuscate.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. import argparse
  2. import glob
  3. import json
  4. import os
  5. import uuid
  6. from base64 import b64decode, b64encode
  7. from multiprocessing import Pool
  8. from Crypto.Cipher import AES, PKCS1_OAEP
  9. from Crypto.PublicKey import RSA
  10. from Crypto.Random import get_random_bytes
# RSA public key read by obfuscator_task() to wrap each file's AES session key.
# Stays None (rename-only mode) unless the script entry point loads public.pem.
PUBLIC_KEY: RSA.RsaKey = None
# RSA private key read by deobfuscator_task() to unwrap the AES session keys.
# Stays None (rename-only mode) unless the script entry point loads private.pem.
PRIVATE_KEY: RSA.RsaKey = None
# When truthy, vprint() forwards its arguments to print(); set from --verbose.
VERBOSE = False
  14. def vprint(*args, **kvargs):
  15. if VERBOSE:
  16. print(*args, **kvargs)
  17. def stat2dict(stat: os.stat_result) -> dict:
  18. return {
  19. "mode": stat.st_mode,
  20. "size": stat.st_size,
  21. "ctime": stat.st_ctime,
  22. "ctime_ns": stat.st_ctime_ns,
  23. "mtime": stat.st_mtime,
  24. "mtime_ns": stat.st_mtime_ns,
  25. "atime": stat.st_atime,
  26. "atime_ns": stat.st_atime_ns,
  27. "uid": stat.st_uid,
  28. "gid": stat.st_gid,
  29. }
  30. def setfattr(filename: str, attr: dict):
  31. try:
  32. os.utime(filename, (attr["atime"], attr["mtime"]))
  33. os.utime(filename, ns=(attr["atime_ns"], attr["mtime_ns"]))
  34. os.chmod(filename, attr["mode"])
  35. os.chown(filename, attr["uid"], attr["gid"])
  36. except Exception as e:
  37. vprint(e)
  38. def obfuscator_task(x) -> tuple[str, dict]:
  39. directory, filename = x
  40. f = uuid.uuid4().hex
  41. path = os.path.join(directory, f)
  42. if PUBLIC_KEY:
  43. session = get_random_bytes(32)
  44. cipher = AES.new(session, AES.MODE_EAX)
  45. with open(filename, "rb") as file:
  46. enc, tag = cipher.encrypt_and_digest(file.read())
  47. fstat = os.stat(filename)
  48. vprint(f"{filename} -> {path}, mode: {fstat.st_mode}")
  49. with open(path, "wb") as file:
  50. file.write(enc)
  51. os.remove(filename)
  52. return (
  53. filename,
  54. {
  55. "path": path,
  56. "attrs": stat2dict(fstat),
  57. "session": b64encode(
  58. PKCS1_OAEP.new(PUBLIC_KEY).encrypt(session)
  59. ).decode(),
  60. "nonce": b64encode(cipher.nonce).decode(),
  61. "tag": b64encode(tag).decode(),
  62. },
  63. )
  64. fstat = os.stat(filename)
  65. vprint(f"{filename} -> {path}, mode: {fstat.st_mode}")
  66. os.rename(filename, path)
  67. return (filename, {"path": path, "attrs": stat2dict(fstat)})
  68. def obfuscate(directory: str, files: list[str]) -> dict:
  69. with Pool(os.cpu_count()) as pool:
  70. catalog = dict(pool.map(obfuscator_task, [(directory, f) for f in files]))
  71. for i in files:
  72. path = os.path.dirname(i)
  73. if os.path.exists(path) and path != directory:
  74. vprint("rmdir", path)
  75. try:
  76. os.removedirs(path)
  77. except Exception:
  78. pass
  79. return catalog
  80. def deobfuscator_task(x):
  81. directory, old_path, entry = x
  82. if PRIVATE_KEY:
  83. vprint(f"{entry['path']} -> {old_path}")
  84. os.makedirs(os.path.dirname(old_path), exist_ok=True)
  85. cipher = AES.new(
  86. PKCS1_OAEP.new(PRIVATE_KEY).decrypt(b64decode(entry["session"])),
  87. AES.MODE_EAX,
  88. b64decode(entry["nonce"]),
  89. )
  90. with open(old_path, "wb") as output, open(entry["path"], "rb") as source:
  91. output.write(
  92. cipher.decrypt_and_verify(source.read(), b64decode(entry["tag"]))
  93. )
  94. os.remove(entry["path"])
  95. setfattr(old_path, entry["attrs"])
  96. return old_path
  97. vprint(f"{entry['path']} -> {old_path}")
  98. os.makedirs(os.path.dirname(old_path), exist_ok=True)
  99. os.rename(entry["path"], old_path)
  100. setfattr(old_path, entry["attrs"])
  101. return old_path
  102. def deobfuscate(directory: str, catalog: dict):
  103. with Pool(os.cpu_count()) as pool:
  104. pool.map(
  105. deobfuscator_task,
  106. [(directory, old_path, entry) for old_path, entry in catalog.items()],
  107. )
  108. def get_args():
  109. p = argparse.ArgumentParser()
  110. p.add_argument("dir")
  111. p.add_argument("-d", "--decode", action="store_false")
  112. p.add_argument("-e", "--encrypt", action="store_true")
  113. p.add_argument("-m", "--megafile", action="store_true")
  114. p.add_argument("-v", "--verbose", action="store_true")
  115. return p.parse_args()
  116. if __name__ == "__main__":
  117. args = get_args()
  118. VERBOSE = args.verbose
  119. if args.decode:
  120. old_files = []
  121. for dirname, _, filenames in os.walk(args.dir):
  122. for filename in filenames:
  123. old_path = os.path.join(dirname, filename)
  124. if os.path.isfile(old_path):
  125. old_files.append(old_path)
  126. if args.encrypt:
  127. with open("public.pem", "rb") as file:
  128. PUBLIC_KEY = RSA.import_key(file.read())
  129. catalog = obfuscate(args.dir, old_files)
  130. with open(args.dir + ".catalog", "w") as file:
  131. json.dump(catalog, file)
  132. if args.megafile:
  133. with open(args.dir + ".megafile", "wb") as file:
  134. for _, entry in catalog.items():
  135. with open(entry["path"], "rb") as x:
  136. vprint("to megafile:", entry["path"])
  137. file.write(x.read())
  138. else:
  139. with open(args.dir + ".catalog", "r") as file:
  140. catalog = json.load(file)
  141. if args.megafile:
  142. os.makedirs(args.dir, exist_ok=True)
  143. with open(args.dir + ".megafile", "rb") as mf:
  144. for _, entry in catalog.items():
  145. with open(entry["path"], "wb") as file:
  146. vprint("from megafile:", entry["path"])
  147. file.write(mf.read(entry["attrs"]["size"]))
  148. if args.encrypt:
  149. with open("private.pem", "rb") as file:
  150. PRIVATE_KEY = RSA.import_key(file.read())
  151. deobfuscate(args.dir, catalog)