| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484 |
- #!/usr/bin/env python3
- import argparse
- import datetime as dt
- import json
- import os
- import re
- import socket
- import ssl
- import subprocess
- import sys
- import time
- import urllib.parse
- import urllib.request
# Syntactically valid DNS name: total length 1-253, labels of 1-63 chars with
# no leading/trailing hyphen, and at least two labels (a dot is required).
DOMAIN_RE = re.compile(r"^(?=.{1,253}$)(?!-)[A-Za-z0-9-]{1,63}(?<!-)(\.(?!-)[A-Za-z0-9-]{1,63}(?<!-))+$")
# Dotted-quad IPv4 address with each octet constrained to 0-255.
IPV4_RE = re.compile(r"^(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(?:\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}$")
def utc_now_iso():
    """Return the current UTC time as an ISO-8601 string ending in 'Z' (second precision)."""
    stamp = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)
    return stamp.isoformat().replace("+00:00", "Z")
def read_json_file(path, default=None):
    """Load JSON from *path*; return *default* ({} when None) if the file is absent."""
    fallback = {} if default is None else default
    if not os.path.exists(path):
        return fallback
    with open(path, "r", encoding="utf-8") as fh:
        return json.load(fh)
def write_json_file(path, data):
    """Serialize *data* as pretty-printed ASCII JSON to *path*.

    Creates parent directories as needed. Fix: only call os.makedirs when the
    path actually has a directory component — os.makedirs("") raises
    FileNotFoundError for bare filenames.
    """
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=True, indent=2)
def write_text_file(path, data):
    """Write *data* to *path* as UTF-8 text, creating parent directories.

    Fix: skip os.makedirs when the path has no directory component —
    os.makedirs("") raises FileNotFoundError for bare filenames.
    """
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        f.write(data)
def build_url(base_url, params):
    """Merge *params* into the query string of *base_url*, overriding duplicate keys."""
    if not params:
        return base_url
    pieces = urllib.parse.urlparse(base_url)
    merged = urllib.parse.parse_qs(pieces.query)
    merged.update({key: [str(val)] for key, val in params.items()})
    encoded = urllib.parse.urlencode(merged, doseq=True)
    return urllib.parse.urlunparse(pieces._replace(query=encoded))
def fetch_api_json(cfg):
    """Call the configured API endpoint and decode its JSON response.

    cfg["api"] supplies url plus optional params / method / headers /
    timeout_sec / body. A JSON body forces a Content-Type header.
    Performs network I/O; propagates urllib/socket errors to the caller.
    """
    api_cfg = cfg["api"]
    target = build_url(api_cfg["url"], api_cfg.get("params", {}))
    verb = api_cfg.get("method", "GET").upper()
    hdrs = dict(api_cfg.get("headers", {}))
    wait = int(api_cfg.get("timeout_sec", 10))
    payload = api_cfg.get("body")
    encoded = None
    if payload is not None:
        encoded = json.dumps(payload).encode("utf-8")
        hdrs["Content-Type"] = "application/json"
    request = urllib.request.Request(url=target, data=encoded, headers=hdrs, method=verb)
    with urllib.request.urlopen(request, timeout=wait) as response:
        return json.loads(response.read().decode("utf-8", errors="replace"))
def flatten_values(value):
    """Collect every string found in *value*, recursing through lists and dicts."""
    if isinstance(value, str):
        return [value]
    if isinstance(value, list):
        collected = []
        for element in value:
            collected.extend(flatten_values(element))
        return collected
    if isinstance(value, dict):
        collected = []
        for element in value.values():
            collected.extend(flatten_values(element))
        return collected
    # Numbers, None, and other scalars contribute nothing.
    return []
