PearAdmin二次开发之防火墙封禁工具——临时封禁篇

   在安全运营实践中,IP 地址的封禁策略并非一味追求 “长期阻断”,而是需要结合行业特性、业务场景与合规要求,灵活采用更精细化的管控方式。临时封禁作为一种动态防御手段,通过为攻击源 IP 设定明确的封禁时长(如 30 分钟、1 小时等),既能在短期内切断攻击链路、遏制恶意行为的持续扩散,又能避免因长期封禁可能引发的业务适配问题(如动态 IP 用户的正常访问需求)或合规风险(部分行业对网络访问限制的时效有明确约束),从而在 “安全防护” 与 “业务连续性” 之间找到更优平衡点。
        从防御逻辑来看,临时封禁的核心价值体现在三个层面:
        其一,即时止损与攻击干扰。当攻击行为(如暴力破解、DDoS 试探、SQL 注入等)被检测到后,短期内封禁攻击源 IP 可快速中断攻击进程,避免攻击者在同一时间段内通过该 IP 完成攻击链闭环(如多次尝试突破密码、批量扫描漏洞等)。对攻击者而言,每一次临时封禁都会增加其攻击成本 —— 需重新等待封禁时效、更换 IP 或调整攻击策略,从而削弱其攻击意志。
        其二,风险缓冲与误判修正。在自动化防御场景中(如基于威胁情报或规则引擎的自动封禁),误判难以完全避免。若采用长期封禁,一旦误封正常用户 IP(如共享 IP、动态拨号 IP),可能导致业务中断或用户投诉;而临时封禁可将此类影响控制在有限时长内,同时为安全运营人员争取核查时间 —— 若发现误判,可手动提前解封,若确认是恶意 IP,也可延长封禁时长,实现 “边防御边验证” 的弹性管理。
        其三,适配业务与合规要求。部分行业(如电商、金融)对网络可用性要求极高,长期封禁可能影响潜在用户访问或正常交易;此外,某些地区的网络安全法规对 “限制网络访问的时效与程序” 有明确规定,要求封禁措施需具备 “必要性” 与 “适度性”。临时封禁通过设定可控的时效,既能满足防护需求,又能符合业务场景与合规框架的约束。

一、临时封禁菜单

通过PearAdmin的【系统管理】-【权限管理】在护网工具目录下创建【临时封禁】菜单,并设置权限标识和路径信息。

刷新页面左侧导航即可显示临时封禁菜单

二、前端界面设计

2.1 前端页面展现        

临时封禁主页面与一键封禁主界面的基础增加了时间输入框表单和时间单位选择下拉框。访问后端临时封禁接口实现从而实现临时封禁功能。因为临时封禁不同于防火墙策略拦截。在《PearAdmin二次开发之防火墙封禁工具——API开发》文章中已实现基于时间的黑名单API接口开发。

2.2 前端代码设计

在PearAdmin项目的【templates】-【system】-【huwang】目录下新建文件temporary.html。

<!DOCTYPE html>
<html>
<head>
    <title>一键封禁</title>
    {% include 'system/common/header.html' %}
</head>
</head>
<body class="pear-container">
<div class="layui-fluid">
    <div class="layui-card">
        <fieldset class="layui-elem-field layui-field-title"><legend>IP地址封禁</legend></fieldset>
        <div class="layui-card-body">
            <form class="layui-form layui-form-pane" action="">
                <div class="layui-form-item layui-form-text">
                    <div class="layui-input-block">
                        <textarea name="iplist" required lay-verify="required" placeholder="示例:
192.168.1.1
192.168.10.0/24
192.168.2.1-192.168.2.10
" class="layui-textarea"></textarea>
                    </div>
                </div>
                <br>
                <div class="layui-form-item>
                    <label class="layui-form-label">封禁时长</label>
                    <div class="layui-input-inline">
                        <input type="text" name="number" placeholder="" class="layui-input" value="30">
                    </div>
                    <div class="layui-input-inline">
                        <select name="time" required>
                            <option value=""></option>
                            <option value="minute" selected>分钟</option>
                            <option value="hour">小时</option>
                            <option value="day">天</option>
                        </select>
                    </div>
                </div>
                <br>
                <div class="layui-form-item">
                    <div class="layui-input-block">
                        <button class="layui-btn layui-btn-md" lay-submit lay-filter="block" >提交</button>
                        <button class="layui-btn layui-btn-md" type="reset" >重置</button>
                    </div>
                </div>
            </form>
        </div>
    </div>
    <div class="layui-card">
        <fieldset class="layui-elem-field layui-field-title">
            <legend>封禁日志</legend>
        </fieldset>
        <div class="layui-card-body">
            <table lay-filter="log-table"></table>
        </div>
    </div>
</div>

