目录
一、导入外部模块
二、飞塔防护墙类定义
2.1类定义与功能概述
2.2 初始化方法 __init__
2.3 定义_send_request 方法
2.4 检查策略ID是否存在方法
2.5 创建地址对象方法
2.6 创建地址组方法
2.7 创建全局组方法
2.8 地址组追加对象方法
2.9 创建拦截策略方法
2.10 策略位置移动方法
2.11 获取最近使用的拦截地址组方法
2.12 获取所有拦截地址组方法
2.13 获取地址组成员列表方法
2.14 封禁IP地址过滤方法
2.15 地址组追加地址成员方法
2.16 IP地址与地址组归属查询方法
2.17 地址组移除地址对象方法
2.18 拦截策略部署方法
2.19 IP封禁方法
2.20 IP解禁方法
2.21 临时封禁方法
三、总结
防火墙封禁工具要实现对防火墙设备的有效管控,核心在于建立与设备的交互机制,从而完成一系列关键操作。这些操作涵盖防火墙地址对象、地址组对象、服务对象、服务组对象、时间对象的创建,以及防火墙策略的配置与调整 —— 包括将地址对象加入或移出地址组,为策略追加或移除源地址、目标地址、服务端口,还有修改策略动作等。
要实现与防火墙的交互,业界普遍采用两种技术路径:其一,调用防火墙产品原生提供的 API 接口;其二,通过 SSH 协议登录防火墙并执行相应命令。从本质来看,这两种方法的底层逻辑并无二致,都是通过特定协议与防火墙的控制层面进行通信,只是对外呈现的交互形式与技术实现方式存在差异。
本文将在前期《PearAdmin 二次开发之防火墙封禁工具 —— 资产管理篇》的基础上,以飞塔防火墙为具体案例,深入探讨基于 API 接口的开发实践,构建工具与防火墙之间的交互桥梁。
为提升工具的可扩展性,使其能在未来轻松兼容更多厂商的防火墙产品,开发过程将采用面向对象的设计思想。具体而言,为每个厂商的防火墙产品定义专属类,通过多态方法封装各类操作逻辑 —— 无论是地址对象管理、策略配置,还是对象间的关联调整等,均在类中实现标准化的方法接口。这种设计不仅能确保当前功能的稳定实现,更为后续拓展其他品牌设备的支持奠定了灵活的架构基础。
一、导入外部模块
import requests
import json
import re
import datetime
import urllib3
from urllib.parse import quote
from applications.common.utils.logging_setup import setup_logger
二、飞塔防护墙类定义
2.1类定义与功能概述
class FortiGateManager:
"""FortiGate防火墙管理类,集成策略部署、IP封禁和解封功能"""
def __init__(self, host, vsys, username, password, apikey, logger=None):
self.host = host
self.vsys = vsys
self.username = username
self.password = password
self.apikey = apikey
self.logger = logger or setup_logger(__name__)
self.logger.info(f"FortiGateManager实例已创建,管理主机: {host}")
self.base_url = f"https://{self.host}/api/v2/cmdb"
self.query_params = f"?vdom={self.vsys}&access_token={self.apikey}"
类名:FortiGateManager,用于与 Fortinet FortiGate 防火墙进行交互。
功能:通过 FortiGate 的 REST API 实现策略部署、IP 封禁和解封等操作。
2.2 初始化方法 __init__
def __init__(self, host, vsys, username, password, apikey, logger=None):
参数:
host:防火墙的 IP 地址
vsys:虚拟系统名称(VDOM),用于防火墙开启虚拟墙环境。
username:API 访问的用户名(若使用 API Key 则不需要)。
password:API 访问的密码(若使用 API Key 则不需要)。
apikey:API 密钥,用于身份验证。
logger:日志记录器实例(可选),默认使用setup_logger函数创建。
2.3 定义_send_request 方法
这段代码定义了一个名为 _send_request 的方法,它是 FortiGateManager 类的核心组件,负责与 FortiGate 防火墙的 REST API 进行通信。下面我将详细解释其功能和实现逻辑。
def _send_request(self, method, endpoint, data=None, params=None):
"""统一的API请求方法,处理错误和日志"""
url = f"{self.base_url}{endpoint}{self.query_params}"
if params:
url += '&' + '&'.join([f"{k}={v}" for k, v in params.items()])
self.logger.debug(f"发送{method}请求到: {url}")
if data:
self.logger.debug(f"请求数据: {json.dumps(data, ensure_ascii=False)}")
print(url)
try:
with requests.Session() as session:
session.verify = False
response = session.request(method, url, json=data)
self.logger.debug(f"响应状态码: {response.status_code}")
if response.content:
try:
json_response = response.json()
self.logger.debug(f"响应数据: {json.dumps(json_response, ensure_ascii=False)}")
return json_response
except json.JSONDecodeError:
self.logger.warning(f"响应非JSON格式: {response.text[:200]}...")
return response
except requests.exceptions.HTTPError as e:
self.logger.error(f"HTTP错误 {e.response.status_code}: {e.response.text}")
raise
except requests.exceptions.RequestException as e:
self.logger.error(f"请求异常,请检查网络通信是否正常")
raise
_send_request 是一个内部方法(以下划线开头),用于统一处理所有与 FortiGate API 的 HTTP 请求。它提供了以下功能:
构建完整的请求 URL
记录详细的请求 / 响应日志
处理 JSON 数据序列化 / 反序列化
异常处理和错误记录
返回标准化的响应结果
这个方法为与 FortiGate 防火墙的通信提供了坚实的基础,后续可以在此之上构建更高级的功能,如地址对象管理、策略部署等。
2.4 检查策略ID是否存在方法
方法主要用于在拦截策略部属时检查策略id是否已存在于防火墙,如果存在则需重新指定策略ID。
def check_policy_exists(self, policy_id):
"""检查指定策略ID是否已存在"""
try:
self.logger.info(f"检查策略ID {policy_id} 是否已存在")
endpoint = f"/firewall/policy/{policy_id}/?format=policyid"
response = self._send_request("GET", endpoint)
return response.get('http_status') == 200
except Exception as e:
self.logger.error(f"查询策略ID过程发生未知错误:{str(e)}")
raise
2.5 创建地址对象方法
通过传递的ip_list封禁IP地址列表数据,创建防火墙地址对象。统计地址对象总数与创建成功总数并返回。
def create_address_object(self, ip_list):
"""创建防火墙地址对象"""
results = []
try:
endpoint = "/firewall/address"
for data in ip_list:
ip_name = data.get('name')
if not ip_name:
results.append({"name": None, "success": False, "message": "IP名称为空"})
continue
try:
self.logger.info(f"创建地址对象 {ip_name}")
response = self._send_request("POST", endpoint, data=data)
if response.get('http_status') == 200:
self.logger.info(f"地址对象 {ip_name} 创建成功")
results.append({"name": ip_name, "success": True, "message": "创建成功"})
else:
error_msg = response.get('message', '未知错误')
self.logger.error(f"地址对象 {ip_name} 创建失败: {error_msg}")
results.append({"name": ip_name, "success": False, "message": error_msg})
except requests.exceptions.HTTPError as e:
if e.response.status_code == 409: # 冲突,通常表示已存在
self.logger.warning(f"地址对象 {ip_name} 已存在")
results.append({"name": ip_name, "success": True, "message": "已存在"})
else:
self.logger.error(f"地址对象 {ip_name} 创建失败: HTTP错误 {e.response.status_code}")
results.append({"name": ip_name, "success": False, "message": str(e)})
except Exception as e:
self.logger.error(f"地址对象 {ip_name} 创建失败: 未知错误 {str(e)}")
results.append({"name": ip_name, "success": False, "message": str(e)})
except Exception as e:
self.logger.error(f"批量创建地址对象过程发生灾难性错误: {str(e)}")
raise
# 返回详细结果
success_count = len([r for r in results if r['success']])
total_count = len(results)
self.logger.info(f"批量创建地址对象完成,成功: {success_count}/{total_count}")
2.6 创建地址组方法
创建地址组方法,根据提供的成员和地址组信息创建地址组,因为飞塔防火墙无法创建一个空的地址组对象,即创建地址组时必须给其添加成员。
def create_address_group(self, name, members=None):
"""创建地址组"""
try:
self.logger.info(f"创建地址组 {name}")
endpoint = "/firewall/addrgrp"
members = members or []
addrgrp_data = {
'name': name,
'member': [{'name': member} for member in members]
}
response = self._send_request("POST", endpoint, data=addrgrp_data)
if response.get('http_status') == 200:
self.logger.info(f"地址组 {name} 创建成功")
return True
else:
self.logger.error(f"地址组 {name} 创建失败: {response.get('message', '未知错误')}")
return False
except Exception as e:
self.logger.error(f"创建地址组 {name} 发生未知错误:{str(e)}")
raise
2.7 创建全局组方法
为了更好地组织防火墙拦截策略,拦截策略原地址调用的是“Deny_Group_Set”,其成员为动态生成的地址组“Deny_Group_时间戳”。
def create_global_group(self, global_group_name, member_name):
"""将地址组添加到全局组"""
try:
self.logger.info(f"将 {member_name} 添加到全局组 {global_group_name}")
endpoint = f"/firewall/addrgrp"
# 更新全局组
data = {'name': global_group_name, 'member': [{'name':member_name}]}
response = self._send_request("POST", endpoint, data=data)
if response.get('http_status') == 200:
self.logger.info(f"成功将 {member_name} 添加到全局组 {global_group_name}")
return True
else:
self.logger.error(f"添加失败: {response.get('message', '未知错误')}")
return False
except Exception as e:
self.logger.error(f"添加到全局组时发生未知错误:{str(e)}")
raise
2.8 地址组追加对象方法
该方法主要功能实现的是将动态生成的地址组追加到地址组。
def append_member_to_addrg(self,group_name, member_name):
"""将地址组添加到地址组"""
try:
self.logger.info(f"将 {member_name} 追加到地址组 {group_name}")
endpoint = f"/firewall/addrgrp/{group_name}/member"
# 更新全局组
data = {"name": member_name}
response = self._send_request("POST", endpoint, data=data)
if response.get('http_status') == 200:
self.logger.info(f"成功将 {member_name} 追加到地址组 {group_name}")
return True
else:
self.logger.error(f"追加失败: {response.get('message', '未知错误')}")
return False
except Exception as e:
self.logger.error(f"追加到地址组时发生未知错误:{str(e)}")
raise
2.9 创建拦截策略方法
根据策略ID生成拦截策略,新创建的策略往往都处于策略最底端,后续需要将其移至策略顶部。
def create_firewall_policy(self, policy_id, name, src_addr, action="deny"):
"""创建防火墙策略"""
try:
self.logger.info(f"创建防火墙策略 {policy_id}")
endpoint = "/firewall/policy"
policy_data = {
"policyid": policy_id,
"name": name,
"srcintf": [{"name": "any"}],
"dstintf": [{"name": "any"}],
"action": action,
"srcaddr": [{"name": src_addr}],
"dstaddr": [{"name": "all"}],
"service": [{"name": "ALL"}],
"schedule": "always",
"match-vip": "enable"
}
response = self._send_request("POST", endpoint, data=policy_data)
if response.get('http_status') == 200:
self.logger.info(f"防火墙策略 {policy_id} 创建成功")
return True
else:
self.logger.error(f"防火墙策略 {policy_id} 创建失败: {response.get('message', '未知错误')}")
return False
except Exception as e:
self.logger.error(f"创建防火墙策略时发生未知错误:{str(e)}")
raise
2.10 策略位置移动方法
获取防火墙最顶部策略的ID号,将拦截策略移至其上。
def move_policy_to_top(self, policy_id):
"""将策略移动到顶部"""
try:
# 获取第一条策略ID
endpoint = "/firewall/policy"
params = {"start": 0, "count": 1, "format": "policyid|action"}
response = self._send_request("GET", endpoint, params=params)
try:
target_policy_id = response['results'][0]['policyid']
self.logger.info(f"获取到第一条策略ID: {target_policy_id}")
except (KeyError, IndexError):
self.logger.warning("未找到现有策略,策略已在顶部")
return True
# 移动策略到指定位置
self.logger.info(f"将策略 {policy_id} 移动到策略 {target_policy_id} 之前")
endpoint = f"/firewall/policy/{policy_id}"
params = {"action": "move", "before": target_policy_id}
response = self._send_request("PUT", endpoint, params=params)
if response.get('http_status') == 200:
self.logger.info(f"策略 {policy_id} 移动成功")
return True
else:
self.logger.error(f"策略 {policy_id} 移动失败: {response.get('message', '未知错误')}")
return False
except Exception as e:
self.logger.error(f"移动策略时发生未知错误:{str(e)}")
raise
2.11 获取最近使用的拦截地址组方法
IP封禁任务过程中,需要将拦截的IP地址对象追加到拦截地址组,所以每次拦截任务均需要获取当前最近使用的拦截地址组是哪个,然后再向该地址组追加拦截成员。
def get_latest_deny_group(self):
"""获取最近新建的"Deny_Group_时间"名字"""
try:
self.logger.info("开始获取最近使用的拦截地址组名称")
endpoint = "/firewall/addrgrp"
params = {"format": "name"}
response = self._send_request("GET", endpoint, params=params)
pattern = re.compile(r'^Deny_Group_d+$')
deny_groups = sorted(
[item['name'] for item in response.get('results', []) if pattern.match(item['name'])],
key=lambda x: x.split('_')[-1],
reverse=True
)
if not deny_groups:
self.logger.warning("未找到匹配的Deny_Group_*地址组")
return None
deny_group_name = deny_groups[0]
self.logger.info(f"成功获取最新拦截地址组: {deny_group_name}")
return deny_group_name
except Exception as e:
self.logger.error(f"获取拦截地址组名称失败: {str(e)}")
raise
2.12 获取所有拦截地址组方法
在IP解禁工作过程中,需要获取所有拦截地址组,以便后续获取其成员列表,判断IP地址对象与拦截地址组的映射关系。
def get_all_deny_group(self):
"""获取最近新建的"Deny_Group_时间"名字"""
try:
self.logger.info("开始获取所有拦截地址组名称")
endpoint = "/firewall/addrgrp"
params = {"format": "name"}
response = self._send_request("GET", endpoint, params=params)
pattern = re.compile(r'^Deny_Group_d+$')
all_deny_groups = sorted(
[item['name'] for item in response.get('results', []) if pattern.match(item['name'])],
key=lambda x: x.split('_')[-1],
reverse=True
)
# print("所有拦截地址组名:%s"%all_deny_groups)
if not all_deny_groups:
self.logger.warning("未找到匹配的Deny_Group_*地址组")
return None
self.logger.info(f"成功获取所有拦截地址组名,共计: {len(all_deny_groups)}个")
return all_deny_groups
except Exception as e:
self.logger.error(f"获取拦截地址组名称失败: {str(e)}")
raise
2.13 获取地址组成员列表方法
获取地址组成员列表,判断其是否包含需要解禁的IP地址。
def get_group_members(self, group_name):
"""获取指定地址组的成员列表"""
try:
self.logger.info(f"开始获取地址组 {group_name} 的成员列表")
endpoint = f"/firewall/addrgrp"
params = {"filter": f"name=={group_name}"}
response = self._send_request("GET", endpoint, params=params)
members = response.get('results', [{}])[0].get('member', [])
members = [item['name'] for item in members]
self.logger.info(f"成功获取地址组 {group_name} 的成员列表,共 {len(members)} 个成员")
return members
except Exception as e:
self.logger.error(f"获取地址组成员失败: {str(e)}")
raise
2.14 封禁IP地址过滤方法
IP地址封禁过程中,判断IP地址是否已存在于当前的拦截地址组。
def filter_new_ips(self, members, ip_list):
"""过滤已存在于地址组中的IP,返回需要新增的IP列表"""
try:
self.logger.info(f"开始过滤已存在于拦截地址组中的IP,共检查 {len(ip_list['ok'])} 个IP")
existing_ips = []
new_ips = []
for item in ip_list['ok']:
ip_name = item.get('name')
if not ip_name:
continue
if ip_name in members:
existing_ips.append(ip_name)
else:
new_ips.append(item)
self.logger.info(f"过滤结果: 已存在 {len(existing_ips)} 个IP, 新增 {len(new_ips)} 个IP")
return new_ips
except Exception as e:
self.logger.error(f"过滤IP列表失败: {str(e)}")
raise
2.15 地址组追加地址成员方法
def add_ips_to_group(self, ip_list, group_name):
"""将IP地址添加到地址组"""
try:
self.logger.info(f"开始将 {len(ip_list)} 个IP添加到地址组 {group_name}")
result = []
for ip_data in ip_list:
ip_name = ip_data.get('name')
try:
endpoint = f"/firewall/addrgrp/{group_name}/member"
response = self._send_request("POST", endpoint, data={"name": ip_name})
if response['http_status'] == 200:
result.append({"name": ip_name, "success": True, "message": "封禁成功"})
self.logger.info(f"成功将 {ip_name} 添加到地址组 {group_name}")
else:
self.logger.warning(f"地址组 {group_name} 已满,即将创建新的地址组")
# 创建新的地址组
timestamp = datetime.datetime.now().strftime("%y%m%d%H%M")
new_group_name = f"Deny_Group_{timestamp}"
self.create_address_group(new_group_name, [ip_name])
self.logger.info(f"创建新地址组: {new_group_name}")
# 将新地址组追加到全局组
self.append_member_to_addrg("Deny_Group_Set", new_group_name)
result.append(
{"name": ip_name, "success": True, "message": f"地址组已满,已添加到新地址组 {new_group_name}"})
except Exception as e:
result.append({"name": ip_name, "success": False, "message": str(e)})
self.logger.error(f"添加 {ip_name} 到地址组时发生未知错误: {str(e)}")
success_count = len([r for r in result if r['success']])
self.logger.info(f"批量封禁IP完成,成功: {success_count}, 总数: {len(result)}")
return result
except Exception as e:
self.logger.error(f"批量封禁IP过程发生灾难性错误: {str(e)}")
raise
2.16 IP地址与地址组归属查询方法
用于查询IP地址对象与拦截地质组的映射关系,方便后续对其进行解封操作。
def find_ips_in_groups(self, ip_list):
"""查找IP存在于哪些拦截组中"""
try:
self.logger.info(f"开始查找 {len(ip_list)} IP关联的拦截组")
ip_groups = {}
# 获取所有Deny组
all_deny_groups = self.get_all_deny_group()
if not all_deny_groups:
self.logger.info("未找到匹配的拦截地址组")
return {}
# 获取每个组的成员
group_members = {}
for group_name in all_deny_groups:
members = self.get_group_members(group_name)
group_members[group_name] = members
# 查找IP所在的组
for item in ip_list['ok']:
ip_name = item.get('name')
if not ip_name:
continue
for group_name, members in group_members.items():
if ip_name in members:
if ip_name not in ip_groups:
ip_groups[ip_name] = []
ip_groups[ip_name].append(group_name)
self.logger.info(f"IP {ip_name} 存在于组 {group_name} 中")
self.logger.info(f"共找到 {len(ip_groups)} 个IP存在于拦截组中")
return ip_groups
except Exception as e:
self.logger.error(f"查找待解禁IP失败: {str(e)}")
raise
2.17 地址组移除地址对象方法
执行IP地址解封任务时,将IP地址对象从拦截地址组中进行移除。
def remove_ips_from_groups(self, ip_groups):
"""从拦截组中移除IP"""
try:
self.logger.info(f"开始从拦截组中移除 {len(ip_groups)} 个IP")
result = []
for ip_name, groups in ip_groups.items():
for group_name in groups:
encoded_ip = quote(ip_name, safe='')
endpoint = f"/firewall/addrgrp/{group_name}/member/{encoded_ip}"
try:
self._send_request("DELETE", endpoint)
result.append({"name": ip_name, "group": group_name, "success": True, "message": "解禁成功"})
self.logger.info(f"成功从组 {group_name} 中移除IP {ip_name}")
except requests.exceptions.HTTPError as e:
result.append({"name": ip_name, "group": group_name, "success": False,
"message": f"HTTP错误 {e.response.status_code}"})
self.logger.error(f"从组 {group_name} 移除IP {ip_name} 失败: {e.response.text}")
except Exception as e:
result.append({"name": ip_name, "group": group_name, "success": False, "message": str(e)})
self.logger.error(f"从组 {group_name} 移除IP {ip_name} 时发生未知错误: {str(e)}")
success_count = len([r for r in result if r['success']])
self.logger.info(f"批量移除IP完成,成功: {success_count}, 总数: {len(result)}")
return result
except Exception as e:
self.logger.error(f"批量移除IP过程发生灾难性错误: {str(e)}")
raise
2.18 拦截策略部署方法
执行拦截策略一键部署时调用的方法。
def deploy_block_policy(self, policy_id, policy_name="护网拦截策略"):
"""部署防火墙拦截策略"""
try:
self.logger.info(f"开始部署防火墙拦截策略,策略ID: {policy_id}")
# 检查策略是否已存在
if self.check_policy_exists(policy_id):
self.logger.warning(f"策略ID {policy_id} 已存在")
return {"success": False, "message": f"策略ID {policy_id} 已存在"}
# 创建示例地址对象
ip_list = [{'name': '127.0.0.100/32', 'type': 'ipmask', 'subnet': '127.0.0.100 255.255.255.255'}]
if not self.create_address_object(ip_list):
return {"success": False, "message": "创建示例地址对象失败"}
# 创建动态地址组
timestamp = datetime.datetime.now().strftime("%y%m%d%H%M")
addrgrp_name = f"Deny_Group_{timestamp}"
if not self.create_address_group(addrgrp_name, ["127.0.0.100/32"]):
return {"success": False, "message": "创建动态地址组失败"}
# 创建全局地址组并添加成员到全局组
if not self.create_global_group("Deny_Group_Set", addrgrp_name):
return {"success": False, "message": "添加到全局组失败"}
# 创建防火墙策略
if not self.create_firewall_policy(policy_id, policy_name, "Deny_Group_Set"):
return {"success": False, "message": "创建防火墙策略失败"}
# 移动策略到顶部
if not self.move_policy_to_top(policy_id):
return {"success": False, "message": "移动策略到顶部失败"}
self.logger.info(f"防火墙拦截策略 {policy_id} 部署成功")
return {"success": True, "message": "防火墙拦截策略部署成功"}
except Exception as e:
self.logger.critical(f"部署防火墙拦截策略时发生灾难性错误: {str(e)}", exc_info=True)
return {"success": False, "message": f"部署过程发生未知错误: {str(e)}"}
2.19 IP封禁方法
执行一键封禁任务时调用的IP封禁方法。
def block_ips(self, ip_list):
"""封禁IP操作"""
try:
self.logger.info(f"开始处理IP封禁任务,待处理IP数量: {len(ip_list['ok'])}")
# 获取最新拦截组
deny_group_name = self.get_latest_deny_group()
# 获取组成员
members = self.get_group_members(deny_group_name)
# 过滤已存在的IP
new_ips = self.filter_new_ips(members, ip_list)
if not new_ips:
self.logger.info("所有IP已存在于拦截组中,无需执行封禁操作")
return {"success": True, "message": "无需封禁", "details": []}
# 创建地址对象
self.create_address_object(new_ips)
# 执行封禁
result = self.add_ips_to_group(new_ips, deny_group_name)
success_count = len([r for r in result if r['success']])
self.logger.info(f"IP封禁任务完成,成功: {success_count}, 总数: {len(result)}")
return {"success": True, "message": f"IP封禁成功,成功: {success_count}, 总数: {len(result)}",
"details": result}
except Exception as e:
self.logger.critical(f"封禁IP过程发生灾难性错误: {str(e)}", exc_info=True)
return {"success": False, "message": f"封禁IP过程发生未知错误: {str(e)}"}
2.20 IP解禁方法
执行IP地址一键解禁时调用的方法
def unblock_ips(self, ip_list):
"""解封IP列表"""
try:
self.logger.info(f"开始处理IP解封任务,待处理IP数量: {len(ip_list)}")
# 查找IP所在的组
ip_related_groups = self.find_ips_in_groups(ip_list)
if not ip_related_groups:
self.logger.info("所有IP均不在拦截组中,无需执行解封操作")
return {"success": True, "message": "无需解封", "details": []}
# 执行解封
result = self.remove_ips_from_groups(ip_related_groups)
success_count = len([r for r in result if r['success']])
self.logger.info(f"IP解封任务完成,成功: {success_count}, 总数: {len(result)}")
return {"success": True, "message": f"IP解封成功,成功: {success_count}, 总数: {len(result)}",
"details": result}
except Exception as e:
self.logger.critical(f"解封IP过程发生灾难性错误: {str(e)}", exc_info=True)
return {"success": False, "message": f"解封IP过程发生未知错误: {str(e)}"}
2.21 临时封禁方法
执行临时封禁任务时调用的方法。
def temporary_block(self, ip_list, expiry):
"""临时封禁IP操作"""
print(ip_list)
endpoint = "/monitor/user/banned/add_users"
self.base_url = f"https://{self.host}/api/v2"
data = {"ip_addresses":ip_list['ok'], "expiry": expiry,}
print(data)
try:
self.logger.info(f"开始处理IP临时封禁任务,待处理IP数量: {len(ip_list['ok'])}")
response = self._send_request("POST", endpoint, data=data)
print(response)
self.logger.info(f"IP封禁任务完成,成功: {len(ip_list['ok'])}, 总数: {len(ip_list['ok'])}")
return {"success": True, "message": f"IP封禁成功,成功: {len(ip_list['ok'])}, 总数: {len(ip_list['ok'])}"}
except Exception as e:
if re.search('Failed to establish',str(e)):
self.logger.critical(f"防火墙API连接失败,请检查通信和设备状态", exc_info=True)
return {"success": False, "message": f"防火墙API连接失败,请检查通信和设备状态"}
else:
self.logger.critical(f"临时封禁IP过程发生未知性错误: {str(e)}", exc_info=True)
return {"success": False, "message": f"临时封禁IP过程发生未知错误: {str(e)}"}
三、总结
初次使用面向对象的设计思想,用的不是太熟练,其中很多方法可以进行优化合并。当前主要追求功能实现,后续有时间了再进行整体优化。
















暂无评论内容