def get_by_json_path(data, path):
    """Follow a dot-separated key path through nested dicts; None when any hop fails."""
    node = data
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node
def get_values_by_path(data, path):
    """Resolve a dot path where a 'key[]' segment fans out over a list.

    Example: "a.b[].c" collects the "c" value from every element of
    data["a"]["b"]. Missing keys or non-list fan-out targets yield [].
    """
    segments = path.split(".")

    def descend(node, depth):
        # Past the last segment: the current node is a match.
        if depth >= len(segments):
            return [node]
        seg = segments[depth]
        if seg.endswith("[]"):
            items = node.get(seg[:-2]) if isinstance(node, dict) else None
            if not isinstance(items, list):
                return []
            found = []
            for element in items:
                found.extend(descend(element, depth + 1))
            return found
        if isinstance(node, dict) and seg in node:
            return descend(node[seg], depth + 1)
        return []

    return descend(data, 0)
def parse_domains(payload, parser_cfg):
    """Extract candidate domain names / IPv4 addresses from an API payload.

    Configured field_paths (with '[]' fan-out) and json_paths are tried first;
    when nothing matched, the serialized payload is regex-scanned as a
    fallback. Candidates are lowercased, stripped of a trailing dot,
    validated against DOMAIN_RE / IPV4_RE, and de-duplicated in order.
    """
    found = []
    for path in parser_cfg.get("field_paths", []):
        found.extend(flatten_values(get_values_by_path(payload, path)))
    for path in parser_cfg.get("json_paths", []):
        node = get_by_json_path(payload, path)
        if node is not None:
            found.extend(flatten_values(node))
    if not found:
        pattern = parser_cfg.get("regex", r"[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
        found.extend(re.findall(pattern, json.dumps(payload, ensure_ascii=True)))
    seen = set()
    result = []
    for candidate in found:
        candidate = candidate.strip().lower().rstrip(".")
        if candidate in seen:
            continue
        if DOMAIN_RE.match(candidate) or IPV4_RE.match(candidate):
            seen.add(candidate)
            result.append(candidate)
    return result
def parse_created_time(s):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into an aware UTC datetime; None on failure."""
    if not s:
        return None
    try:
        naive = dt.datetime.strptime(str(s).strip(), "%Y-%m-%d %H:%M:%S")
    except Exception:
        return None
    return naive.replace(tzinfo=dt.timezone.utc)
def parse_scored_records(payload, scoring_cfg):
    """Convert API records into {domain, created_at, created_raw, scores} dicts.

    Returns [] when scoring is disabled. Non-dict records and records with an
    empty domain field are skipped. A score that cannot be parsed as float
    becomes +inf so it sorts last under ascending order.
    """
    if not scoring_cfg.get("enabled", False):
        return []
    records_path = scoring_cfg.get("records_path", "data.good[]")
    ip_field = scoring_cfg.get("ip_field", "ip")
    time_field = scoring_cfg.get("created_time_field", "createdTime")
    score_fields = scoring_cfg.get("score_fields", ["avgScore", "ydScore", "dxScore", "ltScore"])
    parsed = []
    for record in get_values_by_path(payload, records_path):
        if not isinstance(record, dict):
            continue
        name = str(record.get(ip_field, "")).strip().lower().rstrip(".")
        if not name:
            continue
        score_values = []
        for field in score_fields:
            try:
                score_values.append(float(record.get(field)))
            except Exception:
                score_values.append(float("inf"))
        parsed.append(
            {
                "domain": name,
                "created_at": parse_created_time(record.get(time_field)),
                "created_raw": record.get(time_field),
                "scores": score_values,
            }
        )
    return parsed
def rank_scored_records(records, scoring_cfg):
    """Order scored records for selection.

    Prefers records created within the configured window (falling back to all
    records when none are recent). Then either keeps API order with
    duplicates removed, or sorts by the score tuple — ascending by default,
    descending when prefer_lower is false (with +inf always sorting last).
    """
    if not records:
        return []
    window = dt.timedelta(hours=float(scoring_cfg.get("within_hours", 24)))
    cutoff = dt.datetime.now(dt.timezone.utc) - window
    fresh = [rec for rec in records if rec.get("created_at") is not None and rec["created_at"] >= cutoff]
    pool = fresh if fresh else records
    if bool(scoring_cfg.get("use_api_order", False)):
        seen = set()
        ordered = []
        for rec in pool:
            if rec["domain"] not in seen:
                seen.add(rec["domain"])
                ordered.append(rec)
        return ordered
    if bool(scoring_cfg.get("prefer_lower", True)):
        def sort_key(rec):
            return tuple(rec["scores"] + [rec["domain"]])
    else:
        def sort_key(rec):
            inf = float("inf")
            return tuple([inf if s == inf else -s for s in rec["scores"]] + [rec["domain"]])
    return sorted(pool, key=sort_key)
def apply_filter(domains, filter_cfg):
    """Keep domains matching any include suffix (when configured) and no exclude regex."""
    suffixes = tuple(s.lower() for s in filter_cfg.get("include_suffixes", []) if s)
    blockers = [re.compile(p) for p in filter_cfg.get("exclude_regex", []) if p]
    kept = []
    for name in domains:
        # str.endswith accepts a tuple: true if any suffix matches.
        if suffixes and not name.endswith(suffixes):
            continue
        if any(b.search(name) for b in blockers):
            continue
        kept.append(name)
    return kept
def single_tls_check(domain, timeout_ms, port, tls_verify=True):
    """Resolve *domain*, open a TCP connection, and complete a TLS handshake.

    Returns (ok, latency_ms_or_None, detail). Latency covers DNS resolution,
    connect, and handshake. Any failure is reported as (False, None, message)
    rather than raised. Fix: with tls_verify=False, build the context via the
    public API (check_hostname=False, verify_mode=CERT_NONE) instead of the
    private ssl._create_unverified_context().
    """
    start = time.perf_counter()
    timeout_sec = max(0.2, timeout_ms / 1000.0)  # floor guards against 0/negative timeouts
    try:
        infos = socket.getaddrinfo(domain, port, proto=socket.IPPROTO_TCP)
        if not infos:
            return False, None, "dns_empty"
        # Only the first resolved address is probed.
        af, socktype, proto, _, sockaddr = infos[0]
        with socket.socket(af, socktype, proto) as sock:
            sock.settimeout(timeout_sec)
            sock.connect(sockaddr)
            ctx = ssl.create_default_context()
            if not tls_verify:
                # Public equivalent of the old ssl._create_unverified_context().
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
            with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
                ssock.do_handshake()
            elapsed = int((time.perf_counter() - start) * 1000)
            return True, elapsed, "ok"
    except Exception as e:
        return False, None, str(e)
def check_domains(domains, hc_cfg):
    """TLS-probe each domain *attempts* times and return per-domain stats.

    Each result carries success_ratio, avg_latency_ms (999999 sentinel when
    every attempt failed), ok_count, attempts, and up to three error strings.
    Results are sorted by success ratio (desc), then latency, then name.
    """
    attempts = int(hc_cfg.get("attempts", 2))
    timeout_ms = int(hc_cfg.get("timeout_ms", 1800))
    port = int(hc_cfg.get("port", 443))
    tls_verify = bool(hc_cfg.get("tls_verify", True))
    stats = []
    for name in domains:
        latencies = []
        failures = []
        for _ in range(attempts):
            ok, ms, detail = single_tls_check(name, timeout_ms, port, tls_verify=tls_verify)
            if ok:
                latencies.append(ms)
            else:
                failures.append(detail)
        wins = len(latencies)
        stats.append(
            {
                "domain": name,
                "success_ratio": wins / attempts if attempts else 0.0,
                "avg_latency_ms": int(sum(latencies) / wins) if latencies else 999999,
                "ok_count": wins,
                "attempts": attempts,
                "errors": failures[:3],
            }
        )
    stats.sort(key=lambda row: (-row["success_ratio"], row["avg_latency_ms"], row["domain"]))
    return stats
def render_v2ray(template_file, output_file, token, domain):
    """Render the v2ray template by substituting *token* with *domain*.

    Returns True when the output file was written; False when either path is
    unset or the template is missing. Fix: only create parent directories
    when the output path actually has one — os.makedirs("") raises
    FileNotFoundError for bare filenames.
    """
    if not template_file or not output_file:
        return False
    if not os.path.exists(template_file):
        return False
    with open(template_file, "r", encoding="utf-8") as f:
        tpl = f.read()
    rendered = tpl.replace(token, domain)
    parent = os.path.dirname(output_file)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(rendered)
    return True
def run_notify(cmd, domain, status):
    """Best-effort notification hook: run *cmd* through the shell with
    AUTODOMAIN / AUTODOMAIN_STATUS exported; no-op when cmd is empty.
    Failures are ignored (check=False)."""
    if not cmd:
        return
    environment = dict(os.environ)
    environment["AUTODOMAIN"] = domain
    environment["AUTODOMAIN_STATUS"] = status
    subprocess.run(cmd, shell=True, check=False, env=environment)
def choose_domain(filtered_domains, check_results, top_n, ranked_scored):
    """Pick the domain to publish plus the candidate list for reporting.

    Priority: score-ranked records cross-checked against health results, then
    score-ranked records alone, then healthy check results, then the raw
    filtered list. Returns (domain_or_None, candidates).
    """
    if ranked_scored:
        score_order = [entry["domain"] for entry in ranked_scored]
        if check_results:
            by_domain = {row["domain"]: row for row in check_results}
            healthy = []
            # Walk in score order, keeping only domains that passed a probe.
            for name in score_order:
                row = by_domain.get(name)
                if row is not None and row["success_ratio"] > 0:
                    healthy.append(row)
                    if len(healthy) >= top_n:
                        break
            if healthy:
                return healthy[0]["domain"], healthy
            fallback = [
                {"domain": e["domain"], "scores": e["scores"], "created_raw": e["created_raw"]}
                for e in ranked_scored[:top_n]
            ]
            return fallback[0]["domain"], fallback
        best = [
            {"domain": e["domain"], "scores": e["scores"], "created_raw": e["created_raw"]}
            for e in ranked_scored[:top_n]
        ]
        if best:
            return best[0]["domain"], best
    if check_results:
        healthy = [row for row in check_results if row["success_ratio"] > 0][:top_n]
        if healthy:
            return healthy[0]["domain"], healthy
        return None, check_results[:top_n]
    if filtered_domains:
        return filtered_domains[0], [{"domain": name} for name in filtered_domains[:top_n]]
    return None, []
def main():
    """Entry point: fetch candidate domains, pick the best, publish outputs.

    Reads a JSON config, queries the API, filters / scores / health-checks
    the domains, then writes the selected domain to runtime files, optionally
    renders a v2ray config, and runs a notify command. On failure it falls
    back to the last known-good domain recorded in the state file, if any.
    """
    ap = argparse.ArgumentParser(description="Auto select VMess preferred domain")
    ap.add_argument("--config", default="config.json", help="Path to config JSON")
    args = ap.parse_args()
    config_path_abs = os.path.abspath(args.config)
    if not os.path.exists(config_path_abs):
        print(json.dumps({"status": "error", "error": f"config file not found: {config_path_abs}"}, ensure_ascii=True), file=sys.stderr)
        sys.exit(1)
    cfg = read_json_file(config_path_abs)
    output_cfg = cfg.get("output", {})
    runtime_dir_cfg = output_cfg.get("runtime_dir", "./runtime")
    # Relative runtime dirs resolve against the config file's directory, not
    # the process CWD, so cron/systemd invocations behave consistently.
    if os.path.isabs(runtime_dir_cfg):
        runtime_dir = runtime_dir_cfg
    else:
        runtime_dir = os.path.normpath(os.path.join(os.path.dirname(config_path_abs), runtime_dir_cfg))
    v2_cfg = cfg.get("v2ray", {})
    notify_cfg = cfg.get("notify", {})
    current_domain_file = os.path.join(runtime_dir, output_cfg.get("current_domain_file", "current_domain.txt"))
    current_domain_json = os.path.join(runtime_dir, output_cfg.get("current_domain_json", "current_domain.json"))
    state_file = os.path.join(runtime_dir, output_cfg.get("state_file", "state.json"))
    substore_vars_file = os.path.join(runtime_dir, output_cfg.get("substore_vars_file", "substore_vars.json"))
    state = read_json_file(state_file, default={})
    last_good = state.get("last_good_domain", "")
    try:
        payload = fetch_api_json(cfg)
        parsed = parse_domains(payload, cfg.get("parser", {}))
        filtered = apply_filter(parsed, cfg.get("domain_filter", {}))
        scored_records = parse_scored_records(payload, cfg.get("scoring", {}))
        # Scored records must also survive the domain filter.
        scored_records = [r for r in scored_records if r["domain"] in set(filtered)]
        ranked_scored = rank_scored_records(scored_records, cfg.get("scoring", {}))
        check_results = []
        if cfg.get("healthcheck", {}).get("enabled", True):
            check_results = check_domains(filtered, cfg.get("healthcheck", {}))
        top_n = int(cfg.get("selection", {}).get("top_n", 3))
        selected, top_candidates = choose_domain(filtered, check_results, top_n, ranked_scored)
        status = "ok"
        # No candidate from the API: reuse the last domain that worked.
        if not selected and last_good:
            selected = last_good
            status = "fallback_last_good"
        if not selected:
            raise RuntimeError("No valid domain available from API and no fallback in state")
        write_text_file(current_domain_file, selected + "\n")
        current_json = {
            "domain": selected,
            "updated_at": utc_now_iso(),
            "status": status,
            "source_count": len(parsed),
            "checked_count": len(check_results),
            "top_candidates": top_candidates,
        }
        write_json_file(current_domain_json, current_json)
        write_json_file(
            substore_vars_file,
            {
                "AUTO_DOMAIN": selected,
                "UPDATED_AT": current_json["updated_at"],
                "STATUS": status,
            },
        )
        rendered = render_v2ray(
            template_file=v2_cfg.get("template_file", ""),
            output_file=v2_cfg.get("output_file", ""),
            token=v2_cfg.get("replace_token", "__AUTO_DOMAIN__"),
            domain=selected,
        )
        new_state = {
            "updated_at": current_json["updated_at"],
            "last_good_domain": selected,
            "status": status,
            "source_count": len(parsed),
            "checked_count": len(check_results),
            "rendered_v2ray": rendered,
        }
        write_json_file(state_file, new_state)
        run_notify(notify_cfg.get("command", ""), selected, status)
        print(json.dumps(current_json, ensure_ascii=True))
    except Exception as e:
        # Persist the failure. With a last-good domain we keep serving it and
        # exit 0; otherwise report the error on stderr and exit non-zero.
        now = utc_now_iso()
        err_state = {
            "updated_at": now,
            "status": "error",
            "error": str(e),
            "last_good_domain": last_good,
        }
        write_json_file(state_file, err_state)
        if last_good:
            write_text_file(current_domain_file, last_good + "\n")
            write_json_file(
                current_domain_json,
                {
                    "domain": last_good,
                    "updated_at": now,
                    "status": "error_use_last_good",
                    "error": str(e),
                },
            )
            run_notify(notify_cfg.get("command", ""), last_good, "error_use_last_good")
            print(json.dumps({"status": "error_use_last_good", "error": str(e)}, ensure_ascii=True))
            return
        print(json.dumps({"status": "error", "error": str(e)}, ensure_ascii=True), file=sys.stderr)
        sys.exit(1)
# Script entry point guard: only run when executed directly, not on import.
if __name__ == "__main__":
    main()
|