| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- #!/usr/bin/env python3
- import argparse
- import base64
- import json
- import os
- import re
- import sys
def b64_decode_flexible(s):
    """Decode base64 text that may be wrapped, unpadded, or URL-safe.

    All whitespace is removed, the URL-safe alphabet (``-`` and ``_``) is
    mapped onto the standard one (``+`` and ``/``), and any missing ``=``
    padding is restored before decoding.

    Args:
        s: Base64 text in either the standard or the URL-safe alphabet.

    Returns:
        The decoded bytes interpreted as UTF-8; byte sequences that are not
        valid UTF-8 are replaced with U+FFFD rather than raising.

    Raises:
        binascii.Error: (a ``ValueError`` subclass) if the text cannot be
            decoded even after normalisation.
    """
    compact = "".join(s.split())
    # The lenient decoder silently discards characters outside its alphabet,
    # so URL-safe input has to be translated up front. Relying on a failed
    # standard decode to trigger a URL-safe retry is not enough: dropping the
    # "-" / "_" characters can leave a string that still decodes cleanly,
    # only to the wrong bytes.
    compact = compact.replace("-", "+").replace("_", "/")
    padded = compact + "=" * (-len(compact) % 4)
    return base64.b64decode(padded).decode("utf-8", errors="replace")
def b64_encode_std(s):
    """Return the standard-alphabet, padded base64 encoding of ``s``.

    The text is encoded as UTF-8 first; the result is an ASCII ``str``.
    """
    raw_bytes = s.encode("utf-8")
    encoded_bytes = base64.b64encode(raw_bytes)
    return str(encoded_bytes, "ascii")
def normalize_vmess_payload(payload):
    """Strip all whitespace from ``payload`` and pad it to a multiple of four.

    The payload is the text that follows ``vmess://``. Missing ``=`` padding
    is appended so the result has a length base64 decoders accept.
    """
    compact = "".join(ch for ch in payload if not ch.isspace())
    remainder = len(compact) % 4
    if remainder:
        compact += "=" * (4 - remainder)
    return compact
def update_vmess_line(line, domain, name_rx=None):
    """Rewrite the ``add`` field of a single ``vmess://`` link.

    Args:
        line: One line of subscription text.
        domain: Value to store in the decoded object's ``add`` field.
        name_rx: Optional compiled pattern. When given, only links whose
            ``ps`` field matches it (via ``search``) are rewritten.

    Returns:
        A ``(text, changed, error)`` tuple:

        * ``text`` is the rewritten link when ``changed`` is true (without
          the original line's surrounding whitespace), otherwise ``line``
          exactly as passed in.
        * ``changed`` is true only when the ``add`` field was replaced.
        * ``error`` is a ``"decode_error: ..."`` message when the payload
          could not be decoded into a JSON object, otherwise ``None``.
    """
    stripped = line.strip()
    if not stripped.startswith("vmess://"):
        return line, False, None

    payload = stripped[len("vmess://"):]
    # A malformed link must never abort the whole run, so any decoding
    # failure is reported back to the caller as a per-line error.
    try:
        body = b64_decode_flexible(normalize_vmess_payload(payload))
        obj = json.loads(body)
    except Exception as e:
        return line, False, f"decode_error: {e}"

    # json.loads also accepts arrays, strings, numbers and null; those have
    # no fields to update and would otherwise fail on ``obj.get`` below.
    if not isinstance(obj, dict):
        return line, False, f"decode_error: expected JSON object, got {type(obj).__name__}"

    ps = str(obj.get("ps", ""))
    if name_rx is not None and not name_rx.search(ps):
        return line, False, None

    # Already pointing at the target: leave the line untouched.
    if str(obj.get("add", "")).strip() == domain:
        return line, False, None

    obj["add"] = domain
    encoded = b64_encode_std(json.dumps(obj, ensure_ascii=False, separators=(",", ":")))
    return f"vmess://{encoded}", True, None
