#!/usr/bin/env python3
"""Auto select VMess preferred domain.

Fetches a JSON payload, extracts candidate domains, filters and ranks them,
optionally TLS-health-checks them, and writes the selected domain to the
runtime files configured under ``output`` in the config JSON.
"""
import argparse
import datetime as dt
import json
import math
import os
import re
import socket
import ssl
import subprocess
import sys
import time
import urllib.parse
import urllib.request

# NOTE(review): the reviewed copy of this file is missing the text between the
# start of the DOMAIN_RE pattern and the body of get_values_by_path().  The
# names below are used further down, but their definitions were not visible
# and are deliberately NOT re-invented here -- restore them from the original:
#   DOMAIN_RE   (only the prefix survived:
#                r"^(?=.{1,253}$)(?!-)[A-Za-z0-9-]{1,63}(?...")
#   IPV4_RE
#   read_json_file(path, default=...)
#   write_json_file(path, obj)
#   write_text_file(path, text)
#   utc_now_iso()
#   fetch_api_json(cfg)
#   get_by_json_path(payload, path)
#   flatten_values(values)


def get_values_by_path(data, path):
    """Collect every value reachable from *data* by following *path*.

    A path segment ending in ``[]`` names a list-valued key whose items are
    all descended into; any other segment is a plain dict key.  Returns a
    flat list of the values found (empty when the path does not resolve).

    NOTE(review): the ``def`` line and the derivation of ``parts`` were not
    visible in the reviewed copy.  The parameter name ``path`` and the
    dot-split are inferred from the call sites, from ``walk(data, 0)`` and
    from the default path ``"data.good[]"`` -- confirm against the original.
    """
    parts = path.split(".")

    def walk(cur, idx):
        # Every segment consumed: ``cur`` is a result.
        if idx >= len(parts):
            return [cur]
        part = parts[idx]
        if part.endswith("[]"):
            key = part[:-2]
            arr = cur.get(key) if isinstance(cur, dict) else None
            if not isinstance(arr, list):
                return []
            out = []
            for item in arr:
                out.extend(walk(item, idx + 1))
            return out
        if isinstance(cur, dict) and part in cur:
            return walk(cur[part], idx + 1)
        return []

    return walk(data, 0)