</body>
{% include 'system/common/footer.html' %}
<script>
    layui.use(['table',  'form', 'jquery', 'popup'], function(){
        let table = layui.table,
            popup = layui.popup,
            form = layui.form,
            $ = layui.$;
        form.on('submit(block)', function(data){//监听按钮
            $.ajax({
                url:"/system/huwang/exec_temporary",  //提交请求的URL
                data: JSON.stringify(data.field),
                dataType: 'json',
                contentType: 'application/json',
                type: 'post',
                success:function(result){
                    if (result.success){
                        layer.msg(result.msg, {icon: 1, time: 2000, title:'成功'}, function(){location.reload();});
                        return;
                    }else{
                        layer.msg(result.msg, { icon: 2, time: 4000, title:'失败' })
                    }
                },
                error:function(result){
                    alert("接口错误!!!"); //无返回或处理有报错时弹框
                }
            });
            return false;  // 别忘记这行,防止页面跳转
        });

        let cols = [
            [
                {title: '时间', field: 'timestamp', align: 'center', width: 280},  // 时间需较宽
                {title: '进程名', field: 'logger_name', align: 'center', width: 280},
                {title: '级别', field: 'level', align: 'center', width: 100},
                {title: '内容', field: 'message', align: 'left',}  // 内容列用minWidth
            ]
        ];

        table.render({
            elem: '#log-table',
            url: '/system/huwang/temporary_log',
            page:{limit: 10,limits:[10,30,50]},
            cols: cols,
            skin: 'line',
            text: {none: '暂无设备信息'},
        })
    });
</script>
</html>

三、后端代码设计

3.1 临时封禁路由

@bp.get('/temporary')
@authorize("system:huwang:temporary")
def temporary():
    return render_template('system/huwang/temporary.html')

3.2 临时封禁接口

@bp.post('/exec_temporary')
@authorize("system:huwang:temporary")
def exec_temporary():
    req_json = request.get_json(force=True)
    iplist = str_escape(req_json.get('iplist'))
    number = str_escape(req_json.get('number'))
    time = str_escape(req_json.get('time'))

    if time =='minute':
        expiry = int(number)*60
    elif time =='hour':
        expiry = int(number)*3600
    elif time =='day':
        expiry = int(number)*86400
    ip_list = iplist.strip().split('
')
    ip_list = temporary_ip_check(ip_list)
    if len(ip_list['error']) > 0:
        return fail_api('IP地址不正确:' + str(ip_list['error']))
    filter = []
    filter.append(Resources.location == '外网边界')
    filter.append(Resources.enable == 1)
    query = db.session.query(Resources).filter(*filter).all()
    logger = setup_logger("temporary_block")
    for dev in query:
        try:
            logger.info(f"开始进行防火墙: {dev.hostname}临时封禁操作")
            config = {
                "host": dev.mgtip,
                "vsys": dev.vsys,
                "username": dev.loginname,
                "password": dev.loginpass,
                "apikey": dev.apikey,
                "logger": logger
            }
            # 创建管理器实例
            if dev.loginmeth.lower() == 'api' and dev.vendor.lower() == 'fortigate':
                firewall = FortiGateManager(**config)
                result = firewall.temporary_block(ip_list, expiry)
                if result['success']:
                    return success_api(str(result['message']))
                else:
                    return fail_api(str(result['message']))
            else:
                return ('不支持的产品和登陆方式')
        except Exception as e:
            print(f"临时封禁IP过程发生未知性错误: {str(e)}")

3.3 临时封禁日志

@bp.get('/temporary_log')
@authorize("system:huwang:temporary")
def temporary_log():
    query = db.session.query(Hwlog).filter(Hwlog.logger_name=='temporary_block').order_by(Hwlog.timestamp.desc()).layui_paginate()
    return table_api(
        data = [{
        'timestamp': record.timestamp,
        'logger_name': record.logger_name,
        'level': record.level,
        'message': record.message
        } for record in query.items],
    count = query.total)

3.4 临时封禁IP检查

因为临时封禁与一键封禁提交给API接口的数据格式不同,所以再次单独做了IP格式检查和数据格式转换。

import re
from IPy import IP
def temporary_ip_check(ip_list):
    # 定义正则表达式模式
    single_ip_pattern = r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?).){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
    cidr_pattern = r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?).){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)/([0-2]?[0-9]|3[0-2])$'
    ip_range_pattern = r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?).){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)-((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?).){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'

    # 依次检查是否匹配三种格式
    result = {'ok':[], 'error':[]}
    for item in ip_list:
        if re.match(single_ip_pattern, item):
            if item not in result['ok']:
                result['ok'].append(item)
        elif re.match(cidr_pattern, item) or re.match(ip_range_pattern, item):
            try:
                ip_list = list(IP(item))
                for ip in ip_list:
                    if ip not in result['ok']:
                        result['ok'].append(str(ip))
            except:
                result['error'].append(item)
        else:
            result['error'].append(item)
    return (result)

四、测试验证

4.1 批量临时封禁测试

4.2 防火墙验证

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容