#!/usr/bin/env python3 import argparse import csv import datetime as dt import functools import json import math import os import re import subprocess import sys import urllib.parse import urllib.request DOMAIN_RE = re.compile(r"^(?=.{1,253}$)(?!-)[A-Za-z0-9-]{1,63}(?= 0") return idx ip_idx = col_index("ip", 0) sent_idx = col_index("sent", 1) received_idx = col_index("received", 2) loss_idx = col_index("loss_rate", 3) latency_idx = col_index("avg_latency", 4) speed_idx = col_index("download_speed", 5) region_idx = col_index("region", 6) out = [] for row in rows[header_rows:]: if ip_idx >= len(row): continue domain = normalize_domain(row[ip_idx]) if not domain: continue out.append( { "domain": domain, "ip": domain, "sent": row[sent_idx].strip() if sent_idx < len(row) else "", "received": row[received_idx].strip() if received_idx < len(row) else "", "loss_rate": row[loss_idx].strip() if loss_idx < len(row) else "", "avg_latency": row[latency_idx].strip() if latency_idx < len(row) else "", "download_speed": row[speed_idx].strip() if speed_idx < len(row) else "", "region": row[region_idx].strip() if region_idx < len(row) else "", } ) if not out: raise RuntimeError("cfst result parsed to zero valid rows") return out def flatten_values(value): out = [] if isinstance(value, str): out.append(value) elif isinstance(value, list): for item in value: out.extend(flatten_values(item)) elif isinstance(value, dict): for item in value.values(): out.extend(flatten_values(item)) return out def get_by_json_path(data, path): cur = data for part in path.split("."): if isinstance(cur, dict) and part in cur: cur = cur[part] else: return None return cur def get_values_by_path(data, path): parts = path.split(".") def walk(cur, idx): if idx >= len(parts): return [cur] part = parts[idx] if part.endswith("[]"): key = part[:-2] if isinstance(cur, dict): arr = cur.get(key) else: arr = None if not isinstance(arr, list): return [] out = [] for item in arr: out.extend(walk(item, idx + 1)) return out if isinstance(cur, dict) and part in cur: return walk(cur[part], idx + 1) return [] return walk(data, 0) def parse_domains(payload, parser_cfg): domains = [] for p in parser_cfg.get("field_paths", []): values = get_values_by_path(payload, p) domains.extend(flatten_values(values)) for p in parser_cfg.get("json_paths", []): v = get_by_json_path(payload, p) if v is not None: domains.extend(flatten_values(v)) if not domains: regex_s = parser_cfg.get("regex", r"[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}") text = json.dumps(payload, ensure_ascii=True) domains.extend(re.findall(regex_s, text)) clean = [] seen = set() for d in domains: d = str(d).strip().lower().rstrip(".") if (DOMAIN_RE.match(d) or IPV4_RE.match(d)) and d not in seen: seen.add(d) clean.append(d) return clean def parse_timezone(tz_raw): if tz_raw is None: return dt.timezone.utc s = str(tz_raw).strip().upper() if s in {"", "UTC", "Z", "+00:00", "+0000"}: return dt.timezone.utc m = re.match(r"^([+-])(\d{2}):?(\d{2})$", s) if not m: raise ValueError(f"invalid created_time_timezone: {tz_raw}") sign = 1 if m.group(1) == "+" else -1 hh = int(m.group(2)) mm = int(m.group(3)) if hh > 23 or mm > 59: raise ValueError(f"invalid created_time_timezone offset: {tz_raw}") return dt.timezone(sign * dt.timedelta(hours=hh, minutes=mm)) def parse_created_time(value, formats, timezone): if value is None: return None s = str(value).strip() if not s: return None for fmt in formats: try: parsed = dt.datetime.strptime(s, fmt) if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=timezone) return parsed.astimezone(dt.timezone.utc) except Exception: continue try: iso_text = s.replace("Z", "+00:00") parsed = dt.datetime.fromisoformat(iso_text) if parsed.tzinfo is None: parsed = parsed.replace(tzinfo=timezone) return parsed.astimezone(dt.timezone.utc) except Exception: return None def normalize_domain(value): if value is None: return "" return str(value).strip().lower().rstrip(".") def to_float_or_none(value): try: f = float(value) if not math.isfinite(f): return None return f except Exception: return None def resolve_field(record, field_name, field_map): path = field_map.get(field_name) if not path: raise ValueError(f"field '{field_name}' is not registered in record_mapping.field_map") if not isinstance(record, dict): return None return get_by_json_path(record, path) def extract_records(payload, record_mapping): records_path = str(record_mapping.get("records_path", "")).strip() raw = get_values_by_path(payload, records_path) return [x for x in raw if isinstance(x, dict)] def validate_config(cfg): source_type = get_source_type(cfg) if source_type not in {"api", "cfst_local"}: raise ValueError("source.type must be 'api' or 'cfst_local'") output_cfg = cfg.get("output", {}) if output_cfg and not isinstance(output_cfg, dict): raise ValueError("output must be an object") if source_type == "cfst_local": cfst_cfg = cfg.get("cfst_local") if not isinstance(cfst_cfg, dict): raise ValueError("cfst_local is required and must be an object when source.type=cfst_local") work_dir = str(cfst_cfg.get("work_dir", "")).strip() if not work_dir: raise ValueError("cfst_local.work_dir is required") binary = str(cfst_cfg.get("binary", "")).strip() if not binary: raise ValueError("cfst_local.binary is required") result_file = str(cfst_cfg.get("result_file", "")).strip() if not result_file: raise ValueError("cfst_local.result_file is required") run_args = cfst_cfg.get("run_args", []) if not isinstance(run_args, list): raise ValueError("cfst_local.run_args must be an array") columns_cfg = cfst_cfg.get("columns", {}) if columns_cfg and not isinstance(columns_cfg, dict): raise ValueError("cfst_local.columns must be an object") return record_mapping = cfg.get("record_mapping") if not isinstance(record_mapping, dict): raise ValueError("record_mapping is required and must be an object") records_path = str(record_mapping.get("records_path", "")).strip() if not records_path: raise ValueError("record_mapping.records_path is required") field_map = record_mapping.get("field_map") if not isinstance(field_map, dict) or not field_map: raise ValueError("record_mapping.field_map is required and must be a non-empty object") for key, path in field_map.items(): if not str(key).strip() or not str(path).strip(): raise ValueError("record_mapping.field_map contains empty field name or path") for required in ["domain", "created_at"]: if required not in field_map: raise ValueError(f"record_mapping.field_map.{required} is required") created_time_formats = record_mapping.get("created_time_formats") if not isinstance(created_time_formats, list) or not created_time_formats: raise ValueError("record_mapping.created_time_formats is required and must be a non-empty array") for fmt in created_time_formats: if not str(fmt).strip(): raise ValueError("record_mapping.created_time_formats contains empty format") parse_timezone(record_mapping.get("created_time_timezone", "UTC")) def ensure_field_registered(field_name, where): if field_name not in field_map: raise ValueError(f"{where}: field '{field_name}' is not in record_mapping.field_map") record_filter = cfg.get("record_filter", {}) if record_filter.get("enabled", False): rules = record_filter.get("exclude_if_any", []) if not isinstance(rules, list): raise ValueError("record_filter.exclude_if_any must be an array") for i, rule in enumerate(rules): if not isinstance(rule, dict): raise ValueError(f"record_filter.exclude_if_any[{i}] must be an object") field_name = str(rule.get("field", "")).strip() if not field_name: raise ValueError(f"record_filter.exclude_if_any[{i}].field is required") ensure_field_registered(field_name, f"record_filter.exclude_if_any[{i}]") has_matcher = any(k in rule for k in ["contains", "equals", "regex"]) if not has_matcher: raise ValueError(f"record_filter.exclude_if_any[{i}] must include one of contains/equals/regex") scoring = cfg.get("scoring", {}) if scoring.get("enabled", False): strategy = str(scoring.get("strategy", "")).strip() if strategy not in {"weighted_average", "lexicographic"}: raise ValueError("scoring.strategy must be 'weighted_average' or 'lexicographic'") within_hours = to_float_or_none(scoring.get("within_hours", 24)) if within_hours is None or within_hours <= 0: raise ValueError("scoring.within_hours must be a positive number") if strategy == "weighted_average": weighted_fields = scoring.get("weighted_fields") if not isinstance(weighted_fields, list) or not weighted_fields: raise ValueError("scoring.weighted_fields is required for weighted_average strategy") for i, item in enumerate(weighted_fields): if not isinstance(item, dict): raise ValueError(f"scoring.weighted_fields[{i}] must be an object") field_name = str(item.get("field", "")).strip() if not field_name: raise ValueError(f"scoring.weighted_fields[{i}].field is required") ensure_field_registered(field_name, f"scoring.weighted_fields[{i}]") weight = to_float_or_none(item.get("weight")) if weight is None or weight <= 0: raise ValueError(f"scoring.weighted_fields[{i}].weight must be > 0") if strategy == "lexicographic": lex_fields = scoring.get("lexicographic_fields") if not isinstance(lex_fields, list) or not lex_fields: raise ValueError("scoring.lexicographic_fields is required for lexicographic strategy") for i, item in enumerate(lex_fields): if isinstance(item, str): field_name = item.strip() order = "" elif isinstance(item, dict): field_name = str(item.get("field", "")).strip() order = str(item.get("order", "")).strip().lower() else: raise ValueError(f"scoring.lexicographic_fields[{i}] must be string or object") if not field_name: raise ValueError(f"scoring.lexicographic_fields[{i}] field is required") ensure_field_registered(field_name, f"scoring.lexicographic_fields[{i}]") if order and order not in {"asc", "desc"}: raise ValueError(f"scoring.lexicographic_fields[{i}].order must be asc or desc") tie_breakers = scoring.get("tie_breakers", []) if tie_breakers is not None: if not isinstance(tie_breakers, list): raise ValueError("scoring.tie_breakers must be an array") for i, item in enumerate(tie_breakers): if not isinstance(item, dict): raise ValueError(f"scoring.tie_breakers[{i}] must be an object") field_name = str(item.get("field", "")).strip() order = str(item.get("order", "")).strip().lower() if not field_name: raise ValueError(f"scoring.tie_breakers[{i}].field is required") if order not in {"asc", "desc"}: raise ValueError(f"scoring.tie_breakers[{i}].order must be asc or desc") ensure_field_registered(field_name, f"scoring.tie_breakers[{i}]") def rule_matches(value, rule): if value is None or not isinstance(rule, dict): return False values = flatten_values(value) if not values: values = [value] case_sensitive = bool(rule.get("case_sensitive", False)) if "contains" in rule: needle = str(rule.get("contains", "")) if not needle: return False for item in values: hay = str(item) if case_sensitive: if needle in hay: return True else: if needle.lower() in hay.lower(): return True return False if "equals" in rule: target = str(rule.get("equals", "")) for item in values: item_s = str(item) if case_sensitive: if item_s == target: return True else: if item_s.lower() == target.lower(): return True return False if "regex" in rule: pattern = str(rule.get("regex", "")) if not pattern: return False flags = 0 if case_sensitive else re.IGNORECASE try: rx = re.compile(pattern, flags) except Exception: return False for item in values: if rx.search(str(item)): return True return False return False def collect_excluded_domains(records, field_map, record_filter_cfg): if not record_filter_cfg.get("enabled", False): return set() rules = record_filter_cfg.get("exclude_if_any", []) if not rules: return set() blocked = set() for record in records: domain = normalize_domain(resolve_field(record, "domain", field_map)) if not domain: continue for rule in rules: field_name = str(rule.get("field", "")).strip() if not field_name: continue value = resolve_field(record, field_name, field_map) if rule_matches(value, rule): blocked.add(domain) break return blocked def build_lexicographic_descriptors(scoring_cfg, prefer_lower): out = [] for item in scoring_cfg.get("lexicographic_fields", []): if isinstance(item, str): field_name = item.strip() order = "asc" if prefer_lower else "desc" else: field_name = str(item.get("field", "")).strip() order = str(item.get("order", "")).strip().lower() if not order: order = "asc" if prefer_lower else "desc" out.append({"field": field_name, "order": order}) return out def parse_scored_records(records, field_map, record_mapping_cfg, scoring_cfg): if not scoring_cfg.get("enabled", False): return [] strategy = str(scoring_cfg.get("strategy", "weighted_average")).strip() prefer_lower = bool(scoring_cfg.get("prefer_lower", False)) timezone = parse_timezone(record_mapping_cfg.get("created_time_timezone", "UTC")) time_formats = [str(x) for x in record_mapping_cfg.get("created_time_formats", [])] weighted_fields = scoring_cfg.get("weighted_fields", []) if strategy == "weighted_average" else [] lex_descriptors = build_lexicographic_descriptors(scoring_cfg, prefer_lower) if strategy == "lexicographic" else [] needed_fields = set() for item in weighted_fields: needed_fields.add(str(item.get("field", "")).strip()) for item in lex_descriptors: needed_fields.add(str(item.get("field", "")).strip()) for item in scoring_cfg.get("tie_breakers", []): needed_fields.add(str(item.get("field", "")).strip()) needed_fields.discard("domain") needed_fields.discard("created_at") out = [] for record in records: domain = normalize_domain(resolve_field(record, "domain", field_map)) if not domain: continue created_raw = resolve_field(record, "created_at", field_map) created_at = parse_created_time(created_raw, time_formats, timezone) field_values = {} for field_name in needed_fields: field_values[field_name] = resolve_field(record, field_name, field_map) score_value = None scores = [] lex_values = [] if strategy == "weighted_average": total = 0.0 total_weight = 0.0 missing = False for item in weighted_fields: field_name = str(item.get("field", "")).strip() weight = float(item.get("weight")) raw_v = resolve_field(record, field_name, field_map) val = to_float_or_none(raw_v) scores.append(val) if val is None: missing = True continue total += val * weight total_weight += weight if not missing and total_weight > 0: score_value = total / total_weight if strategy == "lexicographic": for item in lex_descriptors: field_name = item["field"] order = item["order"] raw_v = resolve_field(record, field_name, field_map) num_v = to_float_or_none(raw_v) v = num_v if num_v is not None else raw_v lex_values.append({"field": field_name, "value": v, "order": order}) scores.append(v) out.append( { "domain": domain, "created_at": created_at, "created_raw": created_raw, "scores": scores, "score_value": score_value, "lex_values": lex_values, "field_values": field_values, "raw_record": record, } ) return out def cmp_scalar(a, b, order): a_none = a is None b_none = b is None if a_none and b_none: return 0 if a_none: return 1 if b_none: return -1 if isinstance(a, dt.datetime): a = a.timestamp() if isinstance(b, dt.datetime): b = b.timestamp() a_num = to_float_or_none(a) b_num = to_float_or_none(b) if a_num is not None and b_num is not None: if a_num < b_num: base = -1 elif a_num > b_num: base = 1 else: base = 0 else: a_s = str(a).lower() b_s = str(b).lower() if a_s < b_s: base = -1 elif a_s > b_s: base = 1 else: base = 0 return base if order == "asc" else -base def get_sort_field_value(record, field_name): if field_name == "domain": return record.get("domain") if field_name == "created_at": return record.get("created_at") return record.get("field_values", {}).get(field_name) def rank_scored_records(records, scoring_cfg): if not records: return [] within_hours = float(scoring_cfg.get("within_hours", 24)) strategy = str(scoring_cfg.get("strategy", "weighted_average")).strip() prefer_lower = bool(scoring_cfg.get("prefer_lower", False)) tie_breakers = scoring_cfg.get("tie_breakers", []) now = dt.datetime.now(dt.timezone.utc) cutoff = now - dt.timedelta(hours=within_hours) recent = [r for r in records if r.get("created_at") is not None and r["created_at"] >= cutoff] candidates = recent if recent else records default_lex_order = "asc" if prefer_lower else "desc" def compare(a, b): if strategy == "weighted_average": order = "asc" if prefer_lower else "desc" c = cmp_scalar(a.get("score_value"), b.get("score_value"), order) if c != 0: return c elif strategy == "lexicographic": a_lex = a.get("lex_values", []) b_lex = b.get("lex_values", []) n = max(len(a_lex), len(b_lex)) for i in range(n): av = a_lex[i]["value"] if i < len(a_lex) else None bv = b_lex[i]["value"] if i < len(b_lex) else None order = default_lex_order if i < len(a_lex) and a_lex[i].get("order"): order = a_lex[i]["order"] c = cmp_scalar(av, bv, order) if c != 0: return c for item in tie_breakers: field_name = str(item.get("field", "")).strip() order = str(item.get("order", "asc")).strip().lower() av = get_sort_field_value(a, field_name) bv = get_sort_field_value(b, field_name) c = cmp_scalar(av, bv, order) if c != 0: return c return cmp_scalar(a.get("domain"), b.get("domain"), "asc") return sorted(candidates, key=functools.cmp_to_key(compare)) def apply_filter(domains, filter_cfg): include_suffixes = [s.lower() for s in filter_cfg.get("include_suffixes", []) if s] exclude_regex = [re.compile(x) for x in filter_cfg.get("exclude_regex", []) if x] out = [] for d in domains: if include_suffixes and not any(d.endswith(s) for s in include_suffixes): continue if any(rx.search(d) for rx in exclude_regex): continue out.append(d) return out def choose_top_candidate_domains(filtered_domains, top_n, ranked_scored): if ranked_scored: domains_by_score = [x["domain"] for x in ranked_scored] return domains_by_score[:top_n] return filtered_domains[:top_n] def text_or_blank(value): if value is None: return "" return str(value).strip() def set_if_nonempty_text(obj, key, value): text = text_or_blank(value) if text: obj[key] = text def base_top_candidate(domain, source_type): candidate = {"domain": domain, "source_type": source_type} if IPV4_RE.match(domain): candidate["ip"] = domain return candidate def maybe_resolve_field(record, field_name, field_map): if not isinstance(record, dict): return None if field_name not in field_map: return None return resolve_field(record, field_name, field_map) def build_cfst_candidate(row): domain = row.get("domain", "") candidate = base_top_candidate(domain=domain, source_type="cfst_local") set_if_nonempty_text(candidate, "ip", row.get("ip") or domain) set_if_nonempty_text(candidate, "loss_rate", row.get("loss_rate")) set_if_nonempty_text(candidate, "avg_latency", row.get("avg_latency")) set_if_nonempty_text(candidate, "download_speed", row.get("download_speed")) set_if_nonempty_text(candidate, "region", row.get("region")) return candidate def build_api_candidate(domain, record, field_map, scored_record=None): candidate = base_top_candidate(domain=domain, source_type="api") if record: set_if_nonempty_text(candidate, "created_raw", maybe_resolve_field(record, "created_at", field_map)) set_if_nonempty_text(candidate, "avg_latency", maybe_resolve_field(record, "avg_latency", field_map)) set_if_nonempty_text(candidate, "loss_rate", maybe_resolve_field(record, "avg_pkg_lost_rate", field_map)) set_if_nonempty_text(candidate, "download_speed", maybe_resolve_field(record, "download_speed", field_map)) location_country = text_or_blank(maybe_resolve_field(record, "location_country", field_map)) location_city = text_or_blank(maybe_resolve_field(record, "location_city", field_map)) region = "/".join([x for x in [location_country, location_city] if x]) set_if_nonempty_text(candidate, "region", region) if scored_record: if scored_record.get("score_value") is not None: candidate["score_value"] = scored_record.get("score_value") scores = list(scored_record.get("scores", [])) if scores: candidate["scores"] = scores if "created_raw" not in candidate: set_if_nonempty_text(candidate, "created_raw", scored_record.get("created_raw")) return candidate def build_top_candidates(source_type, candidate_domains, cfst_rows=None, records=None, field_map=None, ranked_scored=None): if source_type == "cfst_local": row_map = {} for row in cfst_rows or []: domain = row.get("domain", "") if domain and domain not in row_map: row_map[domain] = row return [build_cfst_candidate(row_map.get(d, {"domain": d, "ip": d})) for d in candidate_domains] ranked_map = {} for item in ranked_scored or []: domain = item.get("domain", "") if domain and domain not in ranked_map: ranked_map[domain] = item record_map = {} for record in records or []: domain = normalize_domain(maybe_resolve_field(record, "domain", field_map or {})) if domain and domain not in record_map: record_map[domain] = record out = [] for domain in candidate_domains: scored_record = ranked_map.get(domain) record = None if scored_record: record = scored_record.get("raw_record") if record is None: record = record_map.get(domain) out.append(build_api_candidate(domain, record, field_map or {}, scored_record=scored_record)) return out def run_notify(cmd, domain, status): if not cmd: return env = os.environ.copy() env["AUTODOMAIN"] = domain env["AUTODOMAIN_STATUS"] = status subprocess.run(cmd, shell=True, check=False, env=env) def choose_domain(filtered_domains, top_n, ranked_scored): if ranked_scored: top_domains = choose_top_candidate_domains(filtered_domains, top_n, ranked_scored) if top_domains: return top_domains[0], top_domains if filtered_domains: return filtered_domains[0], filtered_domains[:top_n] return None, [] def build_output_settings(output_cfg, config_path_abs): runtime_dir_cfg = output_cfg.get("runtime_dir", "./runtime") runtime_dir = resolve_path(os.path.dirname(config_path_abs), runtime_dir_cfg) selected_text_name = output_cfg.get("selected_value_file", output_cfg.get("current_domain_file", "current_domain.txt")) selected_json_name = output_cfg.get("selected_value_json", output_cfg.get("current_domain_json", "current_domain.json")) state_name = output_cfg.get("state_file", "state.json") vars_name = output_cfg.get("export_vars_file", output_cfg.get("substore_vars_file", "substore_vars.json")) return { "runtime_dir": runtime_dir, "selected_text_path": os.path.join(runtime_dir, selected_text_name), "selected_json_path": os.path.join(runtime_dir, selected_json_name), "state_path": os.path.join(runtime_dir, state_name), "vars_path": os.path.join(runtime_dir, vars_name), "selected_json_key": str(output_cfg.get("selected_value_json_key", "domain")).strip() or "domain", "state_last_good_key": str(output_cfg.get("state_last_good_key", "last_good_domain")).strip() or "last_good_domain", "vars_value_key": str(output_cfg.get("substore_value_key", "AUTO_DOMAIN")).strip() or "AUTO_DOMAIN", } def print_output_settings(config_path_abs, cfg): output_cfg = cfg.get("output", {}) settings = build_output_settings(output_cfg, config_path_abs) print(json.dumps(settings, ensure_ascii=True)) def main(): ap = argparse.ArgumentParser(description="Auto select preferred endpoint value") ap.add_argument("--config", default="config.server.json", help="Path to config JSON") ap.add_argument( "--print-output-settings", action="store_true", help="Print resolved output settings as JSON and exit", ) args = ap.parse_args() config_path_abs = os.path.abspath(args.config) if not os.path.exists(config_path_abs): print(json.dumps({"status": "error", "error": f"config file not found: {config_path_abs}"}, ensure_ascii=True), file=sys.stderr) sys.exit(1) cfg = read_json_file(config_path_abs) try: validate_config(cfg) except Exception as e: print(json.dumps({"status": "error", "error": f"invalid config: {e}"}, ensure_ascii=True), file=sys.stderr) sys.exit(1) if args.print_output_settings: print_output_settings(config_path_abs, cfg) return output_cfg = cfg.get("output", {}) output_settings = build_output_settings(output_cfg, config_path_abs) notify_cfg = cfg.get("notify", {}) selected_text_file = output_settings["selected_text_path"] selected_json_file = output_settings["selected_json_path"] state_file = output_settings["state_path"] vars_file = output_settings["vars_path"] selected_json_key = output_settings["selected_json_key"] state_last_good_key = output_settings["state_last_good_key"] vars_value_key = output_settings["vars_value_key"] state = read_json_file(state_file, default={}) last_good = state.get(state_last_good_key, "") source_type = get_source_type(cfg) try: top_n = int(cfg.get("selection", {}).get("top_n", 3)) payload = None if source_type == "cfst_local": cfst_rows = load_cfst_rows(cfg, config_path_abs) parsed = [row["domain"] for row in cfst_rows] filtered = apply_filter(parsed, cfg.get("domain_filter", {})) filtered_set = set(filtered) cfst_rows = [row for row in cfst_rows if row["domain"] in filtered_set] if not cfst_rows: raise RuntimeError("No valid IP available from cfst result after filtering") selected = cfst_rows[0]["domain"] candidate_domains = [row["domain"] for row in cfst_rows[:top_n]] top_candidates = build_top_candidates("cfst_local", candidate_domains, cfst_rows=cfst_rows) else: payload = fetch_api_json(cfg) parsed = parse_domains(payload, cfg.get("parser", {})) filtered = apply_filter(parsed, cfg.get("domain_filter", {})) record_mapping_cfg = cfg.get("record_mapping", {}) field_map = record_mapping_cfg.get("field_map", {}) records = extract_records(payload, record_mapping_cfg) record_filter_cfg = cfg.get("record_filter", {}) blocked_domains = collect_excluded_domains(records, field_map, record_filter_cfg) if blocked_domains: filtered = [d for d in filtered if d not in blocked_domains] scoring_cfg = cfg.get("scoring", {}) scored_records = parse_scored_records(records, field_map, record_mapping_cfg, scoring_cfg) filtered_set = set(filtered) scored_records = [r for r in scored_records if r["domain"] in filtered_set] ranked_scored = rank_scored_records(scored_records, scoring_cfg) selected, candidate_domains = choose_domain(filtered, top_n, ranked_scored) top_candidates = build_top_candidates( "api", candidate_domains, records=records, field_map=field_map, ranked_scored=ranked_scored, ) status = "ok" if not selected and last_good: selected = last_good status = "fallback_last_good" if not selected: if source_type == "cfst_local": raise RuntimeError("No valid IP available from cfst and no fallback in state") raise RuntimeError("No valid domain available from API and no fallback in state") write_text_file(selected_text_file, selected + "\n") current_json = { selected_json_key: selected, "updated_at": utc_now_iso(), "status": status, "source_type": source_type, "source_count": len(parsed), "top_candidates": top_candidates, } write_json_file(selected_json_file, current_json) write_json_file( vars_file, { vars_value_key: selected, "UPDATED_AT": current_json["updated_at"], "STATUS": status, }, ) new_state = { "updated_at": current_json["updated_at"], state_last_good_key: selected, "status": status, "source_count": len(parsed), "source_type": source_type, } write_json_file(state_file, new_state) run_notify(notify_cfg.get("command", ""), selected, status) print(json.dumps(current_json, ensure_ascii=True)) except Exception as e: now = utc_now_iso() err_state = { "updated_at": now, "status": "error", "error": str(e), state_last_good_key: last_good, "source_type": source_type, } write_json_file(state_file, err_state) if last_good: write_text_file(selected_text_file, last_good + "\n") write_json_file( selected_json_file, { selected_json_key: last_good, "updated_at": now, "status": "error_use_last_good", "error": str(e), "source_type": source_type, }, ) write_json_file( vars_file, { vars_value_key: last_good, "UPDATED_AT": now, "STATUS": "error_use_last_good", }, ) run_notify(notify_cfg.get("command", ""), last_good, "error_use_last_good") print(json.dumps({"status": "error_use_last_good", "error": str(e)}, ensure_ascii=True)) return print(json.dumps({"status": "error", "error": str(e)}, ensure_ascii=True), file=sys.stderr) sys.exit(1) if __name__ == "__main__": main()