def parse_domains(payload, parser_cfg):
    """Extract a de-duplicated, normalised list of domains from *payload*.

    Sources, in order: each path in ``parser_cfg["field_paths"]`` (resolved
    with get_values_by_path), then each path in ``parser_cfg["json_paths"]``
    (resolved with get_by_json_path).  Only if both yield nothing is the
    JSON-serialised payload scanned with ``parser_cfg["regex"]``.

    Candidates are stripped, lower-cased and stripped of trailing dots, then
    kept only if they match DOMAIN_RE or IPV4_RE.  First occurrence wins.

    NOTE(review): flatten_values and get_by_json_path are defined outside the
    visible text; this assumes flatten_values returns an iterable of strings.
    """
    domains = []
    for p in parser_cfg.get("field_paths", []):
        values = get_values_by_path(payload, p)
        domains.extend(flatten_values(values))
    for p in parser_cfg.get("json_paths", []):
        v = get_by_json_path(payload, p)
        if v is not None:
            domains.extend(flatten_values(v))
    if not domains:
        # Last resort: regex-scan the whole payload as text.
        regex_s = parser_cfg.get("regex", r"[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
        text = json.dumps(payload, ensure_ascii=True)
        domains.extend(re.findall(regex_s, text))
    clean = []
    seen = set()
    for d in domains:
        d = d.strip().lower().rstrip(".")
        if (DOMAIN_RE.match(d) or IPV4_RE.match(d)) and d not in seen:
            seen.add(d)
            clean.append(d)
    return clean


def parse_created_time(s):
    """Parse a ``YYYY-MM-DD HH:MM:SS`` value into a UTC-tagged datetime.

    Returns ``None`` for falsy input or for text that does not match the
    format.  The parsed value is tagged as UTC without any conversion.
    """
    if not s:
        return None
    try:
        parsed = dt.datetime.strptime(str(s).strip(), "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return None
    return parsed.replace(tzinfo=dt.timezone.utc)


def parse_scored_records(payload, scoring_cfg):
    """Build score records from the payload entries at ``records_path``.

    Returns ``[]`` unless ``scoring_cfg["enabled"]`` is truthy.  Each record
    is a dict with ``domain`` (normalised ``ip_field`` value), ``created_at``
    (datetime or None), ``created_raw`` and ``scores`` (one float per entry
    of ``score_fields``).  Non-dict entries and entries with an empty domain
    are skipped.  A score that is missing, unparsable or NaN becomes ``inf``
    so that it sorts last and never poisons tuple comparison.
    """
    if not scoring_cfg.get("enabled", False):
        return []
    records_path = scoring_cfg.get("records_path", "data.good[]")
    ip_field = scoring_cfg.get("ip_field", "ip")
    created_time_field = scoring_cfg.get("created_time_field", "createdTime")
    score_fields = scoring_cfg.get(
        "score_fields", ["avgScore", "ydScore", "dxScore", "ltScore"]
    )
    missing = float("inf")

    out = []
    for r in get_values_by_path(payload, records_path):
        if not isinstance(r, dict):
            continue
        domain = str(r.get(ip_field, "")).strip().lower().rstrip(".")
        if not domain:
            continue
        scores = []
        for f in score_fields:
            try:
                score = float(r.get(f))
            except (TypeError, ValueError, OverflowError):
                score = missing
            # NaN compares false against everything and would make the
            # ranking order undefined; treat it like a missing score.
            scores.append(missing if math.isnan(score) else score)
        out.append(
            {
                "domain": domain,
                "created_at": parse_created_time(r.get(created_time_field)),
                "created_raw": r.get(created_time_field),
                "scores": scores,
            }
        )
    return out


def _dedupe_by_domain(records):
    """Return *records* keeping only the first record seen for each domain."""
    seen = set()
    unique = []
    for r in records:
        d = r["domain"]
        if d in seen:
            continue
        seen.add(d)
        unique.append(r)
    return unique


def rank_scored_records(records, scoring_cfg):
    """Order score records from most to least preferred, one per domain.

    Records created within ``within_hours`` (default 24) of now are used when
    any exist; otherwise all records are used.  With ``use_api_order`` the
    incoming order is kept.  Otherwise records are sorted by their score list
    (ascending when ``prefer_lower`` is true, descending otherwise), with the
    domain name as tie-breaker and ``inf`` scores last in both directions.
    Duplicate domains are dropped in every mode, keeping the best-ranked one.
    """
    if not records:
        return []
    within_hours = float(scoring_cfg.get("within_hours", 24))
    prefer_lower = bool(scoring_cfg.get("prefer_lower", True))
    use_api_order = bool(scoring_cfg.get("use_api_order", False))

    cutoff = dt.datetime.now(dt.timezone.utc) - dt.timedelta(hours=within_hours)
    recent = [
        r for r in records
        if r.get("created_at") is not None and r["created_at"] >= cutoff
    ]
    candidates = recent if recent else records

    if use_api_order:
        return _dedupe_by_domain(candidates)

    missing = float("inf")

    def key_lower(r):
        return tuple(r["scores"]), r["domain"]

    def key_higher(r):
        # Negate for descending order, but keep "missing" as +inf so it
        # still sorts after every real score.
        return tuple(-x if x != missing else missing for x in r["scores"]), r["domain"]

    ranked = sorted(candidates, key=key_lower if prefer_lower else key_higher)
    return _dedupe_by_domain(ranked)


def apply_filter(domains, filter_cfg):
    """Keep domains matching the include suffixes and no exclude regex.

    ``include_suffixes`` (lower-cased, empty entries ignored) is an allow-list
    applied with ``str.endswith``; when it is empty every domain passes that
    stage.  A domain is dropped if any ``exclude_regex`` pattern is found in
    it.  Input order is preserved.
    """
    include_suffixes = tuple(
        s.lower() for s in filter_cfg.get("include_suffixes", []) if s
    )
    exclude_regex = [re.compile(x) for x in filter_cfg.get("exclude_regex", []) if x]
    out = []
    for d in domains:
        if include_suffixes and not d.endswith(include_suffixes):
            continue
        if any(rx.search(d) for rx in exclude_regex):
            continue
        out.append(d)
    return out


def single_tls_check(domain, timeout_ms, port, tls_verify=True):
    """Resolve *domain*, connect to *port* and complete one TLS handshake.

    Returns ``(ok, elapsed_ms, message)``: ``(True, <int ms>, "ok")`` on
    success, ``(False, None, <error text>)`` on failure.  The elapsed time
    includes name resolution.  The socket timeout is ``timeout_ms`` with a
    floor of 0.2 s.  Only the first address returned by the resolver is
    tried.  With ``tls_verify`` false, certificate and host-name verification
    are disabled.
    """
    start = time.perf_counter()
    timeout_sec = max(0.2, timeout_ms / 1000.0)
    try:
        ctx = ssl.create_default_context()
        if not tls_verify:
            # Public equivalent of the private ssl._create_unverified_context();
            # check_hostname must be cleared before verify_mode can be CERT_NONE.
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
        infos = socket.getaddrinfo(domain, port, proto=socket.IPPROTO_TCP)
        if not infos:
            return False, None, "dns_empty"
        af, socktype, proto, _, sockaddr = infos[0]
        with socket.socket(af, socktype, proto) as sock:
            sock.settimeout(timeout_sec)
            sock.connect(sockaddr)
            with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
                ssock.do_handshake()
        elapsed = int((time.perf_counter() - start) * 1000)
        return True, elapsed, "ok"
    except (OSError, ValueError, OverflowError) as e:
        # OSError covers resolver, socket, timeout and ssl errors; ValueError
        # covers host-name encoding errors; OverflowError covers a bad port.
        return False, None, str(e) or type(e).__name__


def check_domains(domains, hc_cfg):
    """Run ``attempts`` TLS checks against every domain and rank the results.

    Each result dict carries ``domain``, ``success_ratio``,
    ``avg_latency_ms`` (999999 when no attempt succeeded), ``ok_count``,
    ``attempts`` and up to three ``errors``.  Results are sorted by success
    ratio (descending), then average latency, then domain name.
    """
    attempts = max(0, int(hc_cfg.get("attempts", 2)))
    timeout_ms = int(hc_cfg.get("timeout_ms", 1800))
    port = int(hc_cfg.get("port", 443))
    tls_verify = bool(hc_cfg.get("tls_verify", True))

    results = []
    for d in domains:
        latencies = []
        errors = []
        for _ in range(attempts):
            ok, latency, err = single_tls_check(d, timeout_ms, port, tls_verify=tls_verify)
            if ok:
                latencies.append(latency)
            else:
                errors.append(err)
        ok_count = len(latencies)
        results.append(
            {
                "domain": d,
                "success_ratio": ok_count / attempts if attempts else 0.0,
                "avg_latency_ms": int(sum(latencies) / ok_count) if latencies else 999999,
                "ok_count": ok_count,
                "attempts": attempts,
                "errors": errors[:3],
            }
        )
    results.sort(key=lambda x: (-x["success_ratio"], x["avg_latency_ms"], x["domain"]))
    return results


def render_v2ray(template_file, output_file, token, domain):
    """Write *template_file* to *output_file* with *token* replaced by *domain*.

    Returns ``False`` without writing when either path is empty or the
    template does not exist; returns ``True`` after a successful write.  The
    output directory is created when the path has a directory component.
    """
    if not template_file or not output_file:
        return False
    if not os.path.exists(template_file):
        return False
    with open(template_file, "r", encoding="utf-8") as f:
        tpl = f.read()
    rendered = tpl.replace(token, domain)
    out_dir = os.path.dirname(output_file)
    if out_dir:
        # A bare file name has dirname "" and os.makedirs("") raises.
        os.makedirs(out_dir, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(rendered)
    return True


def run_notify(cmd, domain, status):
    """Run the notify command with the selection exported in its environment.

    Does nothing when *cmd* is empty.  The command sees ``AUTODOMAIN`` and
    ``AUTODOMAIN_STATUS``; its exit status is ignored.
    """
    if not cmd:
        return
    env = os.environ.copy()
    env["AUTODOMAIN"] = domain
    env["AUTODOMAIN_STATUS"] = status
    # NOTE(review): shell=True executes ``cmd`` through the shell.  main()
    # takes it from the config file; never feed it untrusted input.  The
    # domain travels via the environment, not the command line.
    subprocess.run(cmd, shell=True, check=False, env=env)


def choose_domain(filtered_domains, check_results, top_n, ranked_scored):
    """Pick the domain to use and the list of top candidates.

    Returns ``(selected, candidates)``.  Precedence:

    1. With score records: the best-ranked domains that passed at least one
       health check; if none passed (or no checks ran), the best-ranked
       domains by score alone.
    2. Without score records but with health checks: the domains that passed
       at least one check, in ``check_results`` order; if none passed,
       ``selected`` is ``None`` and the first results are returned.
    3. Otherwise the first filtered domains, or ``(None, [])``.

    ``top_n`` below 1 is treated as 1 so a selection is always possible.
    """
    top_n = max(1, top_n)

    if ranked_scored:
        if check_results:
            check_map = {x["domain"]: x for x in check_results}
            top = []
            for rec in ranked_scored:
                res = check_map.get(rec["domain"])
                if res is not None and res["success_ratio"] > 0:
                    top.append(res)
                    if len(top) >= top_n:
                        break
            if top:
                return top[0]["domain"], top
        score_only = [
            {"domain": x["domain"], "scores": x["scores"], "created_raw": x["created_raw"]}
            for x in ranked_scored[:top_n]
        ]
        return score_only[0]["domain"], score_only

    if check_results:
        top = [x for x in check_results if x["success_ratio"] > 0][:top_n]
        if top:
            return top[0]["domain"], top
        return None, check_results[:top_n]

    if filtered_domains:
        return filtered_domains[0], [{"domain": x} for x in filtered_domains[:top_n]]
    return None, []


def main():
    """Command-line entry point: select a domain and publish it.

    Reads the config named by ``--config``, fetches and parses the payload,
    filters, scores and optionally health-checks the domains, then writes the
    current-domain text/JSON files, the substore vars file and the state
    file, renders the v2ray template and runs the notify command.

    On any failure the error is written to the state file.  If a last-good
    domain is known it is re-published with status ``error_use_last_good``;
    otherwise the process exits with status 1.
    """
    ap = argparse.ArgumentParser(description="Auto select VMess preferred domain")
    ap.add_argument("--config", default="config.json", help="Path to config JSON")
    args = ap.parse_args()

    config_path_abs = os.path.abspath(args.config)
    if not os.path.exists(config_path_abs):
        print(
            json.dumps(
                {"status": "error", "error": f"config file not found: {config_path_abs}"},
                ensure_ascii=True,
            ),
            file=sys.stderr,
        )
        sys.exit(1)

    cfg = read_json_file(config_path_abs)
    output_cfg = cfg.get("output", {})
    runtime_dir_cfg = output_cfg.get("runtime_dir", "./runtime")
    if os.path.isabs(runtime_dir_cfg):
        runtime_dir = runtime_dir_cfg
    else:
        # A relative runtime dir is resolved against the config file's folder.
        runtime_dir = os.path.normpath(
            os.path.join(os.path.dirname(config_path_abs), runtime_dir_cfg)
        )

    v2_cfg = cfg.get("v2ray", {})
    notify_cfg = cfg.get("notify", {})
    current_domain_file = os.path.join(
        runtime_dir, output_cfg.get("current_domain_file", "current_domain.txt")
    )
    current_domain_json = os.path.join(
        runtime_dir, output_cfg.get("current_domain_json", "current_domain.json")
    )
    state_file = os.path.join(runtime_dir, output_cfg.get("state_file", "state.json"))
    substore_vars_file = os.path.join(
        runtime_dir, output_cfg.get("substore_vars_file", "substore_vars.json")
    )

    state = read_json_file(state_file, default={})
    last_good = state.get("last_good_domain", "")

    try:
        payload = fetch_api_json(cfg)
        parsed = parse_domains(payload, cfg.get("parser", {}))
        filtered = apply_filter(parsed, cfg.get("domain_filter", {}))

        scoring_cfg = cfg.get("scoring", {})
        # Build the membership set once instead of once per scored record.
        allowed = set(filtered)
        scored_records = [
            r for r in parse_scored_records(payload, scoring_cfg)
            if r["domain"] in allowed
        ]
        ranked_scored = rank_scored_records(scored_records, scoring_cfg)

        hc_cfg = cfg.get("healthcheck", {})
        check_results = []
        if hc_cfg.get("enabled", True):
            check_results = check_domains(filtered, hc_cfg)

        top_n = int(cfg.get("selection", {}).get("top_n", 3))
        selected, top_candidates = choose_domain(filtered, check_results, top_n, ranked_scored)

        status = "ok"
        if not selected and last_good:
            selected = last_good
            status = "fallback_last_good"
        if not selected:
            raise RuntimeError("No valid domain available from API and no fallback in state")

        write_text_file(current_domain_file, selected + "\n")
        current_json = {
            "domain": selected,
            "updated_at": utc_now_iso(),
            "status": status,
            "source_count": len(parsed),
            "checked_count": len(check_results),
            "top_candidates": top_candidates,
        }
        write_json_file(current_domain_json, current_json)
        write_json_file(
            substore_vars_file,
            {
                "AUTO_DOMAIN": selected,
                "UPDATED_AT": current_json["updated_at"],
                "STATUS": status,
            },
        )
        rendered = render_v2ray(
            template_file=v2_cfg.get("template_file", ""),
            output_file=v2_cfg.get("output_file", ""),
            token=v2_cfg.get("replace_token", "__AUTO_DOMAIN__"),
            domain=selected,
        )
        new_state = {
            "updated_at": current_json["updated_at"],
            "last_good_domain": selected,
            "status": status,
            "source_count": len(parsed),
            "checked_count": len(check_results),
            "rendered_v2ray": rendered,
        }
        write_json_file(state_file, new_state)
        run_notify(notify_cfg.get("command", ""), selected, status)
        print(json.dumps(current_json, ensure_ascii=True))
    except Exception as e:
        # Top-level boundary: record the failure and fall back if possible.
        now = utc_now_iso()
        err_state = {
            "updated_at": now,
            "status": "error",
            "error": str(e),
            "last_good_domain": last_good,
        }
        write_json_file(state_file, err_state)
        if last_good:
            write_text_file(current_domain_file, last_good + "\n")
            write_json_file(
                current_domain_json,
                {
                    "domain": last_good,
                    "updated_at": now,
                    "status": "error_use_last_good",
                    "error": str(e),
                },
            )
            run_notify(notify_cfg.get("command", ""), last_good, "error_use_last_good")
            print(
                json.dumps(
                    {"status": "error_use_last_good", "error": str(e)}, ensure_ascii=True
                )
            )
            return
        print(json.dumps({"status": "error", "error": str(e)}, ensure_ascii=True), file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()