#!/usr/bin/env python3
"""
VPN Logs Daily Report
─────────────────────
1. Looks in the S3 bucket for yesterday's date (UTC).
2. Compares against the inventory: which nodes uploaded logs and which did not.
3. Downloads each node's _meta.json (used for the per-node line count).
4. Downloads xray-error.log.zst and aggregates the most frequent errors.
5. Builds the report: a short summary message + a detailed file (.txt.gz).
6. Sends both to Telegram via the Bot API.
"""
import os
import re
import sys
import gzip
import html
import json
import time
import socket
import datetime
import subprocess
from pathlib import Path
from collections import Counter

import yaml
import boto3
import requests
from botocore.client import Config
from botocore.exceptions import ClientError

# === config ===
SECRETS = "/etc/log-collector/secrets.env"
INVENTORY = "/etc/log-collector/inventory.yaml"
STATE_DIR = Path("/var/lib/log-collector")
LOG_DIR = Path("/var/log/log-collector")

STATE_DIR.mkdir(parents=True, exist_ok=True)
LOG_DIR.mkdir(parents=True, exist_ok=True)


def _load_env_file(path):
    """Parse a simple KEY=VALUE file into a dict.

    Lines starting with '#' and lines without '=' (including blank lines) are
    skipped. Keys and values are whitespace-stripped; a value wrapped in a
    matching pair of single or double quotes is unquoted.
    """
    values = {}
    with open(path, encoding="utf-8") as fh:
        for raw in fh:
            line = raw.strip()
            if line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            key, value = key.strip(), value.strip()
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            values[key] = value
    return values


# === read secrets ===
# Every key is also exported into os.environ: the S3 client below reads the
# AWS credentials from there.
secrets = _load_env_file(SECRETS)
os.environ.update(secrets)

S3_ENDPOINT = secrets["S3_ENDPOINT"]
S3_BUCKET = secrets["S3_BUCKET"]
S3_REGION = secrets.get("S3_REGION", "ru-1")
TG_TOKEN = secrets["TG_BOT_TOKEN"]
TG_CHAT = secrets["TG_CHAT_ID"]

# === read inventory ===
# Each node entry is accessed below via "name", "ip" and "country"; "role" is optional.
with open(INVENTORY, encoding="utf-8") as f:
    inv = yaml.safe_load(f)
NODES = inv["nodes"]

# === report date (yesterday, UTC) ===
NOW = datetime.datetime.now(datetime.UTC)
REPORT_DATE = (NOW - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
DATE_PREFIX = REPORT_DATE.replace("-", "/")  # e.g. 2026/05/19

# === S3 client ===
s3 = boto3.client(
    "s3",
    endpoint_url=S3_ENDPOINT,
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    region_name=S3_REGION,
    config=Config(signature_version="s3v4", retries={"max_attempts": 3}),
)

# === country code -> flag emoji ===
FLAG = {
    "DE": "🇩🇪", "NL": "🇳🇱", "US": "🇺🇸", "HK": "🇭🇰", "FI": "🇫🇮",
    "PL": "🇵🇱", "AT": "🇦🇹", "SE": "🇸🇪", "EE": "🇪🇪", "GB": "🇬🇧",
    "LV": "🇱🇻", "KZ": "🇰🇿", "TR": "🇹🇷", "FR": "🇫🇷", "IN": "🇮🇳",
    "RU": "🇷🇺",
}

# Captures the message text following a WARN/ERROR-style level marker.
_ERR_LINE_RE = re.compile(r"(?:WARN|ERROR|Warning|Error)[\]\s:>]+([^\n]{10,200})")
# Normalisation patterns: strip IPs (with optional port), UUIDs and "port N"
# so that otherwise identical messages are counted together.
_IP_RE = re.compile(r"\d+\.\d+\.\d+\.\d+(:\d+)?")
_UUID_RE = re.compile(r"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}")
_PORT_RE = re.compile(r"\bport\s+\d+")


def fmt_size(n):
    """Bytes -> human-readable string with one decimal (B .. PB)."""
    for unit in ["B", "KB", "MB", "GB", "TB"]:
        if n < 1024:
            return f"{n:.1f} {unit}"
        n /= 1024
    return f"{n:.1f} PB"


def _node_prefix(node):
    """S3 key prefix holding one node's uploads for REPORT_DATE."""
    return f"{DATE_PREFIX}/{node['name']}__{node['ip']}/"


def list_node_objects(node):
    """List a node's S3 objects for the report date.

    Returns a (possibly empty) list of object dicts, or None if S3 returned
    an error. Results are paginated, so prefixes holding more than the
    1000-key page limit are fully counted.
    """
    paginator = s3.get_paginator("list_objects_v2")
    objects = []
    try:
        for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=_node_prefix(node)):
            objects.extend(page.get("Contents", []))
    except ClientError:
        return None
    return objects


