update_vmess_links.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. #!/usr/bin/env python3
  2. import argparse
  3. import base64
  4. import json
  5. import os
  6. import re
  7. import sys
  8. def b64_decode_flexible(s):
  9. t = "".join(s.strip().split())
  10. pad = (-len(t)) % 4
  11. t = t + ("=" * pad)
  12. try:
  13. return base64.b64decode(t).decode("utf-8", errors="replace")
  14. except Exception:
  15. return base64.urlsafe_b64decode(t).decode("utf-8", errors="replace")
  16. def b64_encode_std(s):
  17. return base64.b64encode(s.encode("utf-8")).decode("ascii")
  18. def normalize_vmess_payload(payload):
  19. clean = "".join(payload.strip().split())
  20. pad = (-len(clean)) % 4
  21. return clean + ("=" * pad)
  22. def update_vmess_line(line, domain, name_rx=None):
  23. stripped = line.strip()
  24. if not stripped.startswith("vmess://"):
  25. return line, False, None
  26. payload = stripped[len("vmess://") :]
  27. try:
  28. body = b64_decode_flexible(normalize_vmess_payload(payload))
  29. obj = json.loads(body)
  30. except Exception as e:
  31. return line, False, f"decode_error: {e}"
  32. ps = str(obj.get("ps", ""))
  33. if name_rx is not None and not name_rx.search(ps):
  34. return line, False, None
  35. if str(obj.get("add", "")).strip() == domain:
  36. return line, False, None
  37. obj["add"] = domain
  38. encoded = b64_encode_std(json.dumps(obj, ensure_ascii=False, separators=(",", ":")))
  39. return f"vmess://{encoded}", True, None
  40. def read_domain(args):
  41. if args.domain:
  42. return args.domain.strip()
  43. if not args.domain_file:
  44. raise ValueError("must provide --domain or --domain-file")
  45. with open(args.domain_file, "r", encoding="utf-8") as f:
  46. return f.read().strip()
  47. def main():
  48. ap = argparse.ArgumentParser(description="Update vmess:// links by replacing add field")
  49. ap.add_argument("--input", required=True, help="Input subscription file")
  50. ap.add_argument("--output", required=True, help="Output subscription file")
  51. ap.add_argument("--domain", default="", help="Target domain to set as add")
  52. ap.add_argument("--domain-file", default="./runtime/current_domain.txt", help="Domain file path")
  53. ap.add_argument("--name-regex", default="", help="Only update nodes whose ps matches regex")
  54. ap.add_argument(
  55. "--subscription-base64",
  56. action="store_true",
  57. help="Input/output file is base64-encoded full subscription content",
  58. )
  59. args = ap.parse_args()
  60. domain = read_domain(args)
  61. if not domain:
  62. print(json.dumps({"status": "error", "error": "empty domain"}, ensure_ascii=True), file=sys.stderr)
  63. sys.exit(1)
  64. name_rx = re.compile(args.name_regex) if args.name_regex else None
  65. with open(args.input, "r", encoding="utf-8") as f:
  66. raw_input = f.read()
  67. content = raw_input
  68. if args.subscription_base64:
  69. content = b64_decode_flexible(raw_input)
  70. lines = content.splitlines()
  71. out_lines = []
  72. total_vmess = 0
  73. updated = 0
  74. errors = 0
  75. for line in lines:
  76. if line.strip().startswith("vmess://"):
  77. total_vmess += 1
  78. new_line, changed, err = update_vmess_line(line, domain, name_rx=name_rx)
  79. if changed:
  80. updated += 1
  81. if err:
  82. errors += 1
  83. out_lines.append(new_line)
  84. out_text = "\n".join(out_lines)
  85. if content.endswith("\n"):
  86. out_text += "\n"
  87. write_text = out_text
  88. if args.subscription_base64:
  89. write_text = b64_encode_std(out_text)
  90. os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)
  91. with open(args.output, "w", encoding="utf-8") as f:
  92. f.write(write_text)
  93. result = {
  94. "status": "ok",
  95. "domain": domain,
  96. "total_vmess": total_vmess,
  97. "updated": updated,
  98. "errors": errors,
  99. "output": args.output,
  100. }
  101. print(json.dumps(result, ensure_ascii=True))
  102. if __name__ == "__main__":
  103. main()