- #!/usr/bin/env python3
- import argparse
- import csv
- import datetime as dt
- import functools
- import json
- import math
- import os
- import re
- import socket
- import ssl
- import subprocess
- import sys
- import time
- import urllib.parse
- import urllib.request
# Matches a syntactically valid DNS hostname: total length 1-253 chars, labels
# of 1-63 alphanumeric/hyphen characters that neither start nor end with a
# hyphen, and at least one dot (bare single labels are rejected).
DOMAIN_RE = re.compile(r"^(?=.{1,253}$)(?!-)[A-Za-z0-9-]{1,63}(?<!-)(\.(?!-)[A-Za-z0-9-]{1,63}(?<!-))+$")
# Matches a dotted-quad IPv4 address with each octet constrained to 0-255.
IPV4_RE = re.compile(r"^(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(?:\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}$")
def utc_now_iso():
    """Return the current UTC time as an ISO-8601 string with a 'Z' suffix."""
    stamp = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)
    return stamp.isoformat().replace("+00:00", "Z")
def read_json_file(path, default=None):
    """Load JSON from *path*; return *default* ({} when None) if the file is absent."""
    fallback = {} if default is None else default
    if not os.path.exists(path):
        return fallback
    with open(path, "r", encoding="utf-8") as fh:
        return json.load(fh)
def write_json_file(path, data):
    """Serialize *data* as indented ASCII JSON to *path*, creating parent dirs.

    Bug fix: os.path.dirname() returns "" for a bare filename, and
    os.makedirs("") raises FileNotFoundError — only create directories when
    there actually is a directory component.
    """
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=True, indent=2)
def write_text_file(path, data):
    """Write *data* to *path* as UTF-8 text, creating parent directories.

    Bug fix: os.path.dirname() returns "" for a bare filename, and
    os.makedirs("") raises FileNotFoundError — guard the makedirs call.
    """
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        f.write(data)
def build_url(base_url, params):
    """Return *base_url* with *params* merged into its query string.

    Existing query keys survive unless overridden; override values are
    stringified. With no params the URL is returned untouched.
    """
    if not params:
        return base_url
    pieces = urllib.parse.urlparse(base_url)
    merged = urllib.parse.parse_qs(pieces.query)
    merged.update({key: [str(val)] for key, val in params.items()})
    new_query = urllib.parse.urlencode(merged, doseq=True)
    return urllib.parse.urlunparse(pieces._replace(query=new_query))
def resolve_path(base_dir, path_value):
    """Normalize *path_value*; relative paths are anchored at *base_dir*.

    Returns "" when *path_value* is empty/None (callers treat that as unset).
    """
    text = str(path_value or "").strip()
    if not text:
        return ""
    candidate = text if os.path.isabs(text) else os.path.join(base_dir, text)
    return os.path.normpath(candidate)
def get_source_type(cfg):
    """Return the configured source type, lower-cased; defaults to 'api'."""
    source_cfg = cfg.get("source", {})
    if not isinstance(source_cfg, dict):
        return "api"
    declared = str(source_cfg.get("type", "api")).strip().lower()
    return declared or "api"
def fetch_api_json(cfg):
    """Perform the configured HTTP request and decode its JSON response.

    Reads cfg["api"]: url plus optional params/headers/body, method (default
    GET) and timeout_sec (default 10). A JSON body forces the Content-Type
    header to application/json.
    """
    api_cfg = cfg["api"]
    full_url = build_url(api_cfg["url"], api_cfg.get("params", {}))
    verb = api_cfg.get("method", "GET").upper()
    req_headers = dict(api_cfg.get("headers", {}))
    timeout = int(api_cfg.get("timeout_sec", 10))
    payload = api_cfg.get("body")
    data = None
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        req_headers["Content-Type"] = "application/json"
    request = urllib.request.Request(url=full_url, data=data, headers=req_headers, method=verb)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body_text = response.read().decode("utf-8", errors="replace")
    return json.loads(body_text)