def get_meta(node):
    """Download and parse the node's _meta.json.

    Returns the parsed dict, or None if the object is missing/unreadable,
    is not valid JSON, or its top level is not a JSON object.
    """
    key = _node_prefix(node) + "_meta.json"
    try:
        resp = s3.get_object(Bucket=S3_BUCKET, Key=key)
        meta = json.loads(resp["Body"].read())
    except ClientError:
        return None
    except ValueError as exc:  # JSONDecodeError / UnicodeDecodeError
        print(f"warning: malformed {key}: {exc}", file=sys.stderr)
        return None
    if not isinstance(meta, dict):
        print(f"warning: unexpected JSON shape in {key}", file=sys.stderr)
        return None
    return meta


def parse_error_log(node):
    """Download xray-error.log.zst, decompress it and count normalised error messages.

    Returns a Counter mapping message (truncated to 120 chars) -> occurrences.
    An empty Counter is returned when the log is absent or cannot be
    decompressed; decompression failures are reported on stderr.
    """
    key = _node_prefix(node) + "xray-error.log.zst"
    try:
        resp = s3.get_object(Bucket=S3_BUCKET, Key=key)
        data = resp["Body"].read()
    except ClientError:
        return Counter()

    # Decompressed with the external zstd CLI (the original author notes it
    # is faster than python-zstandard).
    try:
        decompressed = subprocess.run(
            ["zstd", "-dc"], input=data, capture_output=True, check=True
        ).stdout.decode("utf-8", errors="replace")
    except (OSError, subprocess.CalledProcessError) as exc:
        # OSError covers a missing zstd binary; don't hide it silently.
        print(f"warning: cannot decompress {key}: {exc}", file=sys.stderr)
        return Counter()

    counter = Counter()
    for line in decompressed.splitlines():
        match = _ERR_LINE_RE.search(line)
        if not match:
            continue
        msg = match.group(1).strip()
        msg = _IP_RE.sub("", msg)
        msg = _UUID_RE.sub("", msg)
        msg = _PORT_RE.sub("port ", msg)
        counter[msg[:120]] += 1  # cap very long messages
    return counter


def get_bucket_total():
    """Return (total size in bytes, object count) over the whole bucket."""
    total = 0
    count = 0
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=S3_BUCKET):
        for obj in page.get("Contents", []):
            total += obj["Size"]
            count += 1
    return total, count


def _esc(text):
    """Escape text for Telegram's HTML parse mode (&, <, >)."""
    return html.escape(str(text), quote=False)


def _collect_node_results():
    """Check every inventory node in S3.

    Returns (results, total_compressed, all_errors):
      results          -- one dict per node; "status" is OK, MISSING or S3_ERROR
      total_compressed -- summed object size of all OK nodes
      all_errors       -- xray error Counter merged across the parsed nodes
    """
    results = []
    total_compressed = 0
    all_errors = Counter()

    for node in NODES:
        objects = list_node_objects(node)
        if objects is None:
            results.append({"node": node, "status": "S3_ERROR", "size": 0})
            continue
        if not objects:
            results.append({"node": node, "status": "MISSING", "size": 0})
            continue

        size = sum(o["Size"] for o in objects)
        meta = get_meta(node)

        # Error logs are parsed only for these roles (not gaming/bridge nodes).
        errs = Counter()
        if node.get("role") in ("vpn", "vpn-wl", "routing"):
            errs = parse_error_log(node)
            all_errors.update(errs)

        results.append({
            "node": node,
            "status": "OK",
            "size": size,
            "files": len(objects),
            "meta": meta,
            "error_count": sum(errs.values()),
        })
        total_compressed += size

    return results, total_compressed, all_errors


def _by_country(results):
    """Aggregate per-country node counts (ok / total) and uploaded size."""
    stats = {}
    for r in results:
        s = stats.setdefault(r["node"]["country"], {"ok": 0, "total": 0, "size": 0})
        s["total"] += 1
        if r["status"] == "OK":
            s["ok"] += 1
            s["size"] += r["size"]
    return stats


