⼀、 前⾔:什么是“隐盾模式”?

在传统的 Web 安全架构中,即使你开启了 Cloudflare 等 CDN,攻击者仍可以通过扫描全球 IP 段直

接通过 80/443 端⼝探测你的源站。如果源站 IP 泄露,CDN 的 WAF 防御将瞬间形同虚设。

“隐盾模式”(Hidden Shield Mode) 的核⼼逻辑是:

  1. 物理封闭:利⽤ UFW 防⽕墙封闭所有⾮⽩名单端⼝,仅允许 Cloudflare 边缘节点 IP 访问。
  2. 静默审计:将 UFW 丢弃的所有直连攻击包记录在案。
  3. 情报转化:利⽤ Python 后端和浏览器内置的全球汉化引擎,将枯燥的系统⽇志转化为实时的

威胁情报看板。看板地址:点击体验

⼆、 系统架构图

流量层:合法访客通过域名访问 -> Cloudflare 过滤 -> 携带加密凭证回源。

防御层:攻击者直连服务器 IP -> UFW 内核对⽐⽩名单 -> 发现⾮ CF 节点,直接丢弃(DROP)

并记⼊ /var/log/ufw.log 。

情报层:Python 脚本每⼩时分析⽇志 -> 结合 IPinfo 数据库溯源 -> ⽣成 ufw_stats.json 。

展⽰层:HTML/React 仪表盘通过 Intl 引擎⾃动汉化全球地理位置,实时展⽰攻击分布。

三、 部署第⼀步:基础设施硬化(Shell 脚本)

⾸先,我们需要确保服务器防⽕墙能够⾃动识别并放⾏ Cloudflare 的所有节点。

  1. 编写同步脚本

在服务器 /root/scripts ⽬录下新建 sync_cf_ips.sh :

#!/bin/bash

# =================================================================
# 脚本名称: update_cf_ufw.sh
# 描述: 自动获取 Cloudflare 最新 IP 段并更新 UFW 规则
# 建议 Cron: 0 0 * * 1 (每周一凌晨运行一次)
# =================================================================

set -e  # 出错即停止执行

# 1. 检查权限
if [ "$EUID" -ne 0 ]; then
  echo "请以 root 权限运行此脚本。"
  exit 1
fi

# 2. 确保依赖安装
if ! command -v curl &> /dev/null; then
    echo "正在安装 curl..."
    apt-get update && apt-get install -y curl
fi

echo "正在从 Cloudflare 获取最新 IP 段..."