def load_cfst_rows(cfg, config_path_abs):
    """Run the local cfst binary (unless skip_run) and parse its CSV result.

    Returns a list of row dicts keyed by the configured column indexes; the
    'domain' and 'ip' fields both hold the normalized IP column value.
    Raises RuntimeError on run failure, a missing/empty result file, or zero
    valid rows, and ValueError on malformed column configuration.
    """
    cfst_cfg = cfg.get("cfst_local", {})
    config_dir = os.path.dirname(config_path_abs)
    # Paths resolve relative to the config file, then the work dir.
    work_dir = resolve_path(config_dir, cfst_cfg.get("work_dir", "./cfst"))
    binary_path = resolve_path(work_dir, cfst_cfg.get("binary", "./cfst"))
    result_file = resolve_path(work_dir, cfst_cfg.get("result_file", "result.csv"))
    encoding = str(cfst_cfg.get("encoding", "utf-8")).strip() or "utf-8"
    skip_run = bool(cfst_cfg.get("skip_run", False))
    timeout_sec = int(cfst_cfg.get("run_timeout_sec", 600))
    run_args = cfst_cfg.get("run_args", ["-o", os.path.basename(result_file)])
    if not isinstance(run_args, list):
        raise ValueError("cfst_local.run_args must be an array")
    command = [binary_path] + [str(x) for x in run_args]
    if not skip_run:
        completed = subprocess.run(
            command,
            cwd=work_dir,
            check=False,
            capture_output=True,
            text=True,
            encoding=encoding,
            errors="replace",
            timeout=timeout_sec,
        )
        if completed.returncode != 0:
            # Prefer stderr, then stdout, then the bare exit code as detail.
            stderr = (completed.stderr or "").strip()
            stdout = (completed.stdout or "").strip()
            details = stderr or stdout or f"exit code {completed.returncode}"
            raise RuntimeError(f"cfst run failed: {details}")
    if not os.path.exists(result_file):
        raise RuntimeError(f"cfst result file not found: {result_file}")
    with open(result_file, "r", encoding=encoding, errors="replace", newline="") as f:
        reader = csv.reader(f)
        # Drop rows that are entirely blank.
        rows = [row for row in reader if any(str(col).strip() for col in row)]
    header_rows = int(cfst_cfg.get("header_rows", 1))
    if len(rows) <= header_rows:
        raise RuntimeError("cfst result has no data rows")
    columns_cfg = cfst_cfg.get("columns", {})
    if not isinstance(columns_cfg, dict):
        raise ValueError("cfst_local.columns must be an object")

    def col_index(name, default_index):
        # Resolve a configured column index, requiring a non-negative integer.
        raw = columns_cfg.get(name, default_index)
        try:
            idx = int(raw)
        except Exception as exc:
            raise ValueError(f"cfst_local.columns.{name} must be an integer") from exc
        if idx < 0:
            raise ValueError(f"cfst_local.columns.{name} must be >= 0")
        return idx

    ip_idx = col_index("ip", 0)
    sent_idx = col_index("sent", 1)
    received_idx = col_index("received", 2)
    loss_idx = col_index("loss_rate", 3)
    latency_idx = col_index("avg_latency", 4)
    speed_idx = col_index("download_speed", 5)
    region_idx = col_index("region", 6)
    out = []
    for row in rows[header_rows:]:
        if ip_idx >= len(row):
            continue
        domain = normalize_domain(row[ip_idx])
        if not domain:
            continue
        out.append(
            {
                # cfst rows carry IPs; 'domain' mirrors 'ip' so downstream
                # domain-oriented code can treat both sources uniformly.
                "domain": domain,
                "ip": domain,
                "sent": row[sent_idx].strip() if sent_idx < len(row) else "",
                "received": row[received_idx].strip() if received_idx < len(row) else "",
                "loss_rate": row[loss_idx].strip() if loss_idx < len(row) else "",
                "avg_latency": row[latency_idx].strip() if latency_idx < len(row) else "",
                "download_speed": row[speed_idx].strip() if speed_idx < len(row) else "",
                "region": row[region_idx].strip() if region_idx < len(row) else "",
            }
        )
    if not out:
        raise RuntimeError("cfst result parsed to zero valid rows")
    return out
def flatten_values(value):
    """Recursively collect every string found in *value* (str/list/dict nests)."""
    if isinstance(value, str):
        return [value]
    if isinstance(value, list):
        return [leaf for item in value for leaf in flatten_values(item)]
    if isinstance(value, dict):
        return [leaf for item in value.values() for leaf in flatten_values(item)]
    return []
def get_by_json_path(data, path):
    """Walk dot-separated *path* through nested dicts; None if any step is missing."""
    node = data
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node
def get_values_by_path(data, path):
    """Resolve dot-separated *path*, where a 'key[]' segment fans out over a list.

    Returns a list of every value reached; [] when the path does not match.
    """
    segments = path.split(".")

    def descend(node, depth):
        if depth >= len(segments):
            return [node]
        segment = segments[depth]
        if segment.endswith("[]"):
            items = node.get(segment[:-2]) if isinstance(node, dict) else None
            if not isinstance(items, list):
                return []
            collected = []
            for element in items:
                collected.extend(descend(element, depth + 1))
            return collected
        if isinstance(node, dict) and segment in node:
            return descend(node[segment], depth + 1)
        return []

    return descend(data, 0)