def _build_caption(results, total_compressed, all_errors, bucket_total_size, bucket_total_count):
    """Build the short Telegram summary.

    Text taken from inventory or logs is HTML-escaped because the message is
    sent with parse_mode=HTML.
    """
    ok_count = sum(1 for r in results if r["status"] == "OK")
    missing = [r for r in results if r["status"] == "MISSING"]
    s3_errors = [r for r in results if r["status"] == "S3_ERROR"]

    lines = [
        f"📊 Daily Log Report — {REPORT_DATE}",
        "",
        f"✅ Загружено: {ok_count}/{len(NODES)} ({fmt_size(total_compressed)} сжатого)",
    ]
    if missing:
        names = ", ".join(_esc(m["node"]["name"]) for m in missing[:5])
        more = f" +{len(missing)-5}" if len(missing) > 5 else ""
        lines.append(f"❌ Не пришли: {names}{more}")
    if s3_errors:
        lines.append(f"⚠️ S3 ошибки: {len(s3_errors)} нод")

    # Top errors go into the chat only when there are any.
    if all_errors:
        lines.append("")
        lines.append("🔴 Топ ошибок xray:")
        for msg, n in all_errors.most_common(3):
            # Truncate before escaping so an entity like "&lt;" is never cut.
            lines.append(f" • {n}× {_esc(msg[:60])}")

    lines.append("")
    lines.append(f"💾 Bucket: {fmt_size(bucket_total_size)} ({bucket_total_count} файлов)")
    lines.append("📎 Полный отчёт во вложении ↓")
    return "\n".join(lines)


def _update_last_seen(results):
    """Update the per-IP last-seen state file and return "silent node" anomaly lines.

    OK nodes get REPORT_DATE recorded. A MISSING node whose last recorded
    upload is at least 2 days before REPORT_DATE produces an anomaly line;
    nodes never seen before are not reported. S3_ERROR nodes are skipped:
    the listing failed on our side, so their silence is unknown. The state
    is written via a temp file + os.replace so a crash cannot truncate it.
    """
    state_file = STATE_DIR / "last_seen.json"
    last_seen = {}
    if state_file.exists():
        last_seen = json.loads(state_file.read_text(encoding="utf-8"))

    report_day = datetime.date.fromisoformat(REPORT_DATE)
    anomalies = []
    for r in results:
        ip = r["node"]["ip"]
        if r["status"] == "OK":
            last_seen[ip] = REPORT_DATE
            continue
        if r["status"] != "MISSING":
            continue
        ls = last_seen.get(ip)
        if ls is None:
            continue
        days_silent = (report_day - datetime.date.fromisoformat(ls)).days
        if days_silent >= 2:
            anomalies.append(f" • {r['node']['name']} ({r['node']['country']}): "
                             f"{days_silent} дней молчит")

    tmp_file = state_file.with_name(state_file.name + ".tmp")
    tmp_file.write_text(json.dumps(last_seen, indent=2), encoding="utf-8")
    os.replace(tmp_file, state_file)
    return anomalies