# 3. 获取 IPv4 和 IPv6 列表
CF_IPV4=$(curl -s https://www.cloudflare.com/ips-v4)
CF_IPV6=$(curl -s https://www.cloudflare.com/ips-v6)

if [ -z "$CF_IPV4" ]; then
    echo "错误:无法获取 Cloudflare IP 列表。"
    exit 1
fi

# 4. 清理旧的 Cloudflare 规则 (可选,但强烈建议)
# 为了防止规则重复堆积,我们先删除所有 80/443 的规则,或者根据注释标记删除
# 这里采用简单粗暴但安全的方法:重置部分规则前先关闭,或按需精准删除
echo "正在清理旧规则并配置防火墙..."

# 5. 设置默认策略
ufw --force reset
ufw default deny incoming
ufw default allow outgoing

# 6. 放行 SSH 端口 (务必最先执行,防止断连)
# 建议将 22 替换为你实际的 SSH 端口
ufw allow 22/tcp comment 'SSH Port'

# 7. 循环放行 Cloudflare IP 访问 80 和 443
for ip in $CF_IPV4 $CF_IPV6; do
    ufw allow from "$ip" to any port 80,443 proto tcp comment 'Cloudflare IP'
done

# 8. 启用防火墙
ufw --force enable
ufw reload

echo "UFW 规则更新成功!已允许 Cloudflare 全球节点访问 80/443 端口。"

四、 第⼆步:情报引擎配置(Python 脚本)

我们需要⼀个分析器来读取内核⽇志并将其结构化。

  1. 安装环境与依赖

确保系统已安装 Python3 及其包管理器:

# Ubuntu/Debian 系统

apt-get install -y python3-pip

# 安装分析引擎所需的库

pip3 install requests psutil maxminddb

  1. 编写 ufw_analyzer.py (双源溯源版)

将此⽂件存放在你的 Web 根⽬录下(如 /www/wwwroot/yourdomain.com/ )。此版本⽀持优先读

取本地 .mmdb 数据库以提升性能。

import re
import json
import os
import time
from collections import Counter
from datetime import datetime

import requests
import psutil

# --- 配置区 ---
LOG_FILE = "/var/log/ufw.log"
OUTPUT_JSON = "./ufw_stats.json"
API_TOKEN = "你的IPinfo_Token"  # 建议在 ipinfo.io 免费注册获取 Token
DB_FILE = "./ipinfo_lite.mmdb"
NODE_NAME = "UCloud-HK-Node01"


def analyze_logs():
    """解析 UFW 日志并提取被拦截的源 IP"""
    if not os.path.exists(LOG_FILE):
        print(f"Error: Log file {LOG_FILE} not found.")
        return []

    ip_pattern = re.compile(r"SRC=([0-9\.]+)")
    blocked_ips = []

    try:
        with open(LOG_FILE, 'r', errors='ignore') as f:
            for line in f:
                if "[UFW BLOCK]" in line:
                    match = ip_pattern.search(line)
                    if match:
                        blocked_ips.append(match.group(1))
    except Exception as e:
        print(f"Error reading log: {e}")

    return blocked_ips


def get_geo(ips):
    """获取 IP 的地理位置信息,优先使用本地数据库"""
    results = []
    top_ips = Counter(ips).most_common(100)
    reader = None

    # 尝试加载本地 MaxMind/GeoIP 数据库
    if os.path.exists(DB_FILE):
        try:
            import maxminddb
            reader = maxminddb.open_database(DB_FILE)
        except ImportError:
            print("Warning: maxminddb module not installed.")
        except Exception as e:
            print(f"Error loading DB: {e}")

    for ip, count in top_ips:
        data = None

        # 1. 优先尝试本地解析
        if reader:
            try:
                res = reader.get(ip)
                if res:
                    data = {
                        "country": res.get('country', {}).get('names', {}).get('en', 'Unknown'),
                        "city": res.get('city', {}).get('names', {}).get('en', 'Unknown'),
                        "isp": res.get('traits', {}).get('isp', 'Unknown'),
                        "source": "本地数据库"
                    }
            except Exception:
                pass

        # 2. 本地失败则请求 ipinfo.io API
        if not data:
            try:
                params = {'token': API_TOKEN} if API_TOKEN else {}
                r = requests.get(f"https://ipinfo.io/{ip}/json", params=params, timeout=5)
                if r.status_code == 200:
                    resp_json = r.json()
                    data = {
                        "country": resp_json.get('country', 'Unknown'),
                        "city": resp_json.get('city', 'Unknown'),
                        "isp": re.sub(r'^AS\d+\s+', '', resp_json.get('org', 'Unknown')),
                        "source": "云端解析"
                    }
                    time.sleep(0.1)  # 避免触发频率限制
            except Exception as e:
                print(f"API Error for {ip}: {e}")
                continue

        if data:
            data.update({"ip": ip, "count": count})
            results.append(data)

    if reader:
        reader.close()

    return results


def main():
    print(f"[{datetime.now()}] Starting UFW log analysis...")
    
    ips = analyze_logs()
    if not ips:
        print("No blocked IPs found.")
        return

    output = {
        "node_name": NODE_NAME,
        "total_blocks": len(ips),
        "last_update": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "system_info": {
            "cpu_load": psutil.cpu_percent(interval=1),
            "mem_usage": psutil.virtual_memory().percent
        },
        "top_attackers": get_geo(ips)
    }

    try:
        with open(OUTPUT_JSON, 'w', encoding='utf-8') as f:
            json.dump(output, f, ensure_ascii=False, indent=2)
        print(f"Analysis complete. Results saved to {OUTPUT_JSON}")
    except Exception as e:
        print(f"Failed to save JSON: {e}")


if __name__ == "__main__":
    main()

 

五、 第三步:⾃动化运维(宝塔⾯板计划任务)

为了保持数据的实时性,我们需要在宝塔⾯板(或系统 Crontab)中设置⾃动化任务:

任务 1:同步 CF ⽩名单(每天执⾏)

任务类型:Shell 脚本
任务名称:隐盾模式-同步 CF 节点
执⾏周期:每天 03:00
脚本内容: bash /root/scripts/sync_cf_ips.sh
任务 2:解析情报⽇志(每⼩时执⾏)
任务类型:Shell 脚本
任务名称:隐盾模式-⽇志分析
执⾏周期:每 1 ⼩时 0 分钟
脚本内容:
cd /www/wwwroot/[yourdomain.com/](https://yourdomain.com/)
python3 ufw_analyzer.py

六、 第四步:可视化激活(前端看板)

本系统的⼀⼤特⾊是**“零字典汉化”**。在前端 HTML 中,我们利⽤浏览器内置的

Intl.DisplayNames API 实现全球 200 多个国家名称的即时中⽂转换:

/**
 * 全球地理名称自动汉化引擎
 * 使用 Intl.DisplayNames 接口实现符合 Unicode 标准的语言翻译
 */

// 建议将实例放在函数外或设为单例,避免重复初始化带来的开销
const regionNames = new Intl.DisplayNames(['zh-Hans'], { type: 'region' });

/**
 * 将 ISO 3166-1 alpha-2 国家代码翻译为中文
 * @param {string} code - 国家/地区代码 (例如: 'US', 'hk', 'GB')
 * @returns {string} 汉化后的名称或原始代码
 */
function translateCountry(code) {
  if (!code || typeof code !== 'string') return '未知';

  try {
    // toUpperCase() 确保了对 'us' 或 'Us' 这种非标准输入的兼容性
    return regionNames.of(code.toUpperCase());
  } catch (e) {
    // 捕获无效代码异常(如输入 'XYZ'),返回原始代码以防程序崩溃
    return code;
  }
}

// --- 使用示例 ---
console.log(translateCountry('US')); // 美国
console.log(translateCountry('JP')); // 日本
console.log(translateCountry('HK')); // 中国香港

 

你可以直接将项⽬中的 shanku.html 或 architecture.html 部署到 Web 根⽬录,即可在浏览

器中通过图形化界⾯查看实时攻击态势。

七、 进阶:如何利⽤这些数据?

  1. ASN 战略封禁:通过看板发现某特定 ASN 发起了数万次攻击。在 Cloudflare 的 WAF 规则中

直接添加⼀条: IP ASN 等于 14061 -> 阻止 。这⽐封禁单个 IP 有效⼏千倍。

  1. 区域围栏:如果你没有来⾃东欧或北⾮的业务,通过看板识别出这些地区的攻击聚集后,可以

直接在 CDN 侧屏蔽整个国家。

  1. 合规性说明:本⽅案使⽤了 IPinfo Lite 数据集。根据协议,请务必在看板⻚脚保留 IP

Geolocation by IPinfo 字样。

结语:安全不是⼀堵死墙,⽽是⼀套会呼吸的系统。“隐盾模式”通过将防御⾏为可视

化,让管理员从被动挨打转变为主动的情报治理。当攻击者发现他在你⾯前是“透明”的

时候,他就会寻找下⼀个⽬标。

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。