|
|
@@ -7,11 +7,8 @@ import json
|
|
|
import math
|
|
|
import os
|
|
|
import re
|
|
|
-import socket
|
|
|
-import ssl
|
|
|
import subprocess
|
|
|
import sys
|
|
|
-import time
|
|
|
import urllib.parse
|
|
|
import urllib.request
|
|
|
|
|
|
@@ -669,6 +666,7 @@ def parse_scored_records(records, field_map, record_mapping_cfg, scoring_cfg):
|
|
|
"score_value": score_value,
|
|
|
"lex_values": lex_values,
|
|
|
"field_values": field_values,
|
|
|
+ "raw_record": record,
|
|
|
}
|
|
|
)
|
|
|
return out
|
|
|
@@ -784,65 +782,114 @@ def apply_filter(domains, filter_cfg):
|
|
|
return out
|
|
|
|
|
|
|
|
|
-def single_tls_check(domain, timeout_ms, port, tls_verify=True):
|
|
|
- start = time.perf_counter()
|
|
|
- timeout_sec = max(0.2, timeout_ms / 1000.0)
|
|
|
- try:
|
|
|
- infos = socket.getaddrinfo(domain, port, proto=socket.IPPROTO_TCP)
|
|
|
- if not infos:
|
|
|
- return False, None, "dns_empty"
|
|
|
-
|
|
|
- af, socktype, proto, _, sockaddr = infos[0]
|
|
|
- with socket.socket(af, socktype, proto) as sock:
|
|
|
- sock.settimeout(timeout_sec)
|
|
|
- sock.connect(sockaddr)
|
|
|
- if tls_verify:
|
|
|
- ctx = ssl.create_default_context()
|
|
|
- else:
|
|
|
- ctx = ssl._create_unverified_context()
|
|
|
- with ctx.wrap_socket(sock, server_hostname=domain) as ssock:
|
|
|
- ssock.do_handshake()
|
|
|
+def choose_top_candidate_domains(filtered_domains, top_n, ranked_scored):
|
|
|
+ if ranked_scored:
|
|
|
+ domains_by_score = [x["domain"] for x in ranked_scored]
|
|
|
+ return domains_by_score[:top_n]
|
|
|
+ return filtered_domains[:top_n]
|
|
|
|
|
|
- elapsed = int((time.perf_counter() - start) * 1000)
|
|
|
- return True, elapsed, "ok"
|
|
|
- except Exception as e:
|
|
|
- return False, None, str(e)
|
|
|
|
|
|
+def blank_top_candidate(domain="", source_type=""):
|
|
|
+ return {
|
|
|
+ "domain": domain,
|
|
|
+ "ip": domain if IPV4_RE.match(domain) else "",
|
|
|
+ "source_type": source_type,
|
|
|
+ "sent": "",
|
|
|
+ "received": "",
|
|
|
+ "loss_rate": "",
|
|
|
+ "avg_latency": "",
|
|
|
+ "download_speed": "",
|
|
|
+ "region": "",
|
|
|
+ "location_country": "",
|
|
|
+ "location_city": "",
|
|
|
+ "host_provider": "",
|
|
|
+ "score_value": None,
|
|
|
+ "scores": [],
|
|
|
+ "created_raw": "",
|
|
|
+ }
|
|
|
|
|
|
-def check_domains(domains, hc_cfg):
|
|
|
- attempts = int(hc_cfg.get("attempts", 5))
|
|
|
- timeout_ms = int(hc_cfg.get("timeout_ms", 1800))
|
|
|
- port = int(hc_cfg.get("port", 443))
|
|
|
- tls_verify = bool(hc_cfg.get("tls_verify", True))
|
|
|
|
|
|
- results = []
|
|
|
- for d in domains:
|
|
|
- ok_count = 0
|
|
|
- latencies = []
|
|
|
- errors = []
|
|
|
- for _ in range(attempts):
|
|
|
- ok, latency, err = single_tls_check(d, timeout_ms, port, tls_verify=tls_verify)
|
|
|
- if ok:
|
|
|
- ok_count += 1
|
|
|
- latencies.append(latency)
|
|
|
- else:
|
|
|
- errors.append(err)
|
|
|
+def text_or_blank(value):
|
|
|
+ if value is None:
|
|
|
+ return ""
|
|
|
+ return str(value).strip()
|
|
|
|
|
|
- success_ratio = ok_count / attempts if attempts else 0.0
|
|
|
- avg_latency = int(sum(latencies) / len(latencies)) if latencies else 999999
|
|
|
- results.append(
|
|
|
- {
|
|
|
- "domain": d,
|
|
|
- "success_ratio": success_ratio,
|
|
|
- "avg_latency_ms": avg_latency,
|
|
|
- "ok_count": ok_count,
|
|
|
- "attempts": attempts,
|
|
|
- "errors": errors[:3],
|
|
|
- }
|
|
|
- )
|
|
|
|
|
|
- results.sort(key=lambda x: (-x["success_ratio"], x["avg_latency_ms"], x["domain"]))
|
|
|
- return results
|
|
|
+def maybe_resolve_field(record, field_name, field_map):
|
|
|
+ if not isinstance(record, dict):
|
|
|
+ return None
|
|
|
+ if field_name not in field_map:
|
|
|
+ return None
|
|
|
+ return resolve_field(record, field_name, field_map)
|
|
|
+
|
|
|
+
|
|
|
+def build_cfst_candidate(row):
|
|
|
+ domain = row.get("domain", "")
|
|
|
+ candidate = blank_top_candidate(domain=domain, source_type="cfst_local")
|
|
|
+ candidate["ip"] = text_or_blank(row.get("ip") or domain)
|
|
|
+ candidate["sent"] = text_or_blank(row.get("sent"))
|
|
|
+ candidate["received"] = text_or_blank(row.get("received"))
|
|
|
+ candidate["loss_rate"] = text_or_blank(row.get("loss_rate"))
|
|
|
+ candidate["avg_latency"] = text_or_blank(row.get("avg_latency"))
|
|
|
+ candidate["download_speed"] = text_or_blank(row.get("download_speed"))
|
|
|
+ candidate["region"] = text_or_blank(row.get("region"))
|
|
|
+ return candidate
|
|
|
+
|
|
|
+
|
|
|
+def build_api_candidate(domain, record, field_map, scored_record=None):
|
|
|
+ candidate = blank_top_candidate(domain=domain, source_type="api")
|
|
|
+ candidate["ip"] = domain if IPV4_RE.match(domain) else ""
|
|
|
+
|
|
|
+ if record:
|
|
|
+ candidate["created_raw"] = text_or_blank(maybe_resolve_field(record, "created_at", field_map))
|
|
|
+ candidate["avg_latency"] = text_or_blank(maybe_resolve_field(record, "avg_latency", field_map))
|
|
|
+ candidate["loss_rate"] = text_or_blank(maybe_resolve_field(record, "avg_pkg_lost_rate", field_map))
|
|
|
+ candidate["location_country"] = text_or_blank(maybe_resolve_field(record, "location_country", field_map))
|
|
|
+ candidate["location_city"] = text_or_blank(maybe_resolve_field(record, "location_city", field_map))
|
|
|
+ candidate["host_provider"] = text_or_blank(maybe_resolve_field(record, "host_provider", field_map))
|
|
|
+ region_parts = [candidate["location_country"], candidate["location_city"]]
|
|
|
+ candidate["region"] = "/".join([x for x in region_parts if x])
|
|
|
+
|
|
|
+ if scored_record:
|
|
|
+ candidate["score_value"] = scored_record.get("score_value")
|
|
|
+ candidate["scores"] = list(scored_record.get("scores", []))
|
|
|
+ if not candidate["created_raw"]:
|
|
|
+ candidate["created_raw"] = text_or_blank(scored_record.get("created_raw"))
|
|
|
+
|
|
|
+ return candidate
|
|
|
+
|
|
|
+
|
|
|
+def build_top_candidates(source_type, candidate_domains, cfst_rows=None, records=None, field_map=None, ranked_scored=None):
|
|
|
+ if source_type == "cfst_local":
|
|
|
+ row_map = {}
|
|
|
+ for row in cfst_rows or []:
|
|
|
+ domain = row.get("domain", "")
|
|
|
+ if domain and domain not in row_map:
|
|
|
+ row_map[domain] = row
|
|
|
+ return [build_cfst_candidate(row_map.get(d, {"domain": d, "ip": d})) for d in candidate_domains]
|
|
|
+
|
|
|
+ ranked_map = {}
|
|
|
+ for item in ranked_scored or []:
|
|
|
+ domain = item.get("domain", "")
|
|
|
+ if domain and domain not in ranked_map:
|
|
|
+ ranked_map[domain] = item
|
|
|
+
|
|
|
+ record_map = {}
|
|
|
+ for record in records or []:
|
|
|
+ domain = normalize_domain(maybe_resolve_field(record, "domain", field_map or {}))
|
|
|
+ if domain and domain not in record_map:
|
|
|
+ record_map[domain] = record
|
|
|
+
|
|
|
+ out = []
|
|
|
+ for domain in candidate_domains:
|
|
|
+ scored_record = ranked_map.get(domain)
|
|
|
+ record = None
|
|
|
+ if scored_record:
|
|
|
+ record = scored_record.get("raw_record")
|
|
|
+ if record is None:
|
|
|
+ record = record_map.get(domain)
|
|
|
+ out.append(build_api_candidate(domain, record, field_map or {}, scored_record=scored_record))
|
|
|
+ return out
|
|
|
|
|
|
|
|
|
def run_notify(cmd, domain, status):
|
|
|
@@ -854,52 +901,14 @@ def run_notify(cmd, domain, status):
|
|
|
subprocess.run(cmd, shell=True, check=False, env=env)
|
|
|
|
|
|
|
|
|
-def choose_domain(filtered_domains, check_results, top_n, ranked_scored):
|
|
|
+def choose_domain(filtered_domains, top_n, ranked_scored):
|
|
|
if ranked_scored:
|
|
|
- domains_by_score = [x["domain"] for x in ranked_scored]
|
|
|
-
|
|
|
- if check_results:
|
|
|
- check_map = {x["domain"]: x for x in check_results}
|
|
|
- top = []
|
|
|
- for d in domains_by_score:
|
|
|
- if d in check_map and check_map[d]["success_ratio"] > 0:
|
|
|
- top.append(check_map[d])
|
|
|
- if len(top) >= top_n:
|
|
|
- break
|
|
|
- if top:
|
|
|
- return top[0]["domain"], top
|
|
|
-
|
|
|
- score_only = [
|
|
|
- {
|
|
|
- "domain": x["domain"],
|
|
|
- "score_value": x.get("score_value"),
|
|
|
- "scores": x.get("scores", []),
|
|
|
- "created_raw": x.get("created_raw"),
|
|
|
- }
|
|
|
- for x in ranked_scored[:top_n]
|
|
|
- ]
|
|
|
- return score_only[0]["domain"], score_only
|
|
|
-
|
|
|
- top_scored = [
|
|
|
- {
|
|
|
- "domain": x["domain"],
|
|
|
- "score_value": x.get("score_value"),
|
|
|
- "scores": x.get("scores", []),
|
|
|
- "created_raw": x.get("created_raw"),
|
|
|
- }
|
|
|
- for x in ranked_scored[:top_n]
|
|
|
- ]
|
|
|
- if top_scored:
|
|
|
- return top_scored[0]["domain"], top_scored
|
|
|
-
|
|
|
- if check_results:
|
|
|
- top = [x for x in check_results if x["success_ratio"] > 0][:top_n]
|
|
|
- if top:
|
|
|
- return top[0]["domain"], top
|
|
|
- return None, check_results[:top_n]
|
|
|
+ top_domains = choose_top_candidate_domains(filtered_domains, top_n, ranked_scored)
|
|
|
+ if top_domains:
|
|
|
+ return top_domains[0], top_domains
|
|
|
|
|
|
if filtered_domains:
|
|
|
- return filtered_domains[0], [{"domain": x} for x in filtered_domains[:top_n]]
|
|
|
+ return filtered_domains[0], filtered_domains[:top_n]
|
|
|
return None, []
|
|
|
|
|
|
|
|
|
@@ -974,7 +983,6 @@ def main():
|
|
|
|
|
|
try:
|
|
|
top_n = int(cfg.get("selection", {}).get("top_n", 3))
|
|
|
- check_results = []
|
|
|
payload = None
|
|
|
|
|
|
if source_type == "cfst_local":
|
|
|
@@ -986,12 +994,9 @@ def main():
|
|
|
if not cfst_rows:
|
|
|
raise RuntimeError("No valid IP available from cfst result after filtering")
|
|
|
|
|
|
- if cfg.get("healthcheck", {}).get("enabled", False):
|
|
|
- check_results = check_domains(filtered, cfg.get("healthcheck", {}))
|
|
|
- selected, top_candidates = choose_domain(filtered, check_results, top_n, [])
|
|
|
- else:
|
|
|
- selected = cfst_rows[0]["domain"]
|
|
|
- top_candidates = cfst_rows[:top_n]
|
|
|
+ selected = cfst_rows[0]["domain"]
|
|
|
+ candidate_domains = [row["domain"] for row in cfst_rows[:top_n]]
|
|
|
+ top_candidates = build_top_candidates("cfst_local", candidate_domains, cfst_rows=cfst_rows)
|
|
|
else:
|
|
|
payload = fetch_api_json(cfg)
|
|
|
parsed = parse_domains(payload, cfg.get("parser", {}))
|
|
|
@@ -1012,14 +1017,16 @@ def main():
|
|
|
scored_records = [r for r in scored_records if r["domain"] in filtered_set]
|
|
|
ranked_scored = rank_scored_records(scored_records, scoring_cfg)
|
|
|
|
|
|
- if cfg.get("healthcheck", {}).get("enabled", False):
|
|
|
- check_results = check_domains(filtered, cfg.get("healthcheck", {}))
|
|
|
-
|
|
|
- selected, top_candidates = choose_domain(filtered, check_results, top_n, ranked_scored)
|
|
|
+ selected, candidate_domains = choose_domain(filtered, top_n, ranked_scored)
|
|
|
+ top_candidates = build_top_candidates(
|
|
|
+ "api",
|
|
|
+ candidate_domains,
|
|
|
+ records=records,
|
|
|
+ field_map=field_map,
|
|
|
+ ranked_scored=ranked_scored,
|
|
|
+ )
|
|
|
|
|
|
status = "ok"
|
|
|
- if check_results and all(x["success_ratio"] == 0 for x in check_results):
|
|
|
- status = "ok_no_healthy"
|
|
|
if not selected and last_good:
|
|
|
selected = last_good
|
|
|
status = "fallback_last_good"
|
|
|
@@ -1036,7 +1043,6 @@ def main():
|
|
|
"status": status,
|
|
|
"source_type": source_type,
|
|
|
"source_count": len(parsed),
|
|
|
- "checked_count": len(check_results),
|
|
|
"top_candidates": top_candidates,
|
|
|
}
|
|
|
write_json_file(selected_json_file, current_json)
|
|
|
@@ -1054,7 +1060,6 @@ def main():
|
|
|
state_last_good_key: selected,
|
|
|
"status": status,
|
|
|
"source_count": len(parsed),
|
|
|
- "checked_count": len(check_results),
|
|
|
"source_type": source_type,
|
|
|
}
|
|
|
write_json_file(state_file, new_state)
|