def _build_detail(results, by_country, all_errors, bucket_total_size, bucket_total_count, anomalies):
    """Render the detailed plain-text report."""
    rule = "═" * 60
    lines = [
        rule,
        " VPN LOGS — DAILY REPORT",
        f" Date: {REPORT_DATE} UTC",
        f" Generated: {NOW.strftime('%Y-%m-%d %H:%M:%S')} UTC",
        rule,
        "",
    ]

    # --- per country ---
    lines.append("┌─── СТАТУС ПО СТРАНАМ ───┐")
    for c in sorted(by_country):
        s = by_country[c]
        mark = "✓" if s["ok"] == s["total"] else "⚠"
        lines.append(
            f" {FLAG.get(c, '🏳️')} {c} {s['ok']}/{s['total']} {mark} {fmt_size(s['size']):>10}"
        )
    lines.append("")

    # --- per node ---
    lines.append("┌─── ДЕТАЛИ ПО НОДАМ ───┐")
    for r in results:
        n = r["node"]
        flag = FLAG.get(n["country"], "🏳️")
        if r["status"] == "OK":
            meta_info = ""
            if r.get("meta"):
                # Total line count summed over the entries of meta["files"].
                raw = sum(entry.get("lines", 0) for entry in r["meta"].get("files", []))
                meta_info = f" {raw:>10} lines"
            err_info = f" err={r['error_count']}" if r['error_count'] else ""
            lines.append(
                f" [OK] {flag} {n['country']:2} {n['name']:18} {n['ip']:18} "
                f"{fmt_size(r['size']):>10} files={r['files']}{meta_info}{err_info}"
            )
        elif r["status"] == "MISSING":
            lines.append(
                f" [FAIL] {flag} {n['country']:2} {n['name']:18} {n['ip']:18} — нет логов"
            )
        else:
            lines.append(
                f" [ERR] {flag} {n['country']:2} {n['name']:18} {n['ip']:18} S3 error"
            )
    lines.append("")

    # --- top errors ---
    lines.append("┌─── TOP XRAY ERRORS (all nodes) ───┐")
    if all_errors:
        for msg, n in all_errors.most_common(20):
            lines.append(f" {n:>6} {msg[:100]}")
    else:
        lines.append(" (нет ошибок)")
    lines.append("")

    # --- bucket info ---
    lines.append("┌─── BUCKET ───┐")
    lines.append(f" endpoint: {S3_ENDPOINT}")
    lines.append(f" bucket: {S3_BUCKET}")
    lines.append(f" total objects: {bucket_total_count}")
    lines.append(f" total size: {fmt_size(bucket_total_size)}")
    lines.append("")

    # --- anomalies ---
    lines.append("┌─── ANOMALIES ───┐")
    if anomalies:
        lines.extend(anomalies)
    else:
        lines.append(" (нет аномалий)")
    lines.append("")
    lines.append(rule)
    return "\n".join(lines)


def _send_to_telegram(caption, gz_path):
    """Send the summary message, then the gzipped report.

    Prints each status (and the response body on failure).
    Returns True only if both requests returned HTTP 200.
    """
    api = f"https://api.telegram.org/bot{TG_TOKEN}"

    # 1) summary message first
    r1 = requests.post(
        f"{api}/sendMessage",
        json={
            "chat_id": TG_CHAT,
            "text": caption,
            "parse_mode": "HTML",
            "disable_web_page_preview": True,
        },
        timeout=30,
    )
    print(f"caption sent: {r1.status_code}")
    if r1.status_code != 200:
        print(r1.text)

    # 2) then the report file
    with open(gz_path, "rb") as f:
        r2 = requests.post(
            f"{api}/sendDocument",
            data={"chat_id": TG_CHAT, "caption": f"📋 detailed-{REPORT_DATE}"},
            files={"document": (gz_path.name, f, "application/gzip")},
            timeout=60,
        )
    print(f"file sent: {r2.status_code}")
    if r2.status_code != 200:
        print(r2.text)

    return r1.status_code == 200 and r2.status_code == 200


def main():
    """Build the report for REPORT_DATE, save it under LOG_DIR and send it to Telegram.

    Exits with status 1 if either Telegram request fails.
    """
    print(f"[{NOW}] Report for {REPORT_DATE}")

    # === step 1: check every node ===
    results, total_compressed, all_errors = _collect_node_results()

    # === step 2: bucket usage ===
    bucket_total_size, bucket_total_count = get_bucket_total()

    # === step 3: short summary (caption) ===
    caption = _build_caption(
        results, total_compressed, all_errors, bucket_total_size, bucket_total_count
    )

    # === step 4: detailed report (also updates the last-seen state file) ===
    anomalies = [
        f" • {r['node']['name']} ({r['node']['country']}): "
        f"{r['error_count']} ошибок (много)"
        for r in results
        if r["status"] == "OK" and r.get("error_count", 0) > 100
    ]
    anomalies.extend(_update_last_seen(results))
    detail_report = _build_detail(
        results, _by_country(results), all_errors,
        bucket_total_size, bucket_total_count, anomalies,
    )

    # === step 5: save + compress the report ===
    report_path = LOG_DIR / f"report-{REPORT_DATE}.txt"
    report_path.write_text(detail_report, encoding="utf-8")
    gz_path = LOG_DIR / f"report-{REPORT_DATE}.txt.gz"
    with gzip.open(gz_path, "wb", compresslevel=9) as fout:
        fout.write(detail_report.encode("utf-8"))
    print(f"detailed report: {gz_path} ({gz_path.stat().st_size} bytes)")
    print(f"caption length: {len(caption)} chars")

    # === step 6: send to Telegram ===
    if not _send_to_telegram(caption, gz_path):
        sys.exit(1)
    print("DONE")


if __name__ == "__main__":
    main()