背景
随着MCP协议在企业和个人应用中的广泛采用,安全性已成为MCP系统设计和开发中不可忽视的核心要素。一个健壮的MCP安全体系不仅能保护敏感数据和用户隐私,还能确保AI模型与外部工具交互的可靠性和完整性。本文将深入探讨MCP TypeScript-SDK的安全体系建设,帮助开发者构建既强大又安全的MCP应用。
1. 威胁模型与风险评估
1.1 MCP应用面临的安全威胁
MCP应用作为连接AI模型与外部工具的桥梁,面临着多方面的安全威胁。了解这些威胁是构建安全体系的第一步:
1.1.1 外部攻击威胁
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ │ │ │ │ │
│ 恶意用户 │────▶│ MCP 系统 │────▶│ 外部资源/工具 │
│ │ │ │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ ▲ │
│ │ │
└───────────────────────┴───────────────────────┘
攻击路径
恶意输入注入:攻击者可能通过向MCP系统提交恶意构造的输入,尝试操纵AI模型或后端工具。
身份冒充:未经授权的用户可能尝试伪装为合法用户访问MCP服务。
中间人攻击:在客户端与服务器之间截获或篡改通信内容。
拒绝服务攻击:通过大量请求使MCP服务器资源耗尽,导致服务不可用。
1.1.2 内部安全威胁
权限滥用:具有合法访问权限的用户可能滥用其权限获取敏感数据。
数据泄露:内部人员可能有意或无意地暴露敏感信息。
配置错误:错误的安全配置可能导致不必要的系统暴露。
1.1.3 工具集成风险
MCP的核心价值在于连接AI模型与外部工具,这也带来了独特的安全风险:
工具能力滥用:恶意用户可能尝试使用MCP提供的工具执行未授权操作。
工具链污染:第三方工具可能包含恶意代码或漏洞。
数据流转风险:敏感数据在多个工具间流转时面临泄露风险。
1.2 风险评估方法论
对MCP系统进行全面的风险评估,可采用以下方法论:
1.2.1 STRIDE威胁建模
STRIDE是一种常用的威胁建模方法,特别适用于MCP这类复杂系统:
| 威胁类型 | 描述 | MCP相关示例 |
|---|---|---|
| 欺骗(Spoofing) | 冒充他人身份 | 攻击者冒充管理员访问MCP管理界面 |
| 篡改(Tampering) | 修改系统数据 | 修改MCP资源定义或工具参数 |
| 否认(Repudiation) | 否认曾执行的操作 | 用户否认曾使用MCP工具执行某操作 |
| 信息泄露(Information Disclosure) | 未授权访问信息 | 获取MCP处理的敏感数据 |
| 拒绝服务(Denial of Service) | 使服务不可用 | 通过大量请求使MCP服务器崩溃 |
| 权限提升(Elevation of Privilege) | 获取未授权的权限 | 从普通用户提升为MCP管理员 |
1.2.2 风险评分矩阵
使用风险评分矩阵可以量化各种威胁的风险级别:
// 风险评估辅助函数
function assessRisk(
threat: string,
likelihood: 1 | 2 | 3, // 1=低, 2=中, 3=高
impact: 1 | 2 | 3 // 1=低, 2=中, 3=高
): {
threat: string; risk: string; score: number } {
const score = likelihood * impact;
let risk = "低";
if (score >= 7) risk = "高";
else if (score >= 4) risk = "中";
return {
threat, risk, score };
}
// 使用示例
const mcpRiskMatrix = [
assessRisk("未授权工具访问", 3, 3), // 高风险
assessRisk("配置文件泄露", 2, 3), // 中风险
assessRisk("日志信息泄露", 2, 1) // 低风险
];
1.3 安全优先级确定
基于风险评估结果,可以建立MCP系统的安全优先级框架:
1.3.1 关键安全领域排序
const securityPriorities = [
{
area: "身份验证与授权",
priority: "高",
rationale: "直接影响谁可以访问MCP系统及其功能"
},
{
area: "工具API安全",
priority: "高",
rationale: "防止工具被滥用执行未授权操作"
},
{
area: "数据传输加密",
priority: "高",
rationale: "保护传输中的敏感信息不被截获"
},
{
area: "输入验证",
priority: "中",
rationale: "防止注入攻击和不合法输入"
},
{
area: "错误处理与日志",
priority: "中",
rationale: "确保安全事件可被检测和回溯"
},
{
area: "依赖项安全",
priority: "中",
rationale: "减轻第三方库引入的风险"
}
];
1.3.2 制定安全规划路线图
根据安全优先级,可以制定分阶段的安全实施路线图:
基础安全阶段:实现必要的身份验证、加密和输入验证
增强安全阶段:添加细粒度访问控制、异常检测和安全审计
高级安全阶段:实施高级威胁防护、自动化安全测试和持续监控
1.4 典型攻击场景分析
以下是MCP系统可能面临的典型攻击场景及其防御策略:
1.4.1 工具命令注入攻击
攻击场景:攻击者构造恶意输入,尝试在工具执行上下文中注入命令。
// 易受攻击的工具实现
const vulnerableFileTool = {
name: "readFile",
description: "读取文件内容",
parameters: {
path: {
type: "string", description: "文件路径" }
},
handler: async ({
path }) => {
// 危险: 直接使用用户输入而不验证
const command = `cat ${
path}`;
return executeCommand(command); // 可能遭受命令注入
}
};
// 防御示例
const secureFileTool = {
name: "readFile",
description: "读取文件内容",
parameters: {
path: {
type: "string", description: "文件路径" }
},
handler: async ({
path }) => {
// 验证输入是否为安全的文件路径
if (!isValidFilePath(path)) {
throw new Error("无效的文件路径");
}
// 使用安全的文件读取API而非命令执行
return fs.promises.readFile(path, 'utf-8');
}
};
function isValidFilePath(path: string): boolean {
// 实现路径验证逻辑
return /^[a-zA-Z0-9_-/.]+$/.test(path) && !path.includes('..');
}
1.4.2 权限逃逸攻击
攻击场景:攻击者尝试绕过访问控制,获取未授权的资源。
// 易受攻击的实现
app.get('/api/mcp/resources/:resourceId', async (req, res) => {
const {
resourceId } = req.params;
// 危险: 未检查用户是否有权访问该资源
const resource = await getResource(resourceId);
res.json(resource);
});
// 防御示例
app.get('/api/mcp/resources/:resourceId', async (req, res) => {
const {
resourceId } = req.params;
const userId = getUserIdFromToken(req.headers.authorization);
// 先检查权限
if (!await hasAccessToResource(userId, resourceId)) {
return res.status(403).json({
error: '无权访问此资源' });
}
const resource = await getResource(resourceId);
res.json(resource);
});
1.4.3 中间人攻击
攻击场景:攻击者截获MCP客户端与服务器之间的通信。
防御策略:
始终使用TLS加密通信
实现证书验证
考虑使用HTTP严格传输安全(HSTS)
import https from 'https';
import fs from 'fs';
import express from 'express';
const app = express();
// HTTPS服务器配置
const httpsOptions = {
key: fs.readFileSync('server.key'),
cert: fs.readFileSync('server.cert'),
// 强制使用现代TLS版本
minVersion: 'TLSv1.2'
};
// 设置HSTS头
app.use((req, res, next) => {
res.setHeader(
'Strict-Transport-Security',
'max-age=31536000; includeSubDomains; preload'
);
next();
});
// 创建HTTPS服务器
https.createServer(httpsOptions, app).listen(443, () => {
console.log('MCP服务器在HTTPS端口443上启动');
});
通过深入分析MCP应用面临的威胁模型并进行全面风险评估,开发者可以有的放矢地构建安全体系,为后续的安全实施奠定坚实基础。
2. 数据加密与传输安全
MCP系统处理的数据可能包含敏感信息,保护这些数据的安全至关重要。本节将探讨如何在MCP应用中实现数据加密与传输安全。
2.1 通信层加密实现
MCP应用需要确保客户端与服务器之间的所有通信都经过加密,防止数据在传输过程中被窃取或篡改。
2.1.1 TLS/SSL配置
在MCP服务器中实现TLS加密:
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
import https from 'https';
import fs from 'fs';
// 读取TLS证书
const options = {
key: fs.readFileSync('private-key.pem'),
cert: fs.readFileSync('certificate.pem'),
// 推荐的TLS安全配置
ciphers: 'TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_GCM_SHA256',
minVersion: 'TLSv1.2',
// 可选:要求客户端证书进行双向TLS认证
requestCert: false,
rejectUnauthorized: false
};
// 创建HTTPS服务器
const httpsServer = https.createServer(options);
// 配置MCP服务器使用HTTPS
const mcpServer = new McpServer({
transport: {
type: 'http',
server: httpsServer
}
});
httpsServer.listen(443, () => {
console.log('MCP HTTPS服务器启动在端口443');
});
2.1.2 客户端连接安全
MCP客户端在连接服务器时应验证TLS证书:
import {
McpClient } from '@modelcontextprotocol/typescript-sdk';
import https from 'https';
import fs from 'fs';
// 创建安全的HTTPS代理
const httpsAgent = new https.Agent({
// 可信CA证书列表,用于验证服务器证书
ca: fs.readFileSync('trusted-ca.pem'),
// 检查服务器名称是否与证书匹配
checkServerIdentity: (host, cert) => {
// 实现证书验证逻辑
if (!cert.subject.CN.includes(host)) {
return new Error('证书CN不匹配目标主机');
}
return undefined; // 验证通过
},
// 可选:客户端证书(双向TLS)
key: fs.readFileSync('client-key.pem'),
cert: fs.readFileSync('client-cert.pem')
});
// 配置客户端使用安全连接
const client = new McpClient({
url: 'https://mcp-server.example.com',
httpsAgent
});
2.2 敏感数据存储加密
除了传输安全,MCP应用还需要保护存储中的敏感数据。
2.2.1 配置文件加密
敏感配置(如API密钥)应加密存储:
import crypto from 'crypto';
import fs from 'fs';
// 加密配置数据
function encryptConfig(configData: any, masterKey: Buffer): string {
// 生成随机初始化向量
const iv = crypto.randomBytes(16);
// 创建加密器
const cipher = crypto.createCipheriv('aes-256-gcm', masterKey, iv);
// 加密数据
let encrypted = cipher.update(JSON.stringify(configData), 'utf8', 'hex');
encrypted += cipher.final('hex');
// 获取认证标签
const authTag = cipher.getAuthTag();
// 组合IV、认证标签和加密数据
return JSON.stringify({
iv: iv.toString('hex'),
authTag: authTag.toString('hex'),
encryptedData: encrypted
});
}
// 解密配置数据
function decryptConfig(encryptedString: string, masterKey: Buffer): any {
const encryptedObj = JSON.parse(encryptedString);
// 从存储的格式中提取组件
const iv = Buffer.from(encryptedObj.iv, 'hex');
const authTag = Buffer.from(encryptedObj.authTag, 'hex');
const encryptedData = encryptedObj.encryptedData;
// 创建解密器
const decipher = crypto.createDecipheriv('aes-256-gcm', masterKey, iv);
decipher.setAuthTag(authTag);
// 解密数据
let decrypted = decipher.update(encryptedData, 'hex', 'utf8');
decrypted += decipher.final('utf8');
return JSON.parse(decrypted);
}
// 使用示例
const masterKey = crypto.scryptSync('强密码', '盐值', 32); // 安全地生成密钥
// 存储加密配置
const config = {
apiKey: 'super-secret-api-key',
userCredentials: {
username: 'admin',
password: 'secret-password'
}
};
const encryptedConfig = encryptConfig(config, masterKey);
fs.writeFileSync('config.encrypted', encryptedConfig);
// 读取并解密配置
const storedConfig = fs.readFileSync('config.encrypted', 'utf8');
const decryptedConfig = decryptConfig(storedConfig, masterKey);
2.2.2 会话数据加密
MCP会话数据的安全存储:
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
import crypto from 'crypto';
// 加密会话存储实现
class EncryptedSessionStore {
private encryptionKey: Buffer;
private sessions: Map<string, string> = new Map();
constructor(encryptionKey: Buffer) {
this.encryptionKey = encryptionKey;
}
// 存储加密会话
async saveSession(sessionId: string, sessionData: any): Promise<void> {
const encrypted = this.encryptData(JSON.stringify(sessionData));
this.sessions.set(sessionId, encrypted);
}
// 获取并解密会话
async getSession(sessionId: string): Promise<any | null> {
const encrypted = this.sessions.get(sessionId);
if (!encrypted) return null;
try {
const decrypted = this.decryptData(encrypted);
return JSON.parse(decrypted);
} catch (error) {
console.error('会话解密失败:', error);
return null;
}
}
// 删除会话
async deleteSession(sessionId: string): Promise<void> {
this.sessions.delete(sessionId);
}
// 加密数据
private encryptData(data: string): string {
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv('aes-256-gcm', this.encryptionKey, iv);
let encrypted = cipher.update(data, 'utf8', 'hex');
encrypted += cipher.final('hex');
const authTag = cipher.getAuthTag();
return JSON.stringify({
iv: iv.toString('hex'),
authTag: authTag.toString('hex'),
data: encrypted
});
}
// 解密数据
private decryptData(encryptedJson: string): string {
const {
iv, authTag, data } = JSON.parse(encryptedJson);
const decipher = crypto.createDecipheriv(
'aes-256-gcm',
this.encryptionKey,
Buffer.from(iv, 'hex')
);
decipher.setAuthTag(Buffer.from(authTag, 'hex'));
let decrypted = decipher.update(data, 'hex', 'utf8');
decrypted += decipher.final('utf8');
return decrypted;
}
}
// 生成安全的加密密钥
const sessionKey = crypto.randomBytes(32);
const sessionStore = new EncryptedSessionStore(sessionKey);
// 在MCP服务器中使用加密会话存储
const server = new McpServer({
sessionStorage: {
async save(sessionId, data) {
await sessionStore.saveSession(sessionId, data);
},
async load(sessionId) {
return sessionStore.getSession(sessionId);
},
async delete(sessionId) {
await sessionStore.deleteSession(sessionId);
}
}
});
2.3 密钥管理最佳实践
有效的密钥管理是数据加密安全性的关键。
2.3.1 密钥轮换机制
实现定期密钥轮换以提高安全性:
import crypto from 'crypto';
class KeyRotationManager {
private currentKey: Buffer;
private previousKeys: Map<string, Buffer> = new Map();
private keyLifetimeMs: number;
private currentKeyId: string;
constructor(keyLifetimeMs: number = 7 * 24 * 60 * 60 * 1000) {
// 默认7天
this.keyLifetimeMs = keyLifetimeMs;
// 生成初始密钥
this.currentKeyId = this.generateKeyId();
this.currentKey = crypto.randomBytes(32);
// 设置定期轮换
setInterval(() => this.rotateKey(), this.keyLifetimeMs);
}
private generateKeyId(): string {
return Date.now().toString(36) + Math.random().toString(36).substr(2, 5);
}
private rotateKey(): void {
// 保存当前密钥
this.previousKeys.set(this.currentKeyId, this.currentKey);
// 生成新密钥
this.currentKeyId = this.generateKeyId();
this.currentKey = crypto.randomBytes(32);
// 移除过期密钥(保留最近的5个密钥)
const keyIds = Array.from(this.previousKeys.keys());
if (keyIds.length > 5) {
const oldestKeyId = keyIds[0];
this.previousKeys.delete(oldestKeyId);
}
console.log(`密钥已轮换,新密钥ID: ${
this.currentKeyId}`);
}
// 获取当前加密密钥和ID
getCurrentKey(): {
keyId: string, key: Buffer } {
return {
keyId: this.currentKeyId,
key: this.currentKey
};
}
// 通过ID获取密钥(用于解密旧数据)
getKeyById(keyId: string): Buffer | null {
if (keyId === this.currentKeyId) {
return this.currentKey;
}
return this.previousKeys.get(keyId) || null;
}
// 使用当前密钥加密数据
encrypt(data: string): {
keyId: string, encryptedData: string } {
const {
keyId, key } = this.getCurrentKey();
const iv = crypto.randomBytes(16);
const cipher = crypto.createCipheriv('aes-256-gcm', key, iv);
let encrypted = cipher.update(data, 'utf8', 'hex');
encrypted += cipher.final('hex');
const authTag = cipher.getAuthTag();
return {
keyId,
encryptedData: JSON.stringify({
iv: iv.toString('hex'),
authTag: authTag.toString('hex'),
data: encrypted
})
};
}
// 解密数据(支持使用旧密钥)
decrypt(keyId: string, encryptedDataJson: string): string {
const key = this.getKeyById(keyId);
if (!key) {
throw new Error(`找不到ID为 ${
keyId} 的密钥`);
}
const {
iv, authTag, data } = JSON.parse(encryptedDataJson);
const decipher = crypto.createDecipheriv(
'aes-256-gcm',
key,
Buffer.from(iv, 'hex')
);
decipher.setAuthTag(Buffer.from(authTag, 'hex'));
let decrypted = decipher.update(data, 'hex', 'utf8');
decrypted += decipher.final('utf8');
return decrypted;
}
}
// 使用密钥轮换管理器
const keyManager = new KeyRotationManager();
// 加密数据
const {
keyId, encryptedData } = keyManager.encrypt('敏感数据');
// 稍后解密数据
const decrypted = keyManager.decrypt(keyId, encryptedData);
2.3.2 硬件安全模块(HSM)集成
对于高安全性要求的MCP应用,可以考虑集成硬件安全模块:
import crypto from 'crypto';
import {
HSM } from 'some-hsm-library'; // 假设的HSM库
class HsmKeyManager {
private hsm: HSM;
constructor(hsmConfig: any) {
// 初始化HSM连接
this.hsm = new HSM(hsmConfig);
}
// 使用HSM生成密钥
async generateKey(keyLabel: string): Promise<string> {
return this.hsm.generateKey({
label: keyLabel,
algorithm: 'AES',
size: 256,
extractable: false // 密钥不可导出
});
}
// 使用HSM中的密钥加密数据
async encrypt(data: string, keyLabel: string): Promise<string> {
const iv = crypto.randomBytes(16);
const {
encrypted, authTag } = await this.hsm.encrypt({
algorithm: 'AES-GCM',
key: keyLabel,
iv,
data: Buffer.from(data, 'utf8')
});
return JSON.stringify({
iv: iv.toString('hex'),
authTag: authTag.toString('hex'),
data: encrypted.toString('hex')
});
}
// 使用HSM中的密钥解密数据
async decrypt(encryptedDataJson: string, keyLabel: string): Promise<string> {
const {
iv, authTag, data } = JSON.parse(encryptedDataJson);
const decrypted = await this.hsm.decrypt({
algorithm: 'AES-GCM',
key: keyLabel,
iv: Buffer.from(iv, 'hex'),
authTag: Buffer.from(authTag, 'hex'),
data: Buffer.from(data, 'hex')
});
return decrypted.toString('utf8');
}
}
2.4 安全传输协议选择
MCP应用支持多种传输协议,每种都有不同的安全考虑。
2.4.1 HTTP传输安全
HTTP传输是MCP最常用的方式之一,确保其安全性至关重要:
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
import express from 'express';
import helmet from 'helmet';
import rateLimit from 'express-rate-limit';
const app = express();
// 使用Helmet设置安全HTTP头
app.use(helmet());
// 设置跨域资源共享(CORS)策略
app.use((req, res, next) => {
res.header('Access-Control-Allow-Origin', 'https://trusted-client.example.com');
res.header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS');
res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization');
next();
});
// 实施速率限制防止滥用
const apiLimiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15分钟
max: 100, // 每IP最多100请求
standardHeaders: true,
legacyHeaders: false,
message: {
error: '请求过多,请稍后再试' }
});
app.use('/mcp', apiLimiter);
// 配置MCP服务器使用HTTP传输
const server = new McpServer({
transport: {
type: 'http',
// 集成Express应用
path: '/mcp',
app
}
});
// 启动服务器
app.listen(3000, () => {
console.log('MCP HTTP服务器在端口3000上启动');
});
2.4.2 WebSocket传输安全
对于使用WebSocket的实时MCP应用:
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
import https from 'https';
import fs from 'fs';
import {
WebSocketServer } from 'ws';
import url from 'url';
// HTTPS服务器配置
const httpsServer = https.createServer({
key: fs.readFileSync('private-key.pem'),
cert: fs.readFileSync('certificate.pem'),
minVersion: 'TLSv1.2'
});
// 创建安全的WebSocket服务器
const wss = new WebSocketServer({
server: httpsServer,
// 验证请求
verifyClient: (info, cb) => {
const {
query } = url.parse(info.req.url || '', true);
// 验证访问令牌
if (!query.token || !isValidToken(query.token as string)) {
return cb(false, 401, 'Unauthorized');
}
// 验证来源
const origin = info.origin || '';
if (!isAllowedOrigin(origin)) {
return cb(false, 403, 'Forbidden');
}
cb(true);
}
});
// 验证令牌
function isValidToken(token: string): boolean {
// 实现令牌验证逻辑
return true; // 示例
}
// 验证允许的来源
function isAllowedOrigin(origin: string): boolean {
const allowedOrigins = [
'https://app.example.com',
'https://admin.example.com'
];
return allowedOrigins.includes(origin);
}
// 配置MCP服务器使用WebSocket传输
const server = new McpServer({
transport: {
type: 'websocket',
server: wss
}
});
// 启动HTTPS服务器
httpsServer.listen(3000, () => {
console.log('MCP WebSocket服务器在端口3000上启动');
});
2.4.3 stdio传输安全
对于本地进程间通信的stdio传输:
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
import crypto from 'crypto';
// 生成本地认证令牌
const localAuthToken = crypto.randomBytes(32).toString('hex');
// 将令牌安全地传递给子进程
process.env.MCP_LOCAL_AUTH_TOKEN = localAuthToken;
// 配置MCP服务器使用stdio传输,并添加认证
const server = new McpServer({
transport: {
type: 'stdio'
},
// 添加认证中间件
middleware: [
async (context, next) => {
// 从请求中提取令牌
const authToken = context.request.headers?.authorization;
// 验证令牌
if (authToken !== `Bearer ${
localAuthToken}`) {
throw new Error('未授权访问');
}
return next();
}
]
});
// 启动服务器
server.start();
通过全面实施数据加密和传输安全措施,MCP应用可以有效防止数据泄露和拦截攻击,为用户提供更安全可靠的服务。
3. 注入攻击防御
MCP系统作为连接AI模型与外部工具的桥梁,可能面临各种注入攻击。本节将探讨如何在MCP应用中防御这些攻击。
3.1 输入验证与净化
输入验证是防御注入攻击的第一道防线,必须对所有来自外部的数据进行严格验证。
3.1.1 使用Zod进行输入验证
MCP TypeScript-SDK支持使用Zod库进行参数验证,这是防御注入攻击的有效方法:
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
import {
z } from 'zod';
// 创建MCP服务器
const server = new McpServer();
// 添加使用Zod验证的工具
server.addTool({
name: 'searchDatabase',
description: '在数据库中搜索记录',
// 使用Zod定义参数模式
parameters: z.object({
// 为了安全,限制搜索条件为特定格式
query: z.string()
.min(3)
.max(100)
.regex(/^[a-zA-Z0-9s]+$/, '只允许字母、数字和空格'),
limit: z.number().int().min(1).max(100).default(10)
}),
handler: async ({
query, limit }) => {
// 此时,参数已经过验证,确保符合指定模式
// 执行安全的数据库查询
return safeDbQuery(query, limit);
}
});
// 安全的数据库查询实现
async function safeDbQuery(query: string, limit: number) {
// 使用参数化查询,而不是字符串拼接
// 参数化查询由数据库驱动程序处理,防止SQL注入
const result = await db.query(
'SELECT * FROM records WHERE name LIKE ? LIMIT ?',
[`%${
query}%`, limit]
);
return result;
}
3.1.2 深层输入净化
对于复杂的输入数据,需要进行深层净化:
import DOMPurify from 'dompurify';
import {
JSDOM } from 'jsdom';
// 创建DOMPurify实例
const window = new JSDOM('').window;
const purify = DOMPurify(window);
// 用于净化HTML内容的工具
server.addTool({
name: 'formatHtmlContent',
description: '格式化HTML内容并确保安全',
parameters: z.object({
htmlContent: z.string(),
allowedTags: z.array(z.string()).default(['p', 'b', 'i', 'a', 'ul', 'li'])
}),
handler: async ({
htmlContent, allowedTags }) => {
// 净化HTML,仅允许特定标签
const cleanHtml = purify.sanitize(htmlContent, {
ALLOWED_TAGS: allowedTags,
ALLOWED_ATTR: ['href', 'target', 'title'],
ALLOW_DATA_ATTR: false
});
return {
cleanHtml };
}
});
3.2 参数校验机制
除了基本的输入验证,MCP应用还应实施全面的参数校验机制。
3.2.1 运行时类型检查
TypeScript的类型系统在编译时提供类型安全,但在运行时添加额外检查同样重要:
// 运行时类型检查工具函数
function validateType(value: any, type: string, fieldName: string): void {
let isValid = false;
switch (type) {
case 'string':
isValid = typeof value === 'string';
break;
case 'number':
isValid = typeof value === 'number' && !isNaN(value);
break;
case 'boolean':
isValid = typeof value === 'boolean';
break;
case 'array':
isValid = Array.isArray(value);
break;
case 'object':
isValid = typeof value === 'object' && value !== null && !Array.isArray(value);
break;
default:
throw new Error(`不支持的类型: ${
type}`);
}
if (!isValid) {
throw new Error(`${
fieldName} 应该是 ${
type} 类型,但接收到 ${
typeof value}`);
}
}
// 用于深度验证结构的函数
function validateStructure(obj: any, schema: any, path: string = ''): void {
if (!obj || typeof obj !== 'object') {
throw new Error(`${
path || 'value'} 必须是一个对象`);
}
for (const [key, definition] of Object.entries(schema)) {
const currentPath = path ? `${
path}.${
key}` : key;
const value = obj[key];
// 检查必填字段
if (definition.required && (value === undefined || value === null)) {
throw new Error(`缺少必填字段: ${
currentPath}`);
}
// 如果有值,验证类型
if (value !== undefined && value !== null) {
validateType(value, definition.type, currentPath);
// 对象类型的递归验证
if (definition.type === 'object' && definition.properties) {
validateStructure(value, definition.properties, currentPath);
}
// 数组类型的元素验证
if (definition.type === 'array' && definition.items) {
value.forEach((item: any, index: number) => {
validateType(item, definition.items.type, `${
currentPath}[${
index}]`);
});
}
}
}
}
// 在MCP工具中使用结构验证
server.addTool({
name: 'processUserProfile',
description: '处理用户个人资料',
parameters: z.object({
profile: z.any() // 使用自定义验证
}),
handler: async ({
profile }) => {
// 定义预期的结构
const profileSchema = {
username: {
type: 'string', required: true },
age: {
type: 'number', required: false },
contact: {
type: 'object',
required: false,
properties: {
email: {
type: 'string', required: true },
phone: {
type: 'string', required: false }
}
},
interests: {
type: 'array',
required: false,
items: {
type: 'string' }
}
};
// 运行时验证
validateStructure(profile, profileSchema);
// 处理已验证的用户资料
return processProfile(profile);
}
});
3.2.2 中间件参数校验
在MCP服务器中使用中间件进行全局参数校验:
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
// 创建MCP服务器
const server = new McpServer();
// 添加参数验证中间件
server.use(async (context, next) => {
// 针对工具调用的参数验证
if (context.request.method === 'tools/call') {
const {
name, arguments: args } = context.request.params;
// 针对特定工具的检查
if (name === 'executeQuery') {
// 执行SQL查询的安全检查
validateSqlQuery(args.query);
}
// 检查参数中是否有可疑模式
detectSuspiciousPatterns(args);
}
return next();
});
// 验证SQL查询是否安全
function validateSqlQuery(query: string): void {
// 检查是否包含危险的SQL关键字
const dangerousKeywords = ['DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', '--'];
const upperQuery = query.toUpperCase();
for (const keyword of dangerousKeywords) {
if (upperQuery.includes(keyword)) {
throw new Error(`查询包含不允许的关键字: ${
keyword}`);
}
}
// 检查多语句查询(通常不安全)
if (query.includes(';')) {
throw new Error('不允许多语句查询');
}
}
// 检测各种参数中的可疑模式
function detectSuspiciousPatterns(obj: any): void {
// 将对象转换为字符串以进行模式匹配
const stringified = JSON.stringify(obj);
// 检测可能的脚本注入
if (/<script>|javascript:/i.test(stringified)) {
throw new Error('检测到可能的脚本注入');
}
// 检测可能的命令注入
if (/$(|`||/g.test(stringified)) {
throw new Error('检测到可能的命令注入');
}
// 检测路径遍历尝试
if (/..//g.test(stringified)) {
throw new Error('检测到可能的路径遍历');
}
}
3.3 防SQL注入策略
MCP工具经常需要连接数据库,这使得SQL注入成为潜在的安全威胁。
3.3.1 参数化查询
始终使用参数化查询而非字符串拼接:
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
import {
Pool } from 'pg'; // 使用PostgreSQL作为示例
// 创建数据库连接池
const pool = new Pool({
user: 'dbuser',
host: 'localhost',
database: 'mcpapp',
password: 'password',
port: 5432,
});
// 创建MCP服务器
const server = new McpServer();
// 添加安全的数据库查询工具
server.addTool({
name: 'queryUserData',
description: '查询用户数据',
parameters: z.object({
userId: z.string().uuid('用户ID必须是有效的UUID'),
fields: z.array(z.string()).default(['name', 'email'])
}),
handler: async ({
userId, fields }) => {
// 验证请求的字段是否安全
const allowedFields = ['id', 'name', 'email', 'created_at', 'updated_at'];
for (const field of fields) {
if (!allowedFields.includes(field)) {
throw new Error(`不允许查询字段: ${
field}`);
}
}
// 安全地构建字段列表(不使用用户输入直接构建查询)
const fieldList = fields.map(field => `"${
field}"`).join(', ');
// 使用参数化查询
const query = `SELECT ${
fieldList} FROM users WHERE id = $1`;
const result = await pool.query(query, [userId]);
return result.rows[0] || null;
}
});
3.3.2 ORM与查询构建器
使用ORM或查询构建器进一步减少SQL注入风险:
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
import {
Knex, knex } from 'knex';
// 初始化Knex查询构建器
const db = knex({
client: 'pg',
connection: {
host: 'localhost',
user: 'dbuser',
password: 'password',
database: 'mcpapp'
}
});
// 创建MCP服务器
const server = new McpServer();
// 添加使用Knex的安全查询工具
server.addTool({
name: 'searchRecords',
description: '搜索记录',
parameters: z.object({
tableName: z.enum(['users', 'products', 'orders']),
filters: z.record(z.string(), z.any()).default({
}),
page: z.number().int().min(1).default(1),
pageSize: z.number().int().min(1).max(100).default(20)
}),
handler: async ({
tableName, filters, page, pageSize }) => {
// 计算分页参数
const offset = (page - 1) * pageSize;
// 使用Knex构建查询,防止SQL注入
const query = db(tableName)
.select('*')
.limit(pageSize)
.offset(offset);
// 安全地添加过滤条件
for (const [key, value] of Object.entries(filters)) {
query.where(key, value);
}
// 执行查询
const results = await query;
// 获取总记录数
const countQuery = db(tableName).count('* as total');
for (const [key, value] of Object.entries(filters)) {
countQuery.where(key, value);
}
const [{
total }] = await countQuery;
return {
data: results,
pagination: {
total: Number(total),
page,
pageSize,
pages: Math.ceil(Number(total) / pageSize)
}
};
}
});
3.4 防止命令注入
MCP工具可能需要执行系统命令,这是另一个潜在的注入攻击点。
3.4.1 安全的命令执行
确保系统命令执行的安全性:
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
import {
execFile } from 'child_process';
import path from 'path';
import util from 'util';
// 将execFile转换为Promise版本
const execFileAsync = util.promisify(execFile);
// 创建MCP服务器
const server = new McpServer();
// 添加安全的文件分析工具
server.addTool({
name: 'analyzeFile',
description: '分析文件内容',
parameters: z.object({
filePath: z.string(),
analysisType: z.enum(['count', 'search', 'stats']),
searchTerm: z.string().optional()
}),
handler: async ({
filePath, analysisType, searchTerm }) => {
// 安全验证文件路径
const normalizedPath = path.normalize(filePath);
const basePath = '/safe/data/directory/';
if (!normalizedPath.startsWith(basePath)) {
throw new Error('无权访问此目录');
}
// 确保文件存在
if (!fileExists(normalizedPath)) {
throw new Error('文件不存在');
}
let result;
switch (analysisType) {
case 'count':
// 使用execFile而非exec,防止命令注入
result = await execFileAsync('wc', ['-l', normalizedPath]);
return {
lineCount: parseInt(result.stdout.trim().split(' ')[0]) };
case 'search':
if (!searchTerm) {
throw new Error('搜索类型需要提供搜索词');
}
// 注意这里没有使用shell通配符或正则表达式,减少注入风险
result = await execFileAsync('grep', ['-c', searchTerm, normalizedPath]);
return {
matchCount: parseInt(result.stdout.trim()) };
case 'stats':
// 使用自定义脚本而不是直接用shell命令
result = await execFileAsync('/usr/local/bin/analyze-file.sh', [normalizedPath]);
return {
stats: JSON.parse(result.stdout) };
default:
throw new Error('不支持的分析类型');
}
}
});
// 检查文件是否存在的辅助函数
function fileExists(filePath: string): boolean {
try {
return fs.statSync(filePath).isFile();
} catch (error) {
return false;
}
}
3.4.2 隔离环境执行
对于需要执行不可信代码的场景,使用隔离环境:
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
import {
NodeVM } from 'vm2';
// 创建MCP服务器
const server = new McpServer();
// 添加安全的代码执行工具
server.addTool({
name: 'executeUserCode',
description: '在安全沙箱中执行用户提供的JavaScript代码',
parameters: z.object({
code: z.string(),
input: z.any().optional()
}),
handler: async ({
code, input }) => {
// 创建安全的虚拟机沙箱
const vm = new NodeVM({
console: 'redirect', // 重定向控制台输出
sandbox: {
input }, // 注入输入数据
timeout: 1000, // 执行超时时间(毫秒)
allowAsync: true,
// 限制可用模块
require: {
external: false, // 禁止外部模块
builtin: ['crypto'], // 只允许特定内建模块
root: './', // 模块根目录
mock: {
// 模拟特定模块
fs: {
readFileSync: () => 'Access denied'
}
}
}
});
try {
// 捕获控制台输出
let consoleOutput = [];
vm.on('console.log', (...args) => {
consoleOutput.push(args.map(arg => String(arg)).join(' '));
});
// 在沙箱中执行代码
const result = await vm.run(`module.exports = async function(input) { ${
code} }`, 'vm.js')(input);
return {
result,
consoleOutput
};
} catch (error) {
return {
error: error.message,
stack: error.stack
};
}
}
});
通过实施全面的输入验证、参数校验、安全数据库操作和安全命令执行策略,MCP应用可以有效防御各种注入攻击,确保系统的安全性和稳定性。
4. 安全审计与合规性
完善的MCP安全体系不仅包括主动防御措施,还需要建立健全的审计机制和合规实践。本节将探讨如何在MCP应用中实现安全审计和满足合规性要求。
4.1 日志记录与监控
全面的日志记录是安全审计的基础,可以帮助识别可疑活动并在安全事件发生时提供取证依据。
4.1.1 安全日志记录实践
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
import winston from 'winston';
import 'winston-daily-rotate-file';
// 创建结构化日志记录器
const logger = winston.createLogger({
level: 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
defaultMeta: {
service: 'mcp-service' },
transports: [
// 控制台输出
new winston.transports.Console({
format: winston.format.combine(
winston.format.colorize(),
winston.format.simple()
),
}),
// 安全事件日志(单独文件)
new winston.transports.DailyRotateFile({
filename: 'logs/security-%DATE%.log',
datePattern: 'YYYY-MM-DD',
maxSize: '20m',
maxFiles: '14d',
level: 'warn',
}),
// 所有请求日志
new winston.transports.DailyRotateFile({
filename: 'logs/requests-%DATE%.log',
datePattern: 'YYYY-MM-DD',
maxSize: '20m',
maxFiles: '7d',
}),
],
});
// 创建MCP服务器
const server = new McpServer();
// 添加审计日志中间件
server.use(async (context, next) => {
const requestId = generateRequestId();
const startTime = Date.now();
// 记录请求信息
logger.info('MCP请求', {
requestId,
method: context.request.method,
params: sanitizeLogData(context.request.params), // 移除敏感信息
user: context.session?.user?.id || 'anonymous',
ip: context.request.ip,
userAgent: context.request.headers?.['user-agent']
});
try {
// 处理请求
const result = await next();
// 记录响应信息
const duration = Date.now() - startTime;
logger.info('MCP响应', {
requestId,
duration,
success: true
});
return result;
} catch (error) {
// 记录错误
const duration = Date.now() - startTime;
logger.warn('MCP错误', {
requestId,
duration,
error: {
message: error.message,
stack: error.stack,
code: error.code
}
});
// 对于安全相关错误,记录更详细的信息
if (isSecurityError(error)) {
logger.warn('安全事件', {
requestId,
type: getSecurityErrorType(error),
details: error.details,
user: context.session?.user?.id || 'anonymous',
ip: context.request.ip
});
}
throw error;
}
});
// 生成唯一请求ID
function generateRequestId(): string {
return `req-${
Date.now()}-${
Math.random().toString(36).substring(2, 10)}`;
}
// 从日志中移除敏感数据
function sanitizeLogData(data: any): any {
if (!data) return data;
// 创建副本以避免修改原始对象
const sanitized = {
...data };
// 敏感字段列表
const sensitiveFields = ['password', 'token', 'apiKey', 'secret', 'creditCard'];
// 递归检查并清理对象
Object.keys(sanitized).forEach(key => {
if (sensitiveFields.includes(key.toLowerCase())) {
sanitized[key] = '******'; // 替换敏感值
} else if (typeof sanitized[key] === 'object' && sanitized[key] !== null) {
sanitized[key] = sanitizeLogData(sanitized[key]); // 递归处理嵌套对象
}
});
return sanitized;
}
// 检查是否为安全相关错误
function isSecurityError(error: any): boolean {
const securityErrorCodes = [
'UNAUTHORIZED', 'FORBIDDEN', 'INVALID_TOKEN',
'INJECTION_ATTEMPT', 'RATE_LIMIT_EXCEEDED'
];
return securityErrorCodes.includes(error.code);
}
// 获取安全错误类型
function getSecurityErrorType(error: any): string {
switch (error.code) {
case 'UNAUTHORIZED':
case 'INVALID_TOKEN':
return 'authentication_failure';
case 'FORBIDDEN':
return 'authorization_failure';
case 'INJECTION_ATTEMPT':
return 'injection_attempt';
case 'RATE_LIMIT_EXCEEDED':
return 'rate_limit_abuse';
default:
return 'other_security_event';
}
}
4.1.2 实时安全监控
除了日志记录,实时监控可以帮助快速发现和响应安全威胁:
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
import {
EventEmitter } from 'events';
// 创建安全监控器
class SecurityMonitor extends EventEmitter {
private failedAuthAttempts: Map<string, {
count: number, firstAttempt: number }> = new Map();
private suspiciousActivities: Map<string, {
count: number, activities: any[] }> = new Map();
private checkInterval: NodeJS.Timeout | null = null;
constructor(private thresholds = {
authFailures: 5, suspiciousActivities: 3 }) {
super();
// 启动定期检查
this.checkInterval = setInterval(() => this.cleanupOldEntries(), 15 * 60 * 1000); // 每15分钟
}
// 记录身份验证失败
recordAuthFailure(identifier: string): void {
const entry = this.failedAuthAttempts.get(identifier) || {
count: 0, firstAttempt: Date.now() };
entry.count++;
this.failedAuthAttempts.set(identifier, entry);
// 检查是否超过阈值
if (entry.count >= this.thresholds.authFailures) {
this.emit('brute_force_attempt', {
identifier,
attempts: entry.count,
timeSpan: Date.now() - entry.firstAttempt
});
}
}
// 记录可疑活动
recordSuspiciousActivity(userId: string, activity: any): void {
const entry = this.suspiciousActivities.get(userId) || {
count: 0, activities: [] };
entry.count++;
entry.activities.push({
...activity, timestamp: new Date() });
this.suspiciousActivities.set(userId, entry);
// 检查是否超过阈值
if (entry.count >= this.thresholds.suspiciousActivities) {
this.emit('suspicious_user', {
userId,
activityCount: entry.count,
recentActivities: entry.activities.slice(-3) // 最近3条活动
});
}
}
// 清理旧条目
private cleanupOldEntries(): void {
const now = Date.now();
const oneHourAgo = now - 60 * 60 * 1000;
// 清理旧的身份验证失败记录
for (const [identifier, entry] of this.failedAuthAttempts.entries()) {
if (entry.firstAttempt < oneHourAgo) {
this.failedAuthAttempts.delete(identifier);
}
}
// 只保留过去24小时的可疑活动
const oneDayAgo = now - 24 * 60 * 60 * 1000;
for (const [userId, entry] of this.suspiciousActivities.entries()) {
entry.activities = entry.activities.filter(
activity => activity.timestamp > oneDayAgo
);
entry.count = entry.activities.length;
if (entry.count === 0) {
this.suspiciousActivities.delete(userId);
} else {
this.suspiciousActivities.set(userId, entry);
}
}
}
// 停止监控
stop(): void {
if (this.checkInterval) {
clearInterval(this.checkInterval);
this.checkInterval = null;
}
}
}
// 创建MCP服务器和安全监控器
const server = new McpServer();
const securityMonitor = new SecurityMonitor();
// 设置警报处理
securityMonitor.on('brute_force_attempt', async (data) => {
// 记录警报
logger.warn('检测到暴力破解尝试', data);
// 阻止来源
await blockSource(data.identifier);
// 通知安全团队
await notifySecurityTeam('brute_force_attempt', data);
});
securityMonitor.on('suspicious_user', async (data) => {
// 记录警报
logger.warn('检测到可疑用户活动', data);
// 增加用户监控级别
await increaseUserMonitoringLevel(data.userId);
// 通知安全团队
await notifySecurityTeam('suspicious_user', data);
});
// 将安全监控集成到MCP服务器
server.use(async (context, next) => {
try {
// 处理请求
return await next();
} catch (error) {
// 监控身份验证失败
if (error.code === 'UNAUTHORIZED' || error.code === 'INVALID_TOKEN') {
const identifier = context.request.ip || 'unknown';
securityMonitor.recordAuthFailure(identifier);
}
throw error;
}
});
// 添加敏感操作监控
server.addTool({
name: 'updateUserPermissions',
description: '更新用户权限',
parameters: z.object({
userId: z.string(),
permissions: z.array(z.string())
}),
handler: async ({
userId, permissions }, context) => {
// 记录敏感操作
securityMonitor.recordSuspiciousActivity(context.session.user.id, {
action: 'updateUserPermissions',
targetUser: userId,
permissions
});
// 继续处理请求
return updateUserPermissions(userId, permissions);
}
});
4.2 异常检测机制
自动化异常检测可以帮助识别潜在的安全威胁:
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
// 简单的行为分析引擎
class BehaviorAnalyzer {
private userPatterns: Map<string, UserPattern> = new Map();
// 分析并记录用户行为
analyzeUserBehavior(userId: string, action: string, context: any): AnomalyResult {
// 获取或创建用户模式
let pattern = this.userPatterns.get(userId);
if (!pattern) {
pattern = {
userId,
actions: new Map(),
lastSeen: new Date(),
commonIPs: new Set(),
timeDistribution: Array(24).fill(0) // 按小时分布
};
this.userPatterns.set(userId, pattern);
}
// 更新用户模式
pattern.lastSeen = new Date();
// 记录行为
const actionCount = pattern.actions.get(action) || 0;
pattern.actions.set(action, actionCount + 1);
// 记录IP
if (context.ip) {
pattern.commonIPs.add(context.ip);
}
// 记录时间分布
const hour = new Date().getHours();
pattern.timeDistribution[hour]++;
// 检测异常
return this.detectAnomalies(userId, action, context);
}
// 检测行为异常
private detectAnomalies(userId: string, action: string, context: any): AnomalyResult {
const pattern = this.userPatterns.get(userId);
if (!pattern) return {
isAnomaly: false };
const anomalies = [];
// 检测IP异常
if (context.ip && !this.isCommonIP(userId, context.ip)) {
anomalies.push({
type: 'unusual_location',
details: `不常见的IP地址: ${
context.ip}`
});
}
// 检测时间异常
const hour = new Date().getHours();
if (!this.isCommonTimeForUser(userId, hour)) {
anomalies.push({
type: 'unusual_time',
details: `不常见的活动时间: ${
hour}:00-${
hour+1}:00`
});
}
// 检测敏感操作
if (this.isSensitiveAction(action) && !this.isCommonActionForUser(userId, action)) {
anomalies.push({
type: 'unusual_action',
details: `不常见的敏感操作: ${
action}`
});
}
return {
isAnomaly: anomalies.length > 0,
anomalies
};
}
// 检查是否是用户常用IP
private isCommonIP(userId: string, ip: string): boolean {
const pattern = this.userPatterns.get(userId);
if (!pattern) return false;
if (pattern.commonIPs.size <= 3) {
return pattern.commonIPs.has(ip);
} else {
// 对于有多个IP的用户,需要更复杂的判断逻辑
return pattern.commonIPs.has(ip);
}
}
// 检查是否是用户常见操作
private isCommonActionForUser(userId: string, action: string): boolean {
const pattern = this.userPatterns.get(userId);
if (!pattern) return false;
const actionCount = pattern.actions.get(action) || 0;
return actionCount >= 3; // 简单标准:执行过至少3次
}
// 检查是否是用户常见活动时间
private isCommonTimeForUser(userId: string, hour: number): boolean {
const pattern = this.userPatterns.get(userId);
if (!pattern) return false;
// 简单标准:如果用户在这个时间段至少有过5次活动
return pattern.timeDistribution[hour] >= 5;
}
// 检查是否是敏感操作
private isSensitiveAction(action: string): boolean {
const sensitiveActions = [
'updateUserPermissions',
'deleteAccount',
'resetPassword',
'accessAdminPanel',
'exportUserData'
];
return sensitiveActions.includes(action);
}
}
// 异常检测结果类型
interface AnomalyResult {
isAnomaly: boolean;
anomalies?: Array<{
type: string;
details: string;
}>;
}
// 用户行为模式类型
interface UserPattern {
userId: string;
actions: Map<string, number>;
lastSeen: Date;
commonIPs: Set<string>;
timeDistribution: number[]; // 24小时活动分布
}
// 创建MCP服务器和行为分析器
const server = new McpServer();
const behaviorAnalyzer = new BehaviorAnalyzer();
// 将异常检测集成到MCP服务器
server.use(async (context, next) => {
// 跳过未认证的请求
if (!context.session?.user?.id) {
return next();
}
const userId = context.session.user.id;
const action = getActionFromRequest(context.request);
// 分析用户行为
const result = behaviorAnalyzer.analyzeUserBehavior(userId, action, {
ip: context.request.ip,
time: new Date(),
userAgent: context.request.headers?.['user-agent']
});
// 如果检测到异常
if (result.isAnomaly) {
// 记录异常
logger.warn('检测到异常用户行为', {
userId,
action,
anomalies: result.anomalies,
requestId: context.requestId
});
// 对于高风险异常,可能需要额外验证或阻止
if (isHighRiskAnomaly(result.anomalies)) {
// 例如,要求额外验证
if (!context.request.headers?.['secondary-auth-token']) {
throw new Error('检测到异常活动,需要额外验证');
}
}
}
return next();
});
// 从请求中提取操作类型
function getActionFromRequest(request: any): string {
if (request.method === 'tools/call') {
return request.params.name;
}
return `${
request.method}`;
}
// 判断是否高风险异常
function isHighRiskAnomaly(anomalies: any[]): boolean {
if (!anomalies) return false;
// 如果有多个异常,视为高风险
if (anomalies.length >= 2) return true;
// 特定类型的异常视为高风险
const highRiskTypes = ['unusual_location', 'unusual_action'];
return anomalies.some(a => highRiskTypes.includes(a.type));
}
4.3 合规性要求适配
不同行业和地区有不同的合规要求,MCP应用需要灵活适配:
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
// 合规性配置
interface ComplianceConfig {
dataRetention: {
logRetentionDays: number;
userDataRetentionDays: number;
};
privacy: {
maskPII: boolean;
allowExport: boolean;
requireConsent: boolean;
};
security: {
mfaRequired: boolean;
passwordPolicies: {
minLength: number;
requireSpecialChars: boolean;
requireNumbers: boolean;
expiryDays: number;
};
sessionTimeout: number; // 分钟
};
audit: {
logLevel: 'minimal' | 'standard' | 'verbose';
sensitiveOperations: string[];
};
}
// 根据地区获取合规性配置
function getComplianceConfigForRegion(region: string): ComplianceConfig {
// 实际实现中,这些配置应该从配置文件或数据库获取
switch (region.toUpperCase()) {
case 'EU':
return {
// GDPR适配配置
dataRetention: {
logRetentionDays: 90,
userDataRetentionDays: 730 // 2年
},
privacy: {
maskPII: true,
allowExport: true,
requireConsent: true
},
security: {
mfaRequired: true,
passwordPolicies: {
minLength: 12,
requireSpecialChars: true,
requireNumbers: true,
expiryDays: 90
},
sessionTimeout: 30
},
audit: {
logLevel: 'verbose',
sensitiveOperations: [/* ... */]
}
};
case 'US-HEALTHCARE':
return {
// HIPAA适配配置
dataRetention: {
logRetentionDays: 365 * 6, // 6年
userDataRetentionDays: 365 * 6
},
privacy: {
maskPII: true,
allowExport: false,
requireConsent: true
},
security: {
mfaRequired: true,
passwordPolicies: {
minLength: 14,
requireSpecialChars: true,
requireNumbers: true,
expiryDays: 60
},
sessionTimeout: 15
},
audit: {
logLevel: 'verbose',
sensitiveOperations: [/* ... */]
}
};
default:
// 默认配置
return {
dataRetention: {
logRetentionDays: 365,
userDataRetentionDays: 365 * 3
},
privacy: {
maskPII: false,
allowExport: true,
requireConsent: false
},
security: {
mfaRequired: false,
passwordPolicies: {
minLength: 8,
requireSpecialChars: false,
requireNumbers: true,
expiryDays: 120
},
sessionTimeout: 60
},
audit: {
logLevel: 'standard',
sensitiveOperations: [/* ... */]
}
};
}
}
// MCP服务器合规性适配器
class ComplianceAdapter {
private config: ComplianceConfig;
constructor(region: string) {
this.config = getComplianceConfigForRegion(region);
}
// 应用合规设置到MCP服务器
applyToServer(server: McpServer): void {
// 设置会话超时
server.setSessionTimeout(this.config.security.sessionTimeout * 60 * 1000);
// 添加合规中间件
server.use(async (context, next) => {
// 检查会话是否需要MFA
if (this.config.security.mfaRequired &&
context.session?.user &&
!context.session.user.mfaVerified) {
throw new Error('需要多因素认证');
}
// 记录敏感操作
if (this.isSensitiveOperation(context.request)) {
this.logCompliance(context, 'sensitive_operation');
}
// 隐私同意检查
if (this.config.privacy.requireConsent &&
this.involvesUserData(context.request) &&
!context.session?.user?.hasConsented) {
throw new Error('需要用户同意才能处理个人数据');
}
return next();
});
}
// 获取当前合规配置
getConfig(): ComplianceConfig {
return {
...this.config };
}
// 检查是否为敏感操作
private isSensitiveOperation(request: any): boolean {
if (request.method !== 'tools/call') return false;
return this.config.audit.sensitiveOperations.includes(request.params.name);
}
// 检查请求是否涉及用户数据
private involvesUserData(request: any): boolean {
// 实际实现应基于请求内容和上下文检查
return request.params?.userData || request.path?.includes('/users/');
}
// 记录合规日志
private logCompliance(context: any, eventType: string): void {
const logLevel = this.config.audit.logLevel;
// 根据配置的日志级别决定记录的详细程度
let logData: any = {
timestamp: new Date(),
eventType,
userId: context.session?.user?.id || 'anonymous',
action: context.request.method
};
if (logLevel === 'standard' || logLevel === 'verbose') {
logData.requestId = context.requestId;
logData.ip = context.request.ip;
}
if (logLevel === 'verbose') {
logData.request = sanitizeLogData(context.request);
logData.session = sanitizeLogData(context.session);
}
// 记录合规日志
logger.info('合规事件', logData);
}
}
// 使用合规适配器
const region = process.env.COMPLIANCE_REGION || 'DEFAULT';
const complianceAdapter = new ComplianceAdapter(region);
const server = new McpServer();
// 应用合规设置
complianceAdapter.applyToServer(server);
// 数据保留策略实现
const dataRetentionDays = complianceAdapter.getConfig().dataRetention.userDataRetentionDays;
scheduleDataRetentionJob(dataRetentionDays);
4.4 安全事件响应流程
当安全事件发生时,有效的响应流程至关重要:
import {
McpServer } from '@modelcontextprotocol/typescript-sdk';
// 安全事件类型
enum SecurityEventSeverity {
INFO = 'info',
LOW = 'low',
MEDIUM = 'medium',
HIGH = 'high',
CRITICAL = 'critical'
}
interface SecurityEvent {
id: string;
timestamp: Date;
type: string;
severity: SecurityEventSeverity;
source: {
ip?: string;
userId?: string;
sessionId?: string;
};
details: any;
status: 'new' | 'investigating' | 'mitigated' | 'resolved' | 'false_positive';
}
// 安全事件处理器
class SecurityEventHandler {
private events: SecurityEvent[] = [];
// 记录安全事件
recordEvent(event: Omit<SecurityEvent, 'id' | 'timestamp' | 'status'>): SecurityEvent {
const securityEvent: SecurityEvent = {
id: generateEventId(),
timestamp: new Date(),
status: 'new',
...event
};
this.events.push(securityEvent);
// 记录事件
logger.warn('安全事件', securityEvent);
// 对严重事件进行自动响应
if (event.severity === SecurityEventSeverity.HIGH ||
event.severity === SecurityEventSeverity.CRITICAL) {
this.triggerAutomatedResponse(securityEvent);
}
return securityEvent;
}
// 触发自动响应
private async triggerAutomatedResponse(event: SecurityEvent): Promise<void> {
logger.info('触发安全事件自动响应', {
eventId: event.id });
// 基于事件类型执行不同的响应
switch (event.type) {
case 'brute_force_attempt':
// 暂时封禁来源IP
if (event.source.ip) {
await this.banIP(event.source.ip, 30); // 封禁30分钟
}
break;
case 'suspicious_access':
// 锁定账户
if (event.source.userId) {
await this.lockAccount(event.source.userId);
}
break;
case 'data_exfiltration':
// 终止用户会话
if (event.source.sessionId) {
await this.terminateSession(event.source.sessionId);
}
// 限制用户访问
if (event.source.userId) {
await this.restrictUserAccess(event.source.userId);
}
break;
case 'injection_attempt':
// 增加来源IP的监控级别
if (event.source.ip) {
await this.increaseIPMonitoring(event.source.ip);
}
break;
}
// 对关键事件进行通知
if (event.severity === SecurityEventSeverity.CRITICAL) {
await this.notifySecurityTeam(event);
}
// 更新事件状态
this.updateEventStatus(event.id, 'investigating');
}
// 更新事件状态
updateEventStatus(eventId: string, status: SecurityEvent['status']): void {
const event = this.events.find(e => e.id === eventId);
if (event) {
event.status = status;
logger.info('更新安全事件状态', {
eventId, status });
}
}
// 查询事件
getEvents(filters: Partial<{
types: string[];
severities: SecurityEventSeverity[];
startDate: Date;
endDate: Date;
status: SecurityEvent['status'];
}> = {
}): SecurityEvent[] {
let filtered = [...this.events];
if (filters.types) {
filtered = filtered.filter(e => filters.types!.includes(e.type));
}
if (filters.severities) {
filtered = filtered.filter(e => filters.severities!.includes(e.severity));
}
if (filters.status) {
filtered = filtered.filter(e => e.status === filters.status);
}
if (filters.startDate) {
filtered = filtered.filter(e => e.timestamp >= filters.startDate!);
}
if (filters.endDate) {
filtered = filtered.filter(e => e.timestamp <= filters.endDate!);
}
return filtered;
}
// 安全响应方法
private async banIP(ip: string, minutes: number): Promise<void> {
// 实现IP封禁逻辑
logger.info(`封禁IP ${
ip} ${
minutes}分钟`);
// ...实现封禁逻辑
}
private async lockAccount(userId: string): Promise<void> {
// 实现账户锁定逻辑
logger.info(`锁定用户账户 ${
userId}`);
// ...实现锁定逻辑
}
private async terminateSession(sessionId: string): Promise<void> {
// 实现会话终止逻辑
logger.info(`终止会话 ${
sessionId}`);
// ...实现会话终止逻辑
}
private async restrictUserAccess(userId: string): Promise<void> {
// 实现用户访问限制逻辑
logger.info(`限制用户 ${
userId} 的访问权限`);
// ...实现访问限制逻辑
}
private async increaseIPMonitoring(ip: string): Promise<void> {
// 实现增强监控逻辑
logger.info(`增加对IP ${
ip} 的监控级别`);
// ...实现监控升级逻辑
}
private async notifySecurityTeam(event: SecurityEvent): Promise<void> {
// 实现安全团队通知逻辑
logger.info(`通知安全团队关于事件 ${
event.id}`);
// ...实现通知逻辑
}
}
// 生成事件ID
function generateEventId(): string {
return `sec-${
Date.now()}-${
Math.random().toString(36).substring(2, 10)}`;
}
// 创建MCP服务器和安全事件处理器
const server = new McpServer();
const securityEventHandler = new SecurityEventHandler();
// 添加安全事件检测中间件
server.use(async (context, next) => {
try {
// 处理请求
return await next();
} catch (error) {
// 检测安全相关错误
if (isSecurityError(error)) {
const severity = getErrorSeverity(error);
const eventType = getSecurityErrorType(error);
// 记录安全事件
securityEventHandler.recordEvent({
type: eventType,
severity,
source: {
ip: context.request.ip,
userId: context.session?.user?.id,
sessionId: context.session?.id
},
details: {
error: error.message,
code: error.code,
request: {
method: context.request.method,
params: sanitizeLogData(context.request.params)
}
}
});
}
throw error;
}
});
// 获取错误的严重性级别
function getErrorSeverity(error: any): SecurityEventSeverity {
const highSeverityErrors = ['INJECTION_ATTEMPT', 'UNAUTHORIZED_TOOL_ACCESS'];
const mediumSeverityErrors = ['INVALID_TOKEN', 'RATE_LIMIT_EXCEEDED'];
if (highSeverityErrors.includes(error.code)) {
return SecurityEventSeverity.HIGH;
} else if (mediumSeverityErrors.includes(error.code)) {
return SecurityEventSeverity.MEDIUM;
}
return SecurityEventSeverity.LOW;
}
通过实施全面的安全审计和合规措施,MCP应用可以有效识别和应对安全威胁,同时满足不同行业和地区的监管要求,确保系统的安全性和合规性。
结语
本文深入探讨了MCP安全体系建设的四个关键方面:威胁模型与风险评估、数据加密与传输安全、注入攻击防御以及安全审计与合规性。通过全面的安全体系建设,MCP应用可以提供高水平的安全保障,保护敏感数据和用户隐私,确保AI模型与外部工具交互的安全性和可靠性。
安全不是一个静态目标,而是一个持续的过程。随着新威胁的出现和技术的发展,MCP安全体系也需要不断演进和完善。开发者应保持警惕,跟踪安全最佳实践的变化,并持续改进MCP应用的安全措施。
















暂无评论内容