def read_domain(args):
    """Resolve the target domain from parsed command-line arguments.

    ``args.domain`` takes precedence when it is non-empty; otherwise the
    contents of the file named by ``args.domain_file`` are used. The result
    is stripped of surrounding whitespace in both cases.

    Raises:
        ValueError: if neither ``args.domain`` nor ``args.domain_file`` is set.
    """
    inline_domain = args.domain
    if inline_domain:
        return inline_domain.strip()

    path = args.domain_file
    if path:
        with open(path, "r", encoding="utf-8") as handle:
            text = handle.read()
        return text.strip()

    raise ValueError("must provide --domain or --domain-file")
def _fail(message):
    """Report ``message`` as a JSON error record on stderr and exit with status 1."""
    print(json.dumps({"status": "error", "error": message}, ensure_ascii=True), file=sys.stderr)
    sys.exit(1)


def main(argv=None):
    """Command-line entry point: point every matching vmess link at a new domain.

    Reads the input subscription (optionally base64-wrapped as a whole),
    passes each line through ``update_vmess_line``, writes the result to the
    output path (re-wrapped in base64 when the input was), and prints a JSON
    summary on stdout.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. ``None``
            (the default) keeps the normal command-line behaviour.

    Failures to obtain the domain, compile the name pattern, read the input
    or write the output are reported as a JSON error record on stderr and
    the process exits with status 1.
    """
    ap = argparse.ArgumentParser(description="Update vmess:// links by replacing add field")
    ap.add_argument("--input", required=True, help="Input subscription file")
    ap.add_argument("--output", required=True, help="Output subscription file")
    ap.add_argument("--domain", default="", help="Target domain to set as add")
    ap.add_argument("--domain-file", default="./runtime/current_domain.txt", help="Domain file path")
    ap.add_argument("--name-regex", default="", help="Only update nodes whose ps matches regex")
    ap.add_argument(
        "--subscription-base64",
        action="store_true",
        help="Input/output file is base64-encoded full subscription content",
    )
    args = ap.parse_args(argv)

    try:
        domain = read_domain(args)
    except (OSError, ValueError) as e:
        _fail(f"cannot read domain: {e}")
    if not domain:
        _fail("empty domain")

    try:
        name_rx = re.compile(args.name_regex) if args.name_regex else None
    except re.error as e:
        _fail(f"invalid --name-regex: {e}")

    # UnicodeDecodeError and base64 decoding errors are both ValueError
    # subclasses, so one handler covers unreadable and undecodable input.
    try:
        with open(args.input, "r", encoding="utf-8") as f:
            raw_input = f.read()
        content = b64_decode_flexible(raw_input) if args.subscription_base64 else raw_input
    except (OSError, ValueError) as e:
        _fail(f"cannot read input: {e}")

    out_lines = []
    total_vmess = 0
    updated = 0
    errors = 0
    for line in content.splitlines():
        # update_vmess_line passes non-vmess lines through on its own; this
        # check exists only to count how many vmess links were seen.
        if line.strip().startswith("vmess://"):
            total_vmess += 1
        new_line, changed, err = update_vmess_line(line, domain, name_rx=name_rx)
        if changed:
            updated += 1
        if err:
            errors += 1
        out_lines.append(new_line)

    out_text = "\n".join(out_lines)
    # splitlines() drops the final line break; restore it if there was one.
    if content.endswith("\n"):
        out_text += "\n"

    write_text = b64_encode_std(out_text) if args.subscription_base64 else out_text

    try:
        # dirname is "" for a bare file name; fall back to the current directory.
        os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(write_text)
    except OSError as e:
        _fail(f"cannot write output: {e}")

    result = {
        "status": "ok",
        "domain": domain,
        "total_vmess": total_vmess,
        "updated": updated,
        "errors": errors,
        "output": args.output,
    }
    print(json.dumps(result, ensure_ascii=True))
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    raise SystemExit(main())
|