def parse_domains(payload, parser_cfg):
    """Extract candidate domains/IPs from an API payload.

    Tries the configured field_paths and json_paths first; only if nothing is
    found does it fall back to regex-scanning the serialized payload. The
    result is deduplicated, lower-cased, and restricted to values matching
    the hostname or IPv4 patterns.
    """
    found = []
    for path in parser_cfg.get("field_paths", []):
        found.extend(flatten_values(get_values_by_path(payload, path)))
    for path in parser_cfg.get("json_paths", []):
        value = get_by_json_path(payload, path)
        if value is not None:
            found.extend(flatten_values(value))
    if not found:
        pattern = parser_cfg.get("regex", r"[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
        found.extend(re.findall(pattern, json.dumps(payload, ensure_ascii=True)))
    unique = []
    seen = set()
    for candidate in found:
        candidate = str(candidate).strip().lower().rstrip(".")
        if candidate in seen:
            continue
        if DOMAIN_RE.match(candidate) or IPV4_RE.match(candidate):
            seen.add(candidate)
            unique.append(candidate)
    return unique
def parse_timezone(tz_raw):
    """Parse a UTC-offset spec ('+08:00', '-0500', 'Z', 'UTC', None) into a tzinfo.

    Raises ValueError for anything that is not UTC or a valid +/-HH[:]MM offset.
    """
    if tz_raw is None:
        return dt.timezone.utc
    spec = str(tz_raw).strip().upper()
    if spec in {"", "UTC", "Z", "+00:00", "+0000"}:
        return dt.timezone.utc
    match = re.match(r"^([+-])(\d{2}):?(\d{2})$", spec)
    if not match:
        raise ValueError(f"invalid created_time_timezone: {tz_raw}")
    hours = int(match.group(2))
    minutes = int(match.group(3))
    if hours > 23 or minutes > 59:
        raise ValueError(f"invalid created_time_timezone offset: {tz_raw}")
    sign = 1 if match.group(1) == "+" else -1
    return dt.timezone(sign * dt.timedelta(hours=hours, minutes=minutes))
def parse_created_time(value, formats, timezone):
    """Parse *value* with the given strptime *formats*, then an ISO-8601 fallback.

    Naive results are assumed to be in *timezone*; the return value is always
    converted to UTC. Returns None for empty input or an unparseable string.
    """
    if value is None:
        return None
    text = str(value).strip()
    if not text:
        return None
    for fmt in formats:
        try:
            stamp = dt.datetime.strptime(text, fmt)
        except Exception:
            continue
        if stamp.tzinfo is None:
            stamp = stamp.replace(tzinfo=timezone)
        return stamp.astimezone(dt.timezone.utc)
    try:
        # fromisoformat (pre-3.11) does not accept a trailing 'Z'.
        stamp = dt.datetime.fromisoformat(text.replace("Z", "+00:00"))
    except Exception:
        return None
    if stamp.tzinfo is None:
        stamp = stamp.replace(tzinfo=timezone)
    return stamp.astimezone(dt.timezone.utc)
def normalize_domain(value):
    """Lower-case *value*, trimming whitespace and trailing dots; "" for None."""
    return "" if value is None else str(value).strip().lower().rstrip(".")
def to_float_or_none(value):
    """Coerce *value* to a finite float, or None when that is not possible."""
    try:
        number = float(value)
    except Exception:
        return None
    return number if math.isfinite(number) else None
def resolve_field(record, field_name, field_map):
    """Look up *field_name*'s mapped JSON path inside *record*.

    Raises ValueError when the field is not declared in *field_map*;
    returns None when *record* is not a dict.
    """
    mapped_path = field_map.get(field_name)
    if not mapped_path:
        raise ValueError(f"field '{field_name}' is not registered in record_mapping.field_map")
    return get_by_json_path(record, mapped_path) if isinstance(record, dict) else None
def extract_records(payload, record_mapping):
    """Return the dict records found at record_mapping.records_path."""
    path = str(record_mapping.get("records_path", "")).strip()
    candidates = get_values_by_path(payload, path)
    return [item for item in candidates if isinstance(item, dict)]
def validate_config(cfg):
    """Validate the whole config up front, raising ValueError on the first problem.

    Checks source type, the cfst_local section (for the local-run source),
    and — for the API source — record_mapping, record_filter and scoring,
    including that every field referenced by filters/scoring is declared in
    record_mapping.field_map.
    """
    source_type = get_source_type(cfg)
    if source_type not in {"api", "cfst_local"}:
        raise ValueError("source.type must be 'api' or 'cfst_local'")
    output_cfg = cfg.get("output", {})
    if output_cfg and not isinstance(output_cfg, dict):
        raise ValueError("output must be an object")
    if source_type == "cfst_local":
        # Local-run source: only the cfst_local block needs validating.
        cfst_cfg = cfg.get("cfst_local")
        if not isinstance(cfst_cfg, dict):
            raise ValueError("cfst_local is required and must be an object when source.type=cfst_local")
        work_dir = str(cfst_cfg.get("work_dir", "")).strip()
        if not work_dir:
            raise ValueError("cfst_local.work_dir is required")
        binary = str(cfst_cfg.get("binary", "")).strip()
        if not binary:
            raise ValueError("cfst_local.binary is required")
        result_file = str(cfst_cfg.get("result_file", "")).strip()
        if not result_file:
            raise ValueError("cfst_local.result_file is required")
        run_args = cfst_cfg.get("run_args", [])
        if not isinstance(run_args, list):
            raise ValueError("cfst_local.run_args must be an array")
        columns_cfg = cfst_cfg.get("columns", {})
        if columns_cfg and not isinstance(columns_cfg, dict):
            raise ValueError("cfst_local.columns must be an object")
        return
    # API source: record_mapping drives every downstream field lookup.
    record_mapping = cfg.get("record_mapping")
    if not isinstance(record_mapping, dict):
        raise ValueError("record_mapping is required and must be an object")
    records_path = str(record_mapping.get("records_path", "")).strip()
    if not records_path:
        raise ValueError("record_mapping.records_path is required")
    field_map = record_mapping.get("field_map")
    if not isinstance(field_map, dict) or not field_map:
        raise ValueError("record_mapping.field_map is required and must be a non-empty object")
    for key, path in field_map.items():
        if not str(key).strip() or not str(path).strip():
            raise ValueError("record_mapping.field_map contains empty field name or path")
    for required in ["domain", "created_at"]:
        if required not in field_map:
            raise ValueError(f"record_mapping.field_map.{required} is required")
    created_time_formats = record_mapping.get("created_time_formats")
    if not isinstance(created_time_formats, list) or not created_time_formats:
        raise ValueError("record_mapping.created_time_formats is required and must be a non-empty array")
    for fmt in created_time_formats:
        if not str(fmt).strip():
            raise ValueError("record_mapping.created_time_formats contains empty format")
    # parse_timezone raises ValueError itself on a malformed offset spec.
    parse_timezone(record_mapping.get("created_time_timezone", "UTC"))

    def ensure_field_registered(field_name, where):
        # Every field referenced elsewhere must resolve through field_map.
        if field_name not in field_map:
            raise ValueError(f"{where}: field '{field_name}' is not in record_mapping.field_map")

    record_filter = cfg.get("record_filter", {})
    if record_filter.get("enabled", False):
        rules = record_filter.get("exclude_if_any", [])
        if not isinstance(rules, list):
            raise ValueError("record_filter.exclude_if_any must be an array")
        for i, rule in enumerate(rules):
            if not isinstance(rule, dict):
                raise ValueError(f"record_filter.exclude_if_any[{i}] must be an object")
            field_name = str(rule.get("field", "")).strip()
            if not field_name:
                raise ValueError(f"record_filter.exclude_if_any[{i}].field is required")
            ensure_field_registered(field_name, f"record_filter.exclude_if_any[{i}]")
            # A rule without any matcher key could never match anything.
            has_matcher = any(k in rule for k in ["contains", "equals", "regex"])
            if not has_matcher:
                raise ValueError(f"record_filter.exclude_if_any[{i}] must include one of contains/equals/regex")
    scoring = cfg.get("scoring", {})
    if scoring.get("enabled", False):
        strategy = str(scoring.get("strategy", "")).strip()
        if strategy not in {"weighted_average", "lexicographic"}:
            raise ValueError("scoring.strategy must be 'weighted_average' or 'lexicographic'")
        within_hours = to_float_or_none(scoring.get("within_hours", 24))
        if within_hours is None or within_hours <= 0:
            raise ValueError("scoring.within_hours must be a positive number")
        if strategy == "weighted_average":
            weighted_fields = scoring.get("weighted_fields")
            if not isinstance(weighted_fields, list) or not weighted_fields:
                raise ValueError("scoring.weighted_fields is required for weighted_average strategy")
            for i, item in enumerate(weighted_fields):
                if not isinstance(item, dict):
                    raise ValueError(f"scoring.weighted_fields[{i}] must be an object")
                field_name = str(item.get("field", "")).strip()
                if not field_name:
                    raise ValueError(f"scoring.weighted_fields[{i}].field is required")
                ensure_field_registered(field_name, f"scoring.weighted_fields[{i}]")
                weight = to_float_or_none(item.get("weight"))
                if weight is None or weight <= 0:
                    raise ValueError(f"scoring.weighted_fields[{i}].weight must be > 0")
        if strategy == "lexicographic":
            lex_fields = scoring.get("lexicographic_fields")
            if not isinstance(lex_fields, list) or not lex_fields:
                raise ValueError("scoring.lexicographic_fields is required for lexicographic strategy")
            for i, item in enumerate(lex_fields):
                # Entries may be bare field names or {field, order} objects.
                if isinstance(item, str):
                    field_name = item.strip()
                    order = ""
                elif isinstance(item, dict):
                    field_name = str(item.get("field", "")).strip()
                    order = str(item.get("order", "")).strip().lower()
                else:
                    raise ValueError(f"scoring.lexicographic_fields[{i}] must be string or object")
                if not field_name:
                    raise ValueError(f"scoring.lexicographic_fields[{i}] field is required")
                ensure_field_registered(field_name, f"scoring.lexicographic_fields[{i}]")
                if order and order not in {"asc", "desc"}:
                    raise ValueError(f"scoring.lexicographic_fields[{i}].order must be asc or desc")
        tie_breakers = scoring.get("tie_breakers", [])
        if tie_breakers is not None:
            if not isinstance(tie_breakers, list):
                raise ValueError("scoring.tie_breakers must be an array")
            for i, item in enumerate(tie_breakers):
                if not isinstance(item, dict):
                    raise ValueError(f"scoring.tie_breakers[{i}] must be an object")
                field_name = str(item.get("field", "")).strip()
                order = str(item.get("order", "")).strip().lower()
                if not field_name:
                    raise ValueError(f"scoring.tie_breakers[{i}].field is required")
                if order not in {"asc", "desc"}:
                    raise ValueError(f"scoring.tie_breakers[{i}].order must be asc or desc")
                ensure_field_registered(field_name, f"scoring.tie_breakers[{i}]")
def rule_matches(value, rule):
    """Return True when *value* trips one of the rule's matchers.

    Matchers are checked in priority order: 'contains', 'equals', 'regex' —
    only the first one present in the rule is evaluated. Comparison is
    case-insensitive unless the rule sets case_sensitive. Nested values are
    flattened to their string members first.
    """
    if value is None or not isinstance(rule, dict):
        return False
    candidates = flatten_values(value) or [value]
    case_sensitive = bool(rule.get("case_sensitive", False))
    if "contains" in rule:
        needle = str(rule.get("contains", ""))
        if not needle:
            return False
        if case_sensitive:
            return any(needle in str(item) for item in candidates)
        lowered = needle.lower()
        return any(lowered in str(item).lower() for item in candidates)
    if "equals" in rule:
        target = str(rule.get("equals", ""))
        if case_sensitive:
            return any(str(item) == target for item in candidates)
        lowered = target.lower()
        return any(str(item).lower() == lowered for item in candidates)
    if "regex" in rule:
        pattern = str(rule.get("regex", ""))
        if not pattern:
            return False
        try:
            rx = re.compile(pattern, 0 if case_sensitive else re.IGNORECASE)
        except Exception:
            # An invalid user-supplied pattern simply never matches.
            return False
        return any(rx.search(str(item)) for item in candidates)
    return False
def collect_excluded_domains(records, field_map, record_filter_cfg):
    """Return the set of domains whose record matches any exclusion rule.

    Empty/disabled filter config yields an empty set; a record stops being
    evaluated after its first matching rule.
    """
    if not record_filter_cfg.get("enabled", False):
        return set()
    rules = record_filter_cfg.get("exclude_if_any", [])
    if not rules:
        return set()
    excluded = set()
    for record in records:
        domain = normalize_domain(resolve_field(record, "domain", field_map))
        if not domain:
            continue
        for rule in rules:
            field_name = str(rule.get("field", "")).strip()
            if not field_name:
                continue
            if rule_matches(resolve_field(record, field_name, field_map), rule):
                excluded.add(domain)
                break
    return excluded
def build_lexicographic_descriptors(scoring_cfg, prefer_lower):
    """Normalize lexicographic_fields entries to {'field', 'order'} dicts.

    String entries, and dict entries without an order, inherit the default
    direction implied by *prefer_lower* ('asc' when lower is better).
    """
    default_order = "asc" if prefer_lower else "desc"
    descriptors = []
    for entry in scoring_cfg.get("lexicographic_fields", []):
        if isinstance(entry, str):
            descriptors.append({"field": entry.strip(), "order": default_order})
        else:
            order = str(entry.get("order", "")).strip().lower() or default_order
            descriptors.append({"field": str(entry.get("field", "")).strip(), "order": order})
    return descriptors
def parse_scored_records(records, field_map, record_mapping_cfg, scoring_cfg):
    """Turn raw API records into scoring entries for rank_scored_records.

    Each entry carries the normalized domain, the created_at parsed to UTC,
    the raw per-field values the sort may need, plus strategy-specific data:
    a weighted-average 'score_value' (None when any weighted field is missing
    or total weight is zero) or the ordered 'lex_values' list.
    Returns [] when scoring is disabled.
    """
    if not scoring_cfg.get("enabled", False):
        return []
    strategy = str(scoring_cfg.get("strategy", "weighted_average")).strip()
    prefer_lower = bool(scoring_cfg.get("prefer_lower", False))
    timezone = parse_timezone(record_mapping_cfg.get("created_time_timezone", "UTC"))
    time_formats = [str(x) for x in record_mapping_cfg.get("created_time_formats", [])]
    weighted_fields = scoring_cfg.get("weighted_fields", []) if strategy == "weighted_average" else []
    lex_descriptors = build_lexicographic_descriptors(scoring_cfg, prefer_lower) if strategy == "lexicographic" else []
    # Collect every field name the sort and tie-breakers will read so the raw
    # values can be cached on each entry. 'domain'/'created_at' are stored as
    # top-level keys and therefore excluded here.
    needed_fields = set()
    for item in weighted_fields:
        needed_fields.add(str(item.get("field", "")).strip())
    for item in lex_descriptors:
        needed_fields.add(str(item.get("field", "")).strip())
    for item in scoring_cfg.get("tie_breakers", []):
        needed_fields.add(str(item.get("field", "")).strip())
    needed_fields.discard("domain")
    needed_fields.discard("created_at")
    out = []
    for record in records:
        domain = normalize_domain(resolve_field(record, "domain", field_map))
        if not domain:
            # A record without a usable domain can never be selected.
            continue
        created_raw = resolve_field(record, "created_at", field_map)
        created_at = parse_created_time(created_raw, time_formats, timezone)
        field_values = {}
        for field_name in needed_fields:
            field_values[field_name] = resolve_field(record, field_name, field_map)
        score_value = None
        scores = []
        lex_values = []
        if strategy == "weighted_average":
            total = 0.0
            total_weight = 0.0
            missing = False
            for item in weighted_fields:
                field_name = str(item.get("field", "")).strip()
                weight = float(item.get("weight"))
                raw_v = resolve_field(record, field_name, field_map)
                val = to_float_or_none(raw_v)
                scores.append(val)
                if val is None:
                    # Any missing component invalidates the averaged score.
                    missing = True
                    continue
                total += val * weight
                total_weight += weight
            if not missing and total_weight > 0:
                score_value = total / total_weight
        if strategy == "lexicographic":
            for item in lex_descriptors:
                field_name = item["field"]
                order = item["order"]
                raw_v = resolve_field(record, field_name, field_map)
                num_v = to_float_or_none(raw_v)
                # Prefer the numeric form when the value parses as a number.
                v = num_v if num_v is not None else raw_v
                lex_values.append({"field": field_name, "value": v, "order": order})
                scores.append(v)
        out.append(
            {
                "domain": domain,
                "created_at": created_at,
                "created_raw": created_raw,
                "scores": scores,
                "score_value": score_value,
                "lex_values": lex_values,
                "field_values": field_values,
            }
        )
    return out
def cmp_scalar(a, b, order):
    """Three-way compare honoring *order* ('asc'/'desc'); None always sorts last.

    Datetimes are reduced to POSIX timestamps. When both sides coerce to
    finite floats they compare numerically, otherwise as lower-cased strings.
    The None placement is fixed regardless of *order*.
    """
    if a is None and b is None:
        return 0
    if a is None:
        return 1
    if b is None:
        return -1
    if isinstance(a, dt.datetime):
        a = a.timestamp()
    if isinstance(b, dt.datetime):
        b = b.timestamp()
    left = to_float_or_none(a)
    right = to_float_or_none(b)
    if left is None or right is None:
        left = str(a).lower()
        right = str(b).lower()
    base = (left > right) - (left < right)
    return base if order == "asc" else -base
def get_sort_field_value(record, field_name):
    """Fetch a sort value: top-level for domain/created_at, else from field_values."""
    if field_name in ("domain", "created_at"):
        return record.get(field_name)
    return record.get("field_values", {}).get(field_name)
def rank_scored_records(records, scoring_cfg):
    """Sort scored entries best-first according to the configured strategy.

    Entries created within the last 'within_hours' form the candidate pool;
    when none are that fresh, all entries are ranked instead. Ties fall
    through to the configured tie_breakers and finally to the domain name.
    """
    if not records:
        return []
    within_hours = float(scoring_cfg.get("within_hours", 24))
    strategy = str(scoring_cfg.get("strategy", "weighted_average")).strip()
    prefer_lower = bool(scoring_cfg.get("prefer_lower", False))
    tie_breakers = scoring_cfg.get("tie_breakers", [])
    now = dt.datetime.now(dt.timezone.utc)
    cutoff = now - dt.timedelta(hours=within_hours)
    # Restrict to recent entries when any exist; otherwise rank everything.
    recent = [r for r in records if r.get("created_at") is not None and r["created_at"] >= cutoff]
    candidates = recent if recent else records
    default_lex_order = "asc" if prefer_lower else "desc"

    def compare(a, b):
        # Three-way comparator consumed via functools.cmp_to_key below.
        if strategy == "weighted_average":
            order = "asc" if prefer_lower else "desc"
            c = cmp_scalar(a.get("score_value"), b.get("score_value"), order)
            if c != 0:
                return c
        elif strategy == "lexicographic":
            # Walk the lex fields positionally; a missing slot compares as
            # None (which cmp_scalar sorts last).
            a_lex = a.get("lex_values", [])
            b_lex = b.get("lex_values", [])
            n = max(len(a_lex), len(b_lex))
            for i in range(n):
                av = a_lex[i]["value"] if i < len(a_lex) else None
                bv = b_lex[i]["value"] if i < len(b_lex) else None
                order = default_lex_order
                # The left-hand entry's per-field order wins when present.
                if i < len(a_lex) and a_lex[i].get("order"):
                    order = a_lex[i]["order"]
                c = cmp_scalar(av, bv, order)
                if c != 0:
                    return c
        for item in tie_breakers:
            field_name = str(item.get("field", "")).strip()
            order = str(item.get("order", "asc")).strip().lower()
            av = get_sort_field_value(a, field_name)
            bv = get_sort_field_value(b, field_name)
            c = cmp_scalar(av, bv, order)
            if c != 0:
                return c
        # Deterministic final fallback: alphabetical by domain.
        return cmp_scalar(a.get("domain"), b.get("domain"), "asc")

    return sorted(candidates, key=functools.cmp_to_key(compare))
def apply_filter(domains, filter_cfg):
    """Keep domains that carry an allowed suffix and match no exclusion regex.

    An empty include_suffixes list allows every suffix.
    """
    suffixes = tuple(s.lower() for s in filter_cfg.get("include_suffixes", []) if s)
    blockers = [re.compile(p) for p in filter_cfg.get("exclude_regex", []) if p]
    kept = []
    for domain in domains:
        if suffixes and not domain.endswith(suffixes):
            continue
        if any(rx.search(domain) for rx in blockers):
            continue
        kept.append(domain)
    return kept
def single_tls_check(domain, timeout_ms, port, tls_verify=True):
    """Attempt one TCP connect plus TLS handshake against *domain*:*port*.

    Returns (ok, latency_ms_or_None, detail). *timeout_ms* is clamped to a
    minimum of 200 ms. Any failure (DNS, connect, handshake) is reported as
    (False, None, error_text).

    Fix: the unverified path previously used the private CPython helper
    ssl._create_unverified_context(); it now builds a public SSLContext with
    hostname/certificate checks explicitly disabled.
    """
    start = time.perf_counter()
    timeout_sec = max(0.2, timeout_ms / 1000.0)
    try:
        infos = socket.getaddrinfo(domain, port, proto=socket.IPPROTO_TCP)
        if not infos:
            return False, None, "dns_empty"
        af, socktype, proto, _, sockaddr = infos[0]
        with socket.socket(af, socktype, proto) as sock:
            sock.settimeout(timeout_sec)
            sock.connect(sockaddr)
            ctx = ssl.create_default_context()
            if not tls_verify:
                # Deliberately insecure mode for probing hosts with bad certs.
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
            with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
                ssock.do_handshake()
            elapsed = int((time.perf_counter() - start) * 1000)
            return True, elapsed, "ok"
    except Exception as e:
        return False, None, str(e)
def check_domains(domains, hc_cfg):
    """Probe each domain several times over TLS and rank the outcomes.

    Results sort by success ratio (desc), then average latency, then name;
    domains with zero successes get a 999999 ms sentinel latency. At most the
    first three error strings per domain are retained.
    """
    attempts = int(hc_cfg.get("attempts", 5))
    timeout_ms = int(hc_cfg.get("timeout_ms", 1800))
    port = int(hc_cfg.get("port", 443))
    tls_verify = bool(hc_cfg.get("tls_verify", True))
    results = []
    for domain in domains:
        latencies = []
        errors = []
        for _ in range(attempts):
            ok, latency, err = single_tls_check(domain, timeout_ms, port, tls_verify=tls_verify)
            if ok:
                latencies.append(latency)
            else:
                errors.append(err)
        ok_count = len(latencies)
        results.append(
            {
                "domain": domain,
                "success_ratio": ok_count / attempts if attempts else 0.0,
                "avg_latency_ms": int(sum(latencies) / len(latencies)) if latencies else 999999,
                "ok_count": ok_count,
                "attempts": attempts,
                "errors": errors[:3],
            }
        )
    results.sort(key=lambda item: (-item["success_ratio"], item["avg_latency_ms"], item["domain"]))
    return results
def render_v2ray(template_file, output_file, token, domain):
    """Render the v2ray template by substituting *token* with *domain*.

    Returns True on success, False when either path is unset or the template
    file is missing.

    Bug fix: os.path.dirname() returns "" when the output file has no
    directory component, and os.makedirs("") raises FileNotFoundError —
    only create directories when a parent path exists.
    """
    if not template_file or not output_file:
        return False
    if not os.path.exists(template_file):
        return False
    with open(template_file, "r", encoding="utf-8") as f:
        tpl = f.read()
    rendered = tpl.replace(token, domain)
    out_dir = os.path.dirname(output_file)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(rendered)
    return True
def run_notify(cmd, domain, status):
    """Fire the user's notification shell command with AUTODOMAIN env vars set.

    No-op when *cmd* is empty; the command's exit status is ignored.
    """
    if not cmd:
        return
    env = dict(os.environ)
    env["AUTODOMAIN"] = domain
    env["AUTODOMAIN_STATUS"] = status
    subprocess.run(cmd, shell=True, check=False, env=env)
def choose_domain(filtered_domains, check_results, top_n, ranked_scored):
    """Pick the final domain and the top-N candidate list.

    Preference order:
      1. score-ranked candidates that also passed the health check,
      2. score-ranked candidates alone,
      3. health-check results with at least one success,
      4. the first filtered domain.
    Returns (selected_or_None, candidates).

    Fix: removed the unreachable duplicate 'top_scored' branch — the
    ranked_scored block above it always returns, so that code could only
    ever run with an empty ranked_scored list and never produced a result.
    """
    if ranked_scored:
        if check_results:
            check_map = {item["domain"]: item for item in check_results}
            healthy = []
            for entry in ranked_scored:
                checked = check_map.get(entry["domain"])
                if checked and checked["success_ratio"] > 0:
                    healthy.append(checked)
                    if len(healthy) >= top_n:
                        break
            if healthy:
                return healthy[0]["domain"], healthy
        score_only = [
            {
                "domain": x["domain"],
                "score_value": x.get("score_value"),
                "scores": x.get("scores", []),
                "created_raw": x.get("created_raw"),
            }
            for x in ranked_scored[:top_n]
        ]
        return score_only[0]["domain"], score_only
    if check_results:
        healthy = [x for x in check_results if x["success_ratio"] > 0][:top_n]
        if healthy:
            return healthy[0]["domain"], healthy
        return None, check_results[:top_n]
    if filtered_domains:
        return filtered_domains[0], [{"domain": d} for d in filtered_domains[:top_n]]
    return None, []
def build_output_settings(output_cfg, config_path_abs):
    """Resolve every output artifact path/key from config, honoring legacy names.

    Older configs used current_domain_file / current_domain_json /
    substore_vars_file; the newer selected_value_* / export_vars_file keys
    take precedence when both are present.
    """
    runtime_dir = resolve_path(
        os.path.dirname(config_path_abs), output_cfg.get("runtime_dir", "./runtime")
    )
    text_name = output_cfg.get(
        "selected_value_file", output_cfg.get("current_domain_file", "current_domain.txt")
    )
    json_name = output_cfg.get(
        "selected_value_json", output_cfg.get("current_domain_json", "current_domain.json")
    )
    state_name = output_cfg.get("state_file", "state.json")
    vars_name = output_cfg.get(
        "export_vars_file", output_cfg.get("substore_vars_file", "substore_vars.json")
    )
    return {
        "runtime_dir": runtime_dir,
        "selected_text_path": os.path.join(runtime_dir, text_name),
        "selected_json_path": os.path.join(runtime_dir, json_name),
        "state_path": os.path.join(runtime_dir, state_name),
        "vars_path": os.path.join(runtime_dir, vars_name),
        "selected_json_key": str(output_cfg.get("selected_value_json_key", "domain")).strip() or "domain",
        "state_last_good_key": str(output_cfg.get("state_last_good_key", "last_good_domain")).strip() or "last_good_domain",
        "vars_value_key": str(output_cfg.get("substore_value_key", "AUTO_DOMAIN")).strip() or "AUTO_DOMAIN",
    }
def print_output_settings(config_path_abs, cfg):
    """Dump the resolved output settings to stdout as a single JSON line."""
    settings = build_output_settings(cfg.get("output", {}), config_path_abs)
    print(json.dumps(settings, ensure_ascii=True))
def main():
    """CLI entry point: pick the best domain/IP and publish it to output files.

    Flow: parse args -> load & validate config -> resolve output paths ->
    select a domain (either from a local cfst result file or from a remote
    API) -> write the selection to text/JSON/vars files, render the v2ray
    config, persist state, and fire the notify hook.  On any selection
    error, fall back to the last known-good value from the state file.
    """
    ap = argparse.ArgumentParser(description="Auto select VMess preferred domain")
    ap.add_argument("--config", default="config.server.json", help="Path to config JSON")
    ap.add_argument(
        "--print-output-settings",
        action="store_true",
        help="Print resolved output settings as JSON and exit",
    )
    args = ap.parse_args()
    config_path_abs = os.path.abspath(args.config)
    if not os.path.exists(config_path_abs):
        print(json.dumps({"status": "error", "error": f"config file not found: {config_path_abs}"}, ensure_ascii=True), file=sys.stderr)
        sys.exit(1)
    cfg = read_json_file(config_path_abs)
    try:
        validate_config(cfg)
    except Exception as e:
        # Invalid config is fatal: report on stderr as JSON and exit non-zero.
        print(json.dumps({"status": "error", "error": f"invalid config: {e}"}, ensure_ascii=True), file=sys.stderr)
        sys.exit(1)
    if args.print_output_settings:
        # Introspection mode: dump resolved paths/keys and do nothing else.
        print_output_settings(config_path_abs, cfg)
        return
    output_cfg = cfg.get("output", {})
    output_settings = build_output_settings(output_cfg, config_path_abs)
    v2_cfg = cfg.get("v2ray", {})
    notify_cfg = cfg.get("notify", {})
    # Unpack the resolved output locations and JSON key names.
    selected_text_file = output_settings["selected_text_path"]
    selected_json_file = output_settings["selected_json_path"]
    state_file = output_settings["state_path"]
    vars_file = output_settings["vars_path"]
    selected_json_key = output_settings["selected_json_key"]
    state_last_good_key = output_settings["state_last_good_key"]
    vars_value_key = output_settings["vars_value_key"]
    # Previous run's state provides the fallback value on failure.
    state = read_json_file(state_file, default={})
    last_good = state.get(state_last_good_key, "")
    source_type = get_source_type(cfg)
    try:
        top_n = int(cfg.get("selection", {}).get("top_n", 3))
        check_results = []
        payload = None
        if source_type == "cfst_local":
            # Source A: rows produced by a local cfst run; each row carries a "domain".
            cfst_rows = load_cfst_rows(cfg, config_path_abs)
            parsed = [row["domain"] for row in cfst_rows]
            filtered = apply_filter(parsed, cfg.get("domain_filter", {}))
            filtered_set = set(filtered)
            # Keep only rows whose domain survived the filter.
            cfst_rows = [row for row in cfst_rows if row["domain"] in filtered_set]
            if not cfst_rows:
                raise RuntimeError("No valid IP available from cfst result after filtering")
            # NOTE: healthcheck defaults to disabled for the cfst source
            # (the API branch below defaults it to enabled).
            if cfg.get("healthcheck", {}).get("enabled", False):
                check_results = check_domains(filtered, cfg.get("healthcheck", {}))
                selected, _ = choose_domain(filtered, check_results, top_n, [])
                top_candidates = cfst_rows[:top_n]
            else:
                # Without a healthcheck, trust cfst's own ordering: take the first row.
                selected = cfst_rows[0]["domain"]
                top_candidates = cfst_rows[:top_n]
        else:
            # Source B: fetch candidates from a remote API payload.
            payload = fetch_api_json(cfg)
            parsed = parse_domains(payload, cfg.get("parser", {}))
            filtered = apply_filter(parsed, cfg.get("domain_filter", {}))
            record_mapping_cfg = cfg.get("record_mapping", {})
            field_map = record_mapping_cfg.get("field_map", {})
            records = extract_records(payload, record_mapping_cfg)
            record_filter_cfg = cfg.get("record_filter", {})
            # Record-level exclusions remove domains the record filter blocks.
            blocked_domains = collect_excluded_domains(records, field_map, record_filter_cfg)
            if blocked_domains:
                filtered = [d for d in filtered if d not in blocked_domains]
            scoring_cfg = cfg.get("scoring", {})
            scored_records = parse_scored_records(records, field_map, record_mapping_cfg, scoring_cfg)
            filtered_set = set(filtered)
            # Only score records whose domain is still in the filtered set.
            scored_records = [r for r in scored_records if r["domain"] in filtered_set]
            ranked_scored = rank_scored_records(scored_records, scoring_cfg)
            if cfg.get("healthcheck", {}).get("enabled", True):
                check_results = check_domains(filtered, cfg.get("healthcheck", {}))
            # choose_domain runs regardless; check_results may be empty when
            # the healthcheck is disabled.
            selected, top_candidates = choose_domain(filtered, check_results, top_n, ranked_scored)
        status = "ok"
        if not selected and last_good:
            # Selection produced nothing usable; reuse the previous good value.
            selected = last_good
            status = "fallback_last_good"
        if not selected:
            if source_type == "cfst_local":
                raise RuntimeError("No valid IP available from cfst and no fallback in state")
            raise RuntimeError("No valid domain available from API and no fallback in state")
        # Publish the selection: plain text, JSON summary, and exported vars.
        write_text_file(selected_text_file, selected + "\n")
        current_json = {
            selected_json_key: selected,
            "updated_at": utc_now_iso(),
            "status": status,
            "source_type": source_type,
            "source_count": len(parsed),
            "checked_count": len(check_results),
            "top_candidates": top_candidates,
        }
        write_json_file(selected_json_file, current_json)
        write_json_file(
            vars_file,
            {
                vars_value_key: selected,
                "UPDATED_AT": current_json["updated_at"],
                "STATUS": status,
            },
        )
        # Render the v2ray config by substituting the token with the
        # selected domain (presumably template -> output copy; the exact
        # semantics live in render_v2ray).
        rendered = render_v2ray(
            template_file=v2_cfg.get("template_file", ""),
            output_file=v2_cfg.get("output_file", ""),
            token=v2_cfg.get("replace_token", "__AUTO_DOMAIN__"),
            domain=selected,
        )
        # Persist state last so the selection files exist even if this write fails.
        new_state = {
            "updated_at": current_json["updated_at"],
            state_last_good_key: selected,
            "status": status,
            "source_count": len(parsed),
            "checked_count": len(check_results),
            "rendered_v2ray": rendered,
            "source_type": source_type,
        }
        write_json_file(state_file, new_state)
        run_notify(notify_cfg.get("command", ""), selected, status)
        print(json.dumps(current_json, ensure_ascii=True))
    except Exception as e:
        # Any failure during selection/publication lands here.  Record the
        # error in state (preserving last_good), then either fall back to
        # the last known-good value or exit non-zero.
        now = utc_now_iso()
        err_state = {
            "updated_at": now,
            "status": "error",
            "error": str(e),
            state_last_good_key: last_good,
            "source_type": source_type,
        }
        write_json_file(state_file, err_state)
        if last_good:
            # Degraded mode: keep serving the previous good value and exit 0.
            write_text_file(selected_text_file, last_good + "\n")
            write_json_file(
                selected_json_file,
                {
                    selected_json_key: last_good,
                    "updated_at": now,
                    "status": "error_use_last_good",
                    "error": str(e),
                    "source_type": source_type,
                },
            )
            run_notify(notify_cfg.get("command", ""), last_good, "error_use_last_good")
            print(json.dumps({"status": "error_use_last_good", "error": str(e)}, ensure_ascii=True))
            return
        print(json.dumps({"status": "error", "error": str(e)}, ensure_ascii=True), file=sys.stderr)
        sys.exit(1)
- if __name__ == "__main__":
- main()
|