【MCP Node.js SDK 全栈进阶指南】高级篇(3):MCP 安全体系建设

背景

随着MCP协议在企业和个人应用中的广泛采用,安全性已成为MCP系统设计和开发中不可忽视的核心要素。一个健壮的MCP安全体系不仅能保护敏感数据和用户隐私,还能确保AI模型与外部工具交互的可靠性和完整性。本文将深入探讨MCP TypeScript-SDK的安全体系建设,帮助开发者构建既强大又安全的MCP应用。

1. 威胁模型与风险评估

1.1 MCP应用面临的安全威胁

MCP应用作为连接AI模型与外部工具的桥梁,面临着多方面的安全威胁。了解这些威胁是构建安全体系的第一步:

1.1.1 外部攻击威胁
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│                 │     │                 │     │                 │
│   恶意用户      │────▶│   MCP 系统      │────▶│ 外部资源/工具   │
│                 │     │                 │     │                 │
└─────────────────┘     └─────────────────┘     └─────────────────┘
        │                       ▲                       │
        │                       │                       │
        └───────────────────────┴───────────────────────┘
                          攻击路径

恶意输入注入:攻击者可能通过向MCP系统提交恶意构造的输入,尝试操纵AI模型或后端工具。

身份冒充:未经授权的用户可能尝试伪装为合法用户访问MCP服务。

中间人攻击:在客户端与服务器之间截获或篡改通信内容。

拒绝服务攻击:通过大量请求使MCP服务器资源耗尽,导致服务不可用。

1.1.2 内部安全威胁

权限滥用:具有合法访问权限的用户可能滥用其权限获取敏感数据。

数据泄露:内部人员可能有意或无意地暴露敏感信息。

配置错误:错误的安全配置可能导致不必要的系统暴露。

1.1.3 工具集成风险

MCP的核心价值在于连接AI模型与外部工具,这也带来了独特的安全风险:

工具能力滥用:恶意用户可能尝试使用MCP提供的工具执行未授权操作。

工具链污染:第三方工具可能包含恶意代码或漏洞。

数据流转风险:敏感数据在多个工具间流转时面临泄露风险。

1.2 风险评估方法论

对MCP系统进行全面的风险评估,可采用以下方法论:

1.2.1 STRIDE威胁建模

STRIDE是一种常用的威胁建模方法,特别适用于MCP这类复杂系统:

威胁类型 描述 MCP相关示例
欺骗(Spoofing) 冒充他人身份 攻击者冒充管理员访问MCP管理界面
篡改(Tampering) 修改系统数据 修改MCP资源定义或工具参数
否认(Repudiation) 否认曾执行的操作 用户否认曾使用MCP工具执行某操作
信息泄露(Information Disclosure) 未授权访问信息 获取MCP处理的敏感数据
拒绝服务(Denial of Service) 使服务不可用 通过大量请求使MCP服务器崩溃
权限提升(Elevation of Privilege) 获取未授权的权限 从普通用户提升为MCP管理员
1.2.2 风险评分矩阵

使用风险评分矩阵可以量化各种威胁的风险级别:

// 风险评估辅助函数
function assessRisk(
  threat: string,
  likelihood: 1 | 2 | 3, // 1=低, 2=中, 3=高
  impact: 1 | 2 | 3      // 1=低, 2=中, 3=高
): {
             threat: string; risk: string; score: number } {
            
  const score = likelihood * impact;
  let risk = "低";
  
  if (score >= 7) risk = "高";
  else if (score >= 4) risk = "中";
  
  return {
             threat, risk, score };
}

// 使用示例
const mcpRiskMatrix = [
  assessRisk("未授权工具访问", 3, 3),  // 高风险
  assessRisk("配置文件泄露", 2, 3),    // 中风险
  assessRisk("日志信息泄露", 2, 1)     // 低风险
];

1.3 安全优先级确定

基于风险评估结果,可以建立MCP系统的安全优先级框架:

1.3.1 关键安全领域排序
const securityPriorities = [
  {
            
    area: "身份验证与授权",
    priority: "高",
    rationale: "直接影响谁可以访问MCP系统及其功能"
  },
  {
            
    area: "工具API安全",
    priority: "高",
    rationale: "防止工具被滥用执行未授权操作"
  },
  {
            
    area: "数据传输加密",
    priority: "高",
    rationale: "保护传输中的敏感信息不被截获"
  },
  {
            
    area: "输入验证",
    priority: "中",
    rationale: "防止注入攻击和不合法输入"
  },
  {
            
    area: "错误处理与日志",
    priority: "中",
    rationale: "确保安全事件可被检测和回溯"
  },
  {
            
    area: "依赖项安全",
    priority: "中",
    rationale: "减轻第三方库引入的风险"
  }
];
1.3.2 制定安全规划路线图

根据安全优先级,可以制定分阶段的安全实施路线图:

基础安全阶段:实现必要的身份验证、加密和输入验证
增强安全阶段:添加细粒度访问控制、异常检测和安全审计
高级安全阶段:实施高级威胁防护、自动化安全测试和持续监控

1.4 典型攻击场景分析

以下是MCP系统可能面临的典型攻击场景及其防御策略:

1.4.1 工具命令注入攻击

攻击场景:攻击者构造恶意输入,尝试在工具执行上下文中注入命令。

// 易受攻击的工具实现
const vulnerableFileTool = {
            
  name: "readFile",
  description: "读取文件内容",
  parameters: {
            
    path: {
             type: "string", description: "文件路径" }
  },
  handler: async ({
             path }) => {
            
    // 危险: 直接使用用户输入而不验证
    const command = `cat ${
              path}`;
    return executeCommand(command); // 可能遭受命令注入
  }
};

// 防御示例
const secureFileTool = {
            
  name: "readFile",
  description: "读取文件内容",
  parameters: {
            
    path: {
             type: "string", description: "文件路径" }
  },
  handler: async ({
             path }) => {
            
    // 验证输入是否为安全的文件路径
    if (!isValidFilePath(path)) {
            
      throw new Error("无效的文件路径");
    }
    
    // 使用安全的文件读取API而非命令执行
    return fs.promises.readFile(path, 'utf-8');
  }
};

function isValidFilePath(path: string): boolean {
            
  // 实现路径验证逻辑
  return /^[a-zA-Z0-9_-/.]+$/.test(path) && !path.includes('..');
}
1.4.2 权限逃逸攻击

攻击场景:攻击者尝试绕过访问控制,获取未授权的资源。

// 易受攻击的实现
app.get('/api/mcp/resources/:resourceId', async (req, res) => {
            
  const {
             resourceId } = req.params;
  // 危险: 未检查用户是否有权访问该资源
  const resource = await getResource(resourceId);
  res.json(resource);
});

// 防御示例
app.get('/api/mcp/resources/:resourceId', async (req, res) => {
            
  const {
             resourceId } = req.params;
  const userId = getUserIdFromToken(req.headers.authorization);
  
  // 先检查权限
  if (!await hasAccessToResource(userId, resourceId)) {
            
    return res.status(403).json({
             error: '无权访问此资源' });
  }
  
  const resource = await getResource(resourceId);
  res.json(resource);
});
1.4.3 中间人攻击

攻击场景:攻击者截获MCP客户端与服务器之间的通信。

防御策略

始终使用TLS加密通信
实现证书验证
考虑使用HTTP严格传输安全(HSTS)

import https from 'https';
import fs from 'fs';
import express from 'express';

const app = express();

// HTTPS服务器配置
const httpsOptions = {
            
  key: fs.readFileSync('server.key'),
  cert: fs.readFileSync('server.cert'),
  // 强制使用现代TLS版本
  minVersion: 'TLSv1.2'
};

// 设置HSTS头
app.use((req, res, next) => {
            
  res.setHeader(
    'Strict-Transport-Security',
    'max-age=31536000; includeSubDomains; preload'
  );
  next();
});

// 创建HTTPS服务器
https.createServer(httpsOptions, app).listen(443, () => {
            
  console.log('MCP服务器在HTTPS端口443上启动');
});

通过深入分析MCP应用面临的威胁模型并进行全面风险评估,开发者可以有的放矢地构建安全体系,为后续的安全实施奠定坚实基础。

2. 数据加密与传输安全

MCP系统处理的数据可能包含敏感信息,保护这些数据的安全至关重要。本节将探讨如何在MCP应用中实现数据加密与传输安全。

2.1 通信层加密实现

MCP应用需要确保客户端与服务器之间的所有通信都经过加密,防止数据在传输过程中被窃取或篡改。

2.1.1 TLS/SSL配置

在MCP服务器中实现TLS加密:

import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';
import https from 'https';
import fs from 'fs';

// 读取TLS证书
const options = {
            
  key: fs.readFileSync('private-key.pem'),
  cert: fs.readFileSync('certificate.pem'),
  // 推荐的TLS安全配置
  ciphers: 'TLS_AES_256_GCM_SHA384:TLS_CHACHA20_POLY1305_SHA256:TLS_AES_128_GCM_SHA256',
  minVersion: 'TLSv1.2',
  // 可选:要求客户端证书进行双向TLS认证
  requestCert: false,
  rejectUnauthorized: false
};

// 创建HTTPS服务器
const httpsServer = https.createServer(options);

// 配置MCP服务器使用HTTPS
const mcpServer = new McpServer({
            
  transport: {
            
    type: 'http',
    server: httpsServer
  }
});

httpsServer.listen(443, () => {
            
  console.log('MCP HTTPS服务器启动在端口443');
});
2.1.2 客户端连接安全

MCP客户端在连接服务器时应验证TLS证书:

import {
             McpClient } from '@modelcontextprotocol/typescript-sdk';
import https from 'https';
import fs from 'fs';

// 创建安全的HTTPS代理
const httpsAgent = new https.Agent({
            
  // 可信CA证书列表,用于验证服务器证书
  ca: fs.readFileSync('trusted-ca.pem'),
  // 检查服务器名称是否与证书匹配
  checkServerIdentity: (host, cert) => {
            
    // 实现证书验证逻辑
    if (!cert.subject.CN.includes(host)) {
            
      return new Error('证书CN不匹配目标主机');
    }
    return undefined; // 验证通过
  },
  // 可选:客户端证书(双向TLS)
  key: fs.readFileSync('client-key.pem'),
  cert: fs.readFileSync('client-cert.pem')
});

// 配置客户端使用安全连接
const client = new McpClient({
            
  url: 'https://mcp-server.example.com',
  httpsAgent
});

2.2 敏感数据存储加密

除了传输安全,MCP应用还需要保护存储中的敏感数据。

2.2.1 配置文件加密

敏感配置(如API密钥)应加密存储:

import crypto from 'crypto';
import fs from 'fs';

// 加密配置数据
function encryptConfig(configData: any, masterKey: Buffer): string {
            
  // 生成随机初始化向量
  const iv = crypto.randomBytes(16);
  
  // 创建加密器
  const cipher = crypto.createCipheriv('aes-256-gcm', masterKey, iv);
  
  // 加密数据
  let encrypted = cipher.update(JSON.stringify(configData), 'utf8', 'hex');
  encrypted += cipher.final('hex');
  
  // 获取认证标签
  const authTag = cipher.getAuthTag();
  
  // 组合IV、认证标签和加密数据
  return JSON.stringify({
            
    iv: iv.toString('hex'),
    authTag: authTag.toString('hex'),
    encryptedData: encrypted
  });
}

// 解密配置数据
function decryptConfig(encryptedString: string, masterKey: Buffer): any {
            
  const encryptedObj = JSON.parse(encryptedString);
  
  // 从存储的格式中提取组件
  const iv = Buffer.from(encryptedObj.iv, 'hex');
  const authTag = Buffer.from(encryptedObj.authTag, 'hex');
  const encryptedData = encryptedObj.encryptedData;
  
  // 创建解密器
  const decipher = crypto.createDecipheriv('aes-256-gcm', masterKey, iv);
  decipher.setAuthTag(authTag);
  
  // 解密数据
  let decrypted = decipher.update(encryptedData, 'hex', 'utf8');
  decrypted += decipher.final('utf8');
  
  return JSON.parse(decrypted);
}

// 使用示例
const masterKey = crypto.scryptSync('强密码', '盐值', 32); // 安全地生成密钥

// 存储加密配置
const config = {
            
  apiKey: 'super-secret-api-key',
  userCredentials: {
            
    username: 'admin',
    password: 'secret-password'
  }
};

const encryptedConfig = encryptConfig(config, masterKey);
fs.writeFileSync('config.encrypted', encryptedConfig);

// 读取并解密配置
const storedConfig = fs.readFileSync('config.encrypted', 'utf8');
const decryptedConfig = decryptConfig(storedConfig, masterKey);
2.2.2 会话数据加密

MCP会话数据的安全存储:

import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';
import crypto from 'crypto';

// 加密会话存储实现
class EncryptedSessionStore {
            
  private encryptionKey: Buffer;
  private sessions: Map<string, string> = new Map();
  
  constructor(encryptionKey: Buffer) {
            
    this.encryptionKey = encryptionKey;
  }
  
  // 存储加密会话
  async saveSession(sessionId: string, sessionData: any): Promise<void> {
            
    const encrypted = this.encryptData(JSON.stringify(sessionData));
    this.sessions.set(sessionId, encrypted);
  }
  
  // 获取并解密会话
  async getSession(sessionId: string): Promise<any | null> {
            
    const encrypted = this.sessions.get(sessionId);
    if (!encrypted) return null;
    
    try {
            
      const decrypted = this.decryptData(encrypted);
      return JSON.parse(decrypted);
    } catch (error) {
            
      console.error('会话解密失败:', error);
      return null;
    }
  }
  
  // 删除会话
  async deleteSession(sessionId: string): Promise<void> {
            
    this.sessions.delete(sessionId);
  }
  
  // 加密数据
  private encryptData(data: string): string {
            
    const iv = crypto.randomBytes(16);
    const cipher = crypto.createCipheriv('aes-256-gcm', this.encryptionKey, iv);
    
    let encrypted = cipher.update(data, 'utf8', 'hex');
    encrypted += cipher.final('hex');
    
    const authTag = cipher.getAuthTag();
    
    return JSON.stringify({
            
      iv: iv.toString('hex'),
      authTag: authTag.toString('hex'),
      data: encrypted
    });
  }
  
  // 解密数据
  private decryptData(encryptedJson: string): string {
            
    const {
             iv, authTag, data } = JSON.parse(encryptedJson);
    
    const decipher = crypto.createDecipheriv(
      'aes-256-gcm', 
      this.encryptionKey, 
      Buffer.from(iv, 'hex')
    );
    
    decipher.setAuthTag(Buffer.from(authTag, 'hex'));
    
    let decrypted = decipher.update(data, 'hex', 'utf8');
    decrypted += decipher.final('utf8');
    
    return decrypted;
  }
}

// 生成安全的加密密钥
const sessionKey = crypto.randomBytes(32);
const sessionStore = new EncryptedSessionStore(sessionKey);

// 在MCP服务器中使用加密会话存储
const server = new McpServer({
            
  sessionStorage: {
            
    async save(sessionId, data) {
            
      await sessionStore.saveSession(sessionId, data);
    },
    async load(sessionId) {
            
      return sessionStore.getSession(sessionId);
    },
    async delete(sessionId) {
            
      await sessionStore.deleteSession(sessionId);
    }
  }
});

2.3 密钥管理最佳实践

有效的密钥管理是数据加密安全性的关键。

2.3.1 密钥轮换机制

实现定期密钥轮换以提高安全性:

import crypto from 'crypto';

class KeyRotationManager {
            
  private currentKey: Buffer;
  private previousKeys: Map<string, Buffer> = new Map();
  private keyLifetimeMs: number;
  private currentKeyId: string;
  
  constructor(keyLifetimeMs: number = 7 * 24 * 60 * 60 * 1000) {
             // 默认7天
    this.keyLifetimeMs = keyLifetimeMs;
    // 生成初始密钥
    this.currentKeyId = this.generateKeyId();
    this.currentKey = crypto.randomBytes(32);
    
    // 设置定期轮换
    setInterval(() => this.rotateKey(), this.keyLifetimeMs);
  }
  
  private generateKeyId(): string {
            
    return Date.now().toString(36) + Math.random().toString(36).substr(2, 5);
  }
  
  private rotateKey(): void {
            
    // 保存当前密钥
    this.previousKeys.set(this.currentKeyId, this.currentKey);
    
    // 生成新密钥
    this.currentKeyId = this.generateKeyId();
    this.currentKey = crypto.randomBytes(32);
    
    // 移除过期密钥(保留最近的5个密钥)
    const keyIds = Array.from(this.previousKeys.keys());
    if (keyIds.length > 5) {
            
      const oldestKeyId = keyIds[0];
      this.previousKeys.delete(oldestKeyId);
    }
    
    console.log(`密钥已轮换,新密钥ID: ${
              this.currentKeyId}`);
  }
  
  // 获取当前加密密钥和ID
  getCurrentKey(): {
             keyId: string, key: Buffer } {
            
    return {
            
      keyId: this.currentKeyId,
      key: this.currentKey
    };
  }
  
  // 通过ID获取密钥(用于解密旧数据)
  getKeyById(keyId: string): Buffer | null {
            
    if (keyId === this.currentKeyId) {
            
      return this.currentKey;
    }
    
    return this.previousKeys.get(keyId) || null;
  }
  
  // 使用当前密钥加密数据
  encrypt(data: string): {
             keyId: string, encryptedData: string } {
            
    const {
             keyId, key } = this.getCurrentKey();
    const iv = crypto.randomBytes(16);
    
    const cipher = crypto.createCipheriv('aes-256-gcm', key, iv);
    let encrypted = cipher.update(data, 'utf8', 'hex');
    encrypted += cipher.final('hex');
    
    const authTag = cipher.getAuthTag();
    
    return {
            
      keyId,
      encryptedData: JSON.stringify({
            
        iv: iv.toString('hex'),
        authTag: authTag.toString('hex'),
        data: encrypted
      })
    };
  }
  
  // 解密数据(支持使用旧密钥)
  decrypt(keyId: string, encryptedDataJson: string): string {
            
    const key = this.getKeyById(keyId);
    if (!key) {
            
      throw new Error(`找不到ID为 ${
              keyId} 的密钥`);
    }
    
    const {
             iv, authTag, data } = JSON.parse(encryptedDataJson);
    
    const decipher = crypto.createDecipheriv(
      'aes-256-gcm', 
      key, 
      Buffer.from(iv, 'hex')
    );
    
    decipher.setAuthTag(Buffer.from(authTag, 'hex'));
    
    let decrypted = decipher.update(data, 'hex', 'utf8');
    decrypted += decipher.final('utf8');
    
    return decrypted;
  }
}

// 使用密钥轮换管理器
const keyManager = new KeyRotationManager();

// 加密数据
const {
             keyId, encryptedData } = keyManager.encrypt('敏感数据');

// 稍后解密数据
const decrypted = keyManager.decrypt(keyId, encryptedData);
2.3.2 硬件安全模块(HSM)集成

对于高安全性要求的MCP应用,可以考虑集成硬件安全模块:

import crypto from 'crypto';
import {
             HSM } from 'some-hsm-library'; // 假设的HSM库

class HsmKeyManager {
            
  private hsm: HSM;
  
  constructor(hsmConfig: any) {
            
    // 初始化HSM连接
    this.hsm = new HSM(hsmConfig);
  }
  
  // 使用HSM生成密钥
  async generateKey(keyLabel: string): Promise<string> {
            
    return this.hsm.generateKey({
            
      label: keyLabel,
      algorithm: 'AES',
      size: 256,
      extractable: false // 密钥不可导出
    });
  }
  
  // 使用HSM中的密钥加密数据
  async encrypt(data: string, keyLabel: string): Promise<string> {
            
    const iv = crypto.randomBytes(16);
    
    const {
             encrypted, authTag } = await this.hsm.encrypt({
            
      algorithm: 'AES-GCM',
      key: keyLabel,
      iv,
      data: Buffer.from(data, 'utf8')
    });
    
    return JSON.stringify({
            
      iv: iv.toString('hex'),
      authTag: authTag.toString('hex'),
      data: encrypted.toString('hex')
    });
  }
  
  // 使用HSM中的密钥解密数据
  async decrypt(encryptedDataJson: string, keyLabel: string): Promise<string> {
            
    const {
             iv, authTag, data } = JSON.parse(encryptedDataJson);
    
    const decrypted = await this.hsm.decrypt({
            
      algorithm: 'AES-GCM',
      key: keyLabel,
      iv: Buffer.from(iv, 'hex'),
      authTag: Buffer.from(authTag, 'hex'),
      data: Buffer.from(data, 'hex')
    });
    
    return decrypted.toString('utf8');
  }
}

2.4 安全传输协议选择

MCP应用支持多种传输协议,每种都有不同的安全考虑。

2.4.1 HTTP传输安全

HTTP传输是MCP最常用的方式之一,确保其安全性至关重要:

import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';
import express from 'express';
import helmet from 'helmet';
import rateLimit from 'express-rate-limit';

const app = express();

// 使用Helmet设置安全HTTP头
app.use(helmet());

// 设置跨域资源共享(CORS)策略
app.use((req, res, next) => {
            
  res.header('Access-Control-Allow-Origin', 'https://trusted-client.example.com');
  res.header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS');
  res.header('Access-Control-Allow-Headers', 'Content-Type, Authorization');
  next();
});

// 实施速率限制防止滥用
const apiLimiter = rateLimit({
            
  windowMs: 15 * 60 * 1000, // 15分钟
  max: 100, // 每IP最多100请求
  standardHeaders: true,
  legacyHeaders: false,
  message: {
             error: '请求过多,请稍后再试' }
});
app.use('/mcp', apiLimiter);

// 配置MCP服务器使用HTTP传输
const server = new McpServer({
            
  transport: {
            
    type: 'http',
    // 集成Express应用
    path: '/mcp',
    app
  }
});

// 启动服务器
app.listen(3000, () => {
            
  console.log('MCP HTTP服务器在端口3000上启动');
});
2.4.2 WebSocket传输安全

对于使用WebSocket的实时MCP应用:

import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';
import https from 'https';
import fs from 'fs';
import {
             WebSocketServer } from 'ws';
import url from 'url';

// HTTPS服务器配置
const httpsServer = https.createServer({
            
  key: fs.readFileSync('private-key.pem'),
  cert: fs.readFileSync('certificate.pem'),
  minVersion: 'TLSv1.2'
});

// 创建安全的WebSocket服务器
const wss = new WebSocketServer({
             
  server: httpsServer,
  // 验证请求
  verifyClient: (info, cb) => {
            
    const {
             query } = url.parse(info.req.url || '', true);
    
    // 验证访问令牌
    if (!query.token || !isValidToken(query.token as string)) {
            
      return cb(false, 401, 'Unauthorized');
    }
    
    // 验证来源
    const origin = info.origin || '';
    if (!isAllowedOrigin(origin)) {
            
      return cb(false, 403, 'Forbidden');
    }
    
    cb(true);
  }
});

// 验证令牌
function isValidToken(token: string): boolean {
            
  // 实现令牌验证逻辑
  return true; // 示例
}

// 验证允许的来源
function isAllowedOrigin(origin: string): boolean {
            
  const allowedOrigins = [
    'https://app.example.com',
    'https://admin.example.com'
  ];
  return allowedOrigins.includes(origin);
}

// 配置MCP服务器使用WebSocket传输
const server = new McpServer({
            
  transport: {
            
    type: 'websocket',
    server: wss
  }
});

// 启动HTTPS服务器
httpsServer.listen(3000, () => {
            
  console.log('MCP WebSocket服务器在端口3000上启动');
});
2.4.3 stdio传输安全

对于本地进程间通信的stdio传输:

import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';
import crypto from 'crypto';

// 生成本地认证令牌
const localAuthToken = crypto.randomBytes(32).toString('hex');

// 将令牌安全地传递给子进程
process.env.MCP_LOCAL_AUTH_TOKEN = localAuthToken;

// 配置MCP服务器使用stdio传输,并添加认证
const server = new McpServer({
            
  transport: {
            
    type: 'stdio'
  },
  // 添加认证中间件
  middleware: [
    async (context, next) => {
            
      // 从请求中提取令牌
      const authToken = context.request.headers?.authorization;
      
      // 验证令牌
      if (authToken !== `Bearer ${
              localAuthToken}`) {
            
        throw new Error('未授权访问');
      }
      
      return next();
    }
  ]
});

// 启动服务器
server.start();

通过全面实施数据加密和传输安全措施,MCP应用可以有效防止数据泄露和拦截攻击,为用户提供更安全可靠的服务。

3. 注入攻击防御

MCP系统作为连接AI模型与外部工具的桥梁,可能面临各种注入攻击。本节将探讨如何在MCP应用中防御这些攻击。

3.1 输入验证与净化

输入验证是防御注入攻击的第一道防线,必须对所有来自外部的数据进行严格验证。

3.1.1 使用Zod进行输入验证

MCP TypeScript-SDK支持使用Zod库进行参数验证,这是防御注入攻击的有效方法:

import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';
import {
             z } from 'zod';

// 创建MCP服务器
const server = new McpServer();

// 添加使用Zod验证的工具
server.addTool({
            
  name: 'searchDatabase',
  description: '在数据库中搜索记录',
  // 使用Zod定义参数模式
  parameters: z.object({
            
    // 为了安全,限制搜索条件为特定格式
    query: z.string()
      .min(3)
      .max(100)
      .regex(/^[a-zA-Z0-9s]+$/, '只允许字母、数字和空格'),
    limit: z.number().int().min(1).max(100).default(10)
  }),
  handler: async ({
             query, limit }) => {
            
    // 此时,参数已经过验证,确保符合指定模式
    // 执行安全的数据库查询
    return safeDbQuery(query, limit);
  }
});

// 安全的数据库查询实现
async function safeDbQuery(query: string, limit: number) {
            
  // 使用参数化查询,而不是字符串拼接
  // 参数化查询由数据库驱动程序处理,防止SQL注入
  const result = await db.query(
    'SELECT * FROM records WHERE name LIKE ? LIMIT ?',
    [`%${
              query}%`, limit]
  );
  return result;
}
3.1.2 深层输入净化

对于复杂的输入数据,需要进行深层净化:

import DOMPurify from 'dompurify';
import {
             JSDOM } from 'jsdom';

// 创建DOMPurify实例
const window = new JSDOM('').window;
const purify = DOMPurify(window);

// 用于净化HTML内容的工具
server.addTool({
            
  name: 'formatHtmlContent',
  description: '格式化HTML内容并确保安全',
  parameters: z.object({
            
    htmlContent: z.string(),
    allowedTags: z.array(z.string()).default(['p', 'b', 'i', 'a', 'ul', 'li'])
  }),
  handler: async ({
             htmlContent, allowedTags }) => {
            
    // 净化HTML,仅允许特定标签
    const cleanHtml = purify.sanitize(htmlContent, {
            
      ALLOWED_TAGS: allowedTags,
      ALLOWED_ATTR: ['href', 'target', 'title'],
      ALLOW_DATA_ATTR: false
    });
    
    return {
             cleanHtml };
  }
});

3.2 参数校验机制

除了基本的输入验证,MCP应用还应实施全面的参数校验机制。

3.2.1 运行时类型检查

TypeScript的类型系统在编译时提供类型安全,但在运行时添加额外检查同样重要:

// 运行时类型检查工具函数
function validateType(value: any, type: string, fieldName: string): void {
            
  let isValid = false;
  
  switch (type) {
            
    case 'string':
      isValid = typeof value === 'string';
      break;
    case 'number':
      isValid = typeof value === 'number' && !isNaN(value);
      break;
    case 'boolean':
      isValid = typeof value === 'boolean';
      break;
    case 'array':
      isValid = Array.isArray(value);
      break;
    case 'object':
      isValid = typeof value === 'object' && value !== null && !Array.isArray(value);
      break;
    default:
      throw new Error(`不支持的类型: ${
              type}`);
  }
  
  if (!isValid) {
            
    throw new Error(`${
              fieldName} 应该是 ${
              type} 类型,但接收到 ${
              typeof value}`);
  }
}

// 用于深度验证结构的函数
function validateStructure(obj: any, schema: any, path: string = ''): void {
            
  if (!obj || typeof obj !== 'object') {
            
    throw new Error(`${
              path || 'value'} 必须是一个对象`);
  }
  
  for (const [key, definition] of Object.entries(schema)) {
            
    const currentPath = path ? `${
              path}.${
              key}` : key;
    const value = obj[key];
    
    // 检查必填字段
    if (definition.required && (value === undefined || value === null)) {
            
      throw new Error(`缺少必填字段: ${
              currentPath}`);
    }
    
    // 如果有值,验证类型
    if (value !== undefined && value !== null) {
            
      validateType(value, definition.type, currentPath);
      
      // 对象类型的递归验证
      if (definition.type === 'object' && definition.properties) {
            
        validateStructure(value, definition.properties, currentPath);
      }
      
      // 数组类型的元素验证
      if (definition.type === 'array' && definition.items) {
            
        value.forEach((item: any, index: number) => {
            
          validateType(item, definition.items.type, `${
              currentPath}[${
              index}]`);
        });
      }
    }
  }
}

// 在MCP工具中使用结构验证
server.addTool({
            
  name: 'processUserProfile',
  description: '处理用户个人资料',
  parameters: z.object({
            
    profile: z.any() // 使用自定义验证
  }),
  handler: async ({
             profile }) => {
            
    // 定义预期的结构
    const profileSchema = {
            
      username: {
             type: 'string', required: true },
      age: {
             type: 'number', required: false },
      contact: {
            
        type: 'object',
        required: false,
        properties: {
            
          email: {
             type: 'string', required: true },
          phone: {
             type: 'string', required: false }
        }
      },
      interests: {
             
        type: 'array', 
        required: false,
        items: {
             type: 'string' }
      }
    };
    
    // 运行时验证
    validateStructure(profile, profileSchema);
    
    // 处理已验证的用户资料
    return processProfile(profile);
  }
});
3.2.2 中间件参数校验

在MCP服务器中使用中间件进行全局参数校验:

import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';

// 创建MCP服务器
const server = new McpServer();

// 添加参数验证中间件
server.use(async (context, next) => {
            
  // 针对工具调用的参数验证
  if (context.request.method === 'tools/call') {
            
    const {
             name, arguments: args } = context.request.params;
    
    // 针对特定工具的检查
    if (name === 'executeQuery') {
            
      // 执行SQL查询的安全检查
      validateSqlQuery(args.query);
    }
    
    // 检查参数中是否有可疑模式
    detectSuspiciousPatterns(args);
  }
  
  return next();
});

// 验证SQL查询是否安全
function validateSqlQuery(query: string): void {
            
  // 检查是否包含危险的SQL关键字
  const dangerousKeywords = ['DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', '--'];
  
  const upperQuery = query.toUpperCase();
  for (const keyword of dangerousKeywords) {
            
    if (upperQuery.includes(keyword)) {
            
      throw new Error(`查询包含不允许的关键字: ${
              keyword}`);
    }
  }
  
  // 检查多语句查询(通常不安全)
  if (query.includes(';')) {
            
    throw new Error('不允许多语句查询');
  }
}

// 检测各种参数中的可疑模式
function detectSuspiciousPatterns(obj: any): void {
            
  // 将对象转换为字符串以进行模式匹配
  const stringified = JSON.stringify(obj);
  
  // 检测可能的脚本注入
  if (/<script>|javascript:/i.test(stringified)) {
            
    throw new Error('检测到可能的脚本注入');
  }
  
  // 检测可能的命令注入
  if (/$(|`||/g.test(stringified)) {
            
    throw new Error('检测到可能的命令注入');
  }
  
  // 检测路径遍历尝试
  if (/..//g.test(stringified)) {
            
    throw new Error('检测到可能的路径遍历');
  }
}

3.3 防SQL注入策略

MCP工具经常需要连接数据库,这使得SQL注入成为潜在的安全威胁。

3.3.1 参数化查询

始终使用参数化查询而非字符串拼接:

import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';
import {
             Pool } from 'pg'; // 使用PostgreSQL作为示例

// 创建数据库连接池
const pool = new Pool({
            
  user: 'dbuser',
  host: 'localhost',
  database: 'mcpapp',
  password: 'password',
  port: 5432,
});

// 创建MCP服务器
const server = new McpServer();

// 添加安全的数据库查询工具
server.addTool({
            
  name: 'queryUserData',
  description: '查询用户数据',
  parameters: z.object({
            
    userId: z.string().uuid('用户ID必须是有效的UUID'),
    fields: z.array(z.string()).default(['name', 'email'])
  }),
  handler: async ({
             userId, fields }) => {
            
    // 验证请求的字段是否安全
    const allowedFields = ['id', 'name', 'email', 'created_at', 'updated_at'];
    for (const field of fields) {
            
      if (!allowedFields.includes(field)) {
            
        throw new Error(`不允许查询字段: ${
              field}`);
      }
    }
    
    // 安全地构建字段列表(不使用用户输入直接构建查询)
    const fieldList = fields.map(field => `"${
              field}"`).join(', ');
    
    // 使用参数化查询
    const query = `SELECT ${
              fieldList} FROM users WHERE id = $1`;
    const result = await pool.query(query, [userId]);
    
    return result.rows[0] || null;
  }
});
3.3.2 ORM与查询构建器

使用ORM或查询构建器进一步减少SQL注入风险:

import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';
import {
             Knex, knex } from 'knex';

// 初始化Knex查询构建器
const db = knex({
            
  client: 'pg',
  connection: {
            
    host: 'localhost',
    user: 'dbuser',
    password: 'password',
    database: 'mcpapp'
  }
});

// 创建MCP服务器
const server = new McpServer();

// 添加使用Knex的安全查询工具
server.addTool({
            
  name: 'searchRecords',
  description: '搜索记录',
  parameters: z.object({
            
    tableName: z.enum(['users', 'products', 'orders']),
    filters: z.record(z.string(), z.any()).default({
            }),
    page: z.number().int().min(1).default(1),
    pageSize: z.number().int().min(1).max(100).default(20)
  }),
  handler: async ({
             tableName, filters, page, pageSize }) => {
            
    // 计算分页参数
    const offset = (page - 1) * pageSize;
    
    // 使用Knex构建查询,防止SQL注入
    const query = db(tableName)
      .select('*')
      .limit(pageSize)
      .offset(offset);
    
    // 安全地添加过滤条件
    for (const [key, value] of Object.entries(filters)) {
            
      query.where(key, value);
    }
    
    // 执行查询
    const results = await query;
    
    // 获取总记录数
    const countQuery = db(tableName).count('* as total');
    for (const [key, value] of Object.entries(filters)) {
            
      countQuery.where(key, value);
    }
    const [{
             total }] = await countQuery;
    
    return {
            
      data: results,
      pagination: {
            
        total: Number(total),
        page,
        pageSize,
        pages: Math.ceil(Number(total) / pageSize)
      }
    };
  }
});

3.4 防止命令注入

MCP工具可能需要执行系统命令,这是另一个潜在的注入攻击点。

3.4.1 安全的命令执行

确保系统命令执行的安全性:

import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';
import {
             execFile } from 'child_process';
import path from 'path';
import util from 'util';

// 将execFile转换为Promise版本
const execFileAsync = util.promisify(execFile);

// 创建MCP服务器
const server = new McpServer();

// 添加安全的文件分析工具
server.addTool({
            
  name: 'analyzeFile',
  description: '分析文件内容',
  parameters: z.object({
            
    filePath: z.string(),
    analysisType: z.enum(['count', 'search', 'stats']),
    searchTerm: z.string().optional()
  }),
  handler: async ({
             filePath, analysisType, searchTerm }) => {
            
    // 安全验证文件路径
    const normalizedPath = path.normalize(filePath);
    const basePath = '/safe/data/directory/';
    
    if (!normalizedPath.startsWith(basePath)) {
            
      throw new Error('无权访问此目录');
    }
    
    // 确保文件存在
    if (!fileExists(normalizedPath)) {
            
      throw new Error('文件不存在');
    }
    
    let result;
    
    switch (analysisType) {
            
      case 'count':
        // 使用execFile而非exec,防止命令注入
        result = await execFileAsync('wc', ['-l', normalizedPath]);
        return {
             lineCount: parseInt(result.stdout.trim().split(' ')[0]) };
      
      case 'search':
        if (!searchTerm) {
            
          throw new Error('搜索类型需要提供搜索词');
        }
        
        // 注意这里没有使用shell通配符或正则表达式,减少注入风险
        result = await execFileAsync('grep', ['-c', searchTerm, normalizedPath]);
        return {
             matchCount: parseInt(result.stdout.trim()) };
      
      case 'stats':
        // 使用自定义脚本而不是直接用shell命令
        result = await execFileAsync('/usr/local/bin/analyze-file.sh', [normalizedPath]);
        return {
             stats: JSON.parse(result.stdout) };
      
      default:
        throw new Error('不支持的分析类型');
    }
  }
});

// 检查文件是否存在的辅助函数
function fileExists(filePath: string): boolean {
            
  try {
            
    return fs.statSync(filePath).isFile();
  } catch (error) {
            
    return false;
  }
}
3.4.2 隔离环境执行

对于需要执行不可信代码的场景,使用隔离环境:

import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';
import {
             NodeVM } from 'vm2';

// 创建MCP服务器
const server = new McpServer();

// 添加安全的代码执行工具
server.addTool({
            
  name: 'executeUserCode',
  description: '在安全沙箱中执行用户提供的JavaScript代码',
  parameters: z.object({
            
    code: z.string(),
    input: z.any().optional()
  }),
  handler: async ({
             code, input }) => {
            
    // 创建安全的虚拟机沙箱
    const vm = new NodeVM({
            
      console: 'redirect', // 重定向控制台输出
      sandbox: {
             input }, // 注入输入数据
      timeout: 1000, // 执行超时时间(毫秒)
      allowAsync: true,
      // 限制可用模块
      require: {
            
        external: false, // 禁止外部模块
        builtin: ['crypto'], // 只允许特定内建模块
        root: './', // 模块根目录
        mock: {
            
          // 模拟特定模块
          fs: {
            
            readFileSync: () => 'Access denied'
          }
        }
      }
    });
    
    try {
            
      // 捕获控制台输出
      let consoleOutput = [];
      vm.on('console.log', (...args) => {
            
        consoleOutput.push(args.map(arg => String(arg)).join(' '));
      });
      
      // 在沙箱中执行代码
      const result = await vm.run(`module.exports = async function(input) { ${
              code} }`, 'vm.js')(input);
      
      return {
            
        result,
        consoleOutput
      };
    } catch (error) {
            
      return {
            
        error: error.message,
        stack: error.stack
      };
    }
  }
});

通过实施全面的输入验证、参数校验、安全数据库操作和安全命令执行策略,MCP应用可以有效防御各种注入攻击,确保系统的安全性和稳定性。

4. 安全审计与合规性

完善的MCP安全体系不仅包括主动防御措施,还需要建立健全的审计机制和合规实践。本节将探讨如何在MCP应用中实现安全审计和满足合规性要求。

4.1 日志记录与监控

全面的日志记录是安全审计的基础,可以帮助识别可疑活动并在安全事件发生时提供取证依据。

4.1.1 安全日志记录实践
import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';
import winston from 'winston';
import 'winston-daily-rotate-file';

// 创建结构化日志记录器
const logger = winston.createLogger({
            
  level: 'info',
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.json()
  ),
  defaultMeta: {
             service: 'mcp-service' },
  transports: [
    // 控制台输出
    new winston.transports.Console({
            
      format: winston.format.combine(
        winston.format.colorize(),
        winston.format.simple()
      ),
    }),
    // 安全事件日志(单独文件)
    new winston.transports.DailyRotateFile({
            
      filename: 'logs/security-%DATE%.log',
      datePattern: 'YYYY-MM-DD',
      maxSize: '20m',
      maxFiles: '14d',
      level: 'warn',
    }),
    // 所有请求日志
    new winston.transports.DailyRotateFile({
            
      filename: 'logs/requests-%DATE%.log',
      datePattern: 'YYYY-MM-DD',
      maxSize: '20m',
      maxFiles: '7d',
    }),
  ],
});

// 创建MCP服务器
const server = new McpServer();

// 添加审计日志中间件
server.use(async (context, next) => {
            
  const requestId = generateRequestId();
  const startTime = Date.now();
  
  // 记录请求信息
  logger.info('MCP请求', {
            
    requestId,
    method: context.request.method,
    params: sanitizeLogData(context.request.params), // 移除敏感信息
    user: context.session?.user?.id || 'anonymous',
    ip: context.request.ip,
    userAgent: context.request.headers?.['user-agent']
  });
  
  try {
            
    // 处理请求
    const result = await next();
    
    // 记录响应信息
    const duration = Date.now() - startTime;
    logger.info('MCP响应', {
            
      requestId,
      duration,
      success: true
    });
    
    return result;
  } catch (error) {
            
    // 记录错误
    const duration = Date.now() - startTime;
    logger.warn('MCP错误', {
            
      requestId,
      duration,
      error: {
            
        message: error.message,
        stack: error.stack,
        code: error.code
      }
    });
    
    // 对于安全相关错误,记录更详细的信息
    if (isSecurityError(error)) {
            
      logger.warn('安全事件', {
            
        requestId,
        type: getSecurityErrorType(error),
        details: error.details,
        user: context.session?.user?.id || 'anonymous',
        ip: context.request.ip
      });
    }
    
    throw error;
  }
});

// 生成唯一请求ID
function generateRequestId(): string {
            
  return `req-${
              Date.now()}-${
              Math.random().toString(36).substring(2, 10)}`;
}

// 从日志中移除敏感数据
function sanitizeLogData(data: any): any {
            
  if (!data) return data;
  
  // 创建副本以避免修改原始对象
  const sanitized = {
             ...data };
  
  // 敏感字段列表
  const sensitiveFields = ['password', 'token', 'apiKey', 'secret', 'creditCard'];
  
  // 递归检查并清理对象
  Object.keys(sanitized).forEach(key => {
            
    if (sensitiveFields.includes(key.toLowerCase())) {
            
      sanitized[key] = '******'; // 替换敏感值
    } else if (typeof sanitized[key] === 'object' && sanitized[key] !== null) {
            
      sanitized[key] = sanitizeLogData(sanitized[key]); // 递归处理嵌套对象
    }
  });
  
  return sanitized;
}

// 检查是否为安全相关错误
function isSecurityError(error: any): boolean {
            
  const securityErrorCodes = [
    'UNAUTHORIZED', 'FORBIDDEN', 'INVALID_TOKEN', 
    'INJECTION_ATTEMPT', 'RATE_LIMIT_EXCEEDED'
  ];
  return securityErrorCodes.includes(error.code);
}

// 获取安全错误类型
function getSecurityErrorType(error: any): string {
            
  switch (error.code) {
            
    case 'UNAUTHORIZED':
    case 'INVALID_TOKEN':
      return 'authentication_failure';
    case 'FORBIDDEN':
      return 'authorization_failure';
    case 'INJECTION_ATTEMPT':
      return 'injection_attempt';
    case 'RATE_LIMIT_EXCEEDED':
      return 'rate_limit_abuse';
    default:
      return 'other_security_event';
  }
}
4.1.2 实时安全监控

除了日志记录,实时监控可以帮助快速发现和响应安全威胁:

import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';
import {
             EventEmitter } from 'events';

// 创建安全监控器
class SecurityMonitor extends EventEmitter {
            
  private failedAuthAttempts: Map<string, {
             count: number, firstAttempt: number }> = new Map();
  private suspiciousActivities: Map<string, {
             count: number, activities: any[] }> = new Map();
  private checkInterval: NodeJS.Timeout | null = null;
  
  constructor(private thresholds = {
             authFailures: 5, suspiciousActivities: 3 }) {
            
    super();
    
    // 启动定期检查
    this.checkInterval = setInterval(() => this.cleanupOldEntries(), 15 * 60 * 1000); // 每15分钟
  }
  
  // 记录身份验证失败
  recordAuthFailure(identifier: string): void {
            
    const entry = this.failedAuthAttempts.get(identifier) || {
             count: 0, firstAttempt: Date.now() };
    entry.count++;
    this.failedAuthAttempts.set(identifier, entry);
    
    // 检查是否超过阈值
    if (entry.count >= this.thresholds.authFailures) {
            
      this.emit('brute_force_attempt', {
            
        identifier,
        attempts: entry.count,
        timeSpan: Date.now() - entry.firstAttempt
      });
    }
  }
  
  // 记录可疑活动
  recordSuspiciousActivity(userId: string, activity: any): void {
            
    const entry = this.suspiciousActivities.get(userId) || {
             count: 0, activities: [] };
    entry.count++;
    entry.activities.push({
             ...activity, timestamp: new Date() });
    this.suspiciousActivities.set(userId, entry);
    
    // 检查是否超过阈值
    if (entry.count >= this.thresholds.suspiciousActivities) {
            
      this.emit('suspicious_user', {
            
        userId,
        activityCount: entry.count,
        recentActivities: entry.activities.slice(-3) // 最近3条活动
      });
    }
  }
  
  // 清理旧条目
  private cleanupOldEntries(): void {
            
    const now = Date.now();
    const oneHourAgo = now - 60 * 60 * 1000;
    
    // 清理旧的身份验证失败记录
    for (const [identifier, entry] of this.failedAuthAttempts.entries()) {
            
      if (entry.firstAttempt < oneHourAgo) {
            
        this.failedAuthAttempts.delete(identifier);
      }
    }
    
    // 只保留过去24小时的可疑活动
    const oneDayAgo = now - 24 * 60 * 60 * 1000;
    for (const [userId, entry] of this.suspiciousActivities.entries()) {
            
      entry.activities = entry.activities.filter(
        activity => activity.timestamp > oneDayAgo
      );
      entry.count = entry.activities.length;
      
      if (entry.count === 0) {
            
        this.suspiciousActivities.delete(userId);
      } else {
            
        this.suspiciousActivities.set(userId, entry);
      }
    }
  }
  
  // 停止监控
  stop(): void {
            
    if (this.checkInterval) {
            
      clearInterval(this.checkInterval);
      this.checkInterval = null;
    }
  }
}

// 创建MCP服务器和安全监控器
const server = new McpServer();
const securityMonitor = new SecurityMonitor();

// 设置警报处理
securityMonitor.on('brute_force_attempt', async (data) => {
            
  // 记录警报
  logger.warn('检测到暴力破解尝试', data);
  
  // 阻止来源
  await blockSource(data.identifier);
  
  // 通知安全团队
  await notifySecurityTeam('brute_force_attempt', data);
});

securityMonitor.on('suspicious_user', async (data) => {
            
  // 记录警报
  logger.warn('检测到可疑用户活动', data);
  
  // 增加用户监控级别
  await increaseUserMonitoringLevel(data.userId);
  
  // 通知安全团队
  await notifySecurityTeam('suspicious_user', data);
});

// 将安全监控集成到MCP服务器
server.use(async (context, next) => {
            
  try {
            
    // 处理请求
    return await next();
  } catch (error) {
            
    // 监控身份验证失败
    if (error.code === 'UNAUTHORIZED' || error.code === 'INVALID_TOKEN') {
            
      const identifier = context.request.ip || 'unknown';
      securityMonitor.recordAuthFailure(identifier);
    }
    
    throw error;
  }
});

// 添加敏感操作监控
server.addTool({
            
  name: 'updateUserPermissions',
  description: '更新用户权限',
  parameters: z.object({
            
    userId: z.string(),
    permissions: z.array(z.string())
  }),
  handler: async ({
             userId, permissions }, context) => {
            
    // 记录敏感操作
    securityMonitor.recordSuspiciousActivity(context.session.user.id, {
            
      action: 'updateUserPermissions',
      targetUser: userId,
      permissions
    });
    
    // 继续处理请求
    return updateUserPermissions(userId, permissions);
  }
});

4.2 异常检测机制

自动化异常检测可以帮助识别潜在的安全威胁:

import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';

// 简单的行为分析引擎
class BehaviorAnalyzer {
            
  private userPatterns: Map<string, UserPattern> = new Map();
  
  // 分析并记录用户行为
  analyzeUserBehavior(userId: string, action: string, context: any): AnomalyResult {
            
    // 获取或创建用户模式
    let pattern = this.userPatterns.get(userId);
    if (!pattern) {
            
      pattern = {
            
        userId,
        actions: new Map(),
        lastSeen: new Date(),
        commonIPs: new Set(),
        timeDistribution: Array(24).fill(0) // 按小时分布
      };
      this.userPatterns.set(userId, pattern);
    }
    
    // 更新用户模式
    pattern.lastSeen = new Date();
    
    // 记录行为
    const actionCount = pattern.actions.get(action) || 0;
    pattern.actions.set(action, actionCount + 1);
    
    // 记录IP
    if (context.ip) {
            
      pattern.commonIPs.add(context.ip);
    }
    
    // 记录时间分布
    const hour = new Date().getHours();
    pattern.timeDistribution[hour]++;
    
    // 检测异常
    return this.detectAnomalies(userId, action, context);
  }
  
  // 检测行为异常
  private detectAnomalies(userId: string, action: string, context: any): AnomalyResult {
            
    const pattern = this.userPatterns.get(userId);
    if (!pattern) return {
             isAnomaly: false };
    
    const anomalies = [];
    
    // 检测IP异常
    if (context.ip && !this.isCommonIP(userId, context.ip)) {
            
      anomalies.push({
            
        type: 'unusual_location',
        details: `不常见的IP地址: ${
              context.ip}`
      });
    }
    
    // 检测时间异常
    const hour = new Date().getHours();
    if (!this.isCommonTimeForUser(userId, hour)) {
            
      anomalies.push({
            
        type: 'unusual_time',
        details: `不常见的活动时间: ${
              hour}:00-${
              hour+1}:00`
      });
    }
    
    // 检测敏感操作
    if (this.isSensitiveAction(action) && !this.isCommonActionForUser(userId, action)) {
            
      anomalies.push({
            
        type: 'unusual_action',
        details: `不常见的敏感操作: ${
              action}`
      });
    }
    
    return {
            
      isAnomaly: anomalies.length > 0,
      anomalies
    };
  }
  
  // 检查是否是用户常用IP
  private isCommonIP(userId: string, ip: string): boolean {
            
    const pattern = this.userPatterns.get(userId);
    if (!pattern) return false;
    
    if (pattern.commonIPs.size <= 3) {
            
      return pattern.commonIPs.has(ip);
    } else {
            
      // 对于有多个IP的用户,需要更复杂的判断逻辑
      return pattern.commonIPs.has(ip);
    }
  }
  
  // 检查是否是用户常见操作
  private isCommonActionForUser(userId: string, action: string): boolean {
            
    const pattern = this.userPatterns.get(userId);
    if (!pattern) return false;
    
    const actionCount = pattern.actions.get(action) || 0;
    return actionCount >= 3; // 简单标准:执行过至少3次
  }
  
  // 检查是否是用户常见活动时间
  private isCommonTimeForUser(userId: string, hour: number): boolean {
            
    const pattern = this.userPatterns.get(userId);
    if (!pattern) return false;
    
    // 简单标准:如果用户在这个时间段至少有过5次活动
    return pattern.timeDistribution[hour] >= 5;
  }
  
  // 检查是否是敏感操作
  private isSensitiveAction(action: string): boolean {
            
    const sensitiveActions = [
      'updateUserPermissions',
      'deleteAccount',
      'resetPassword',
      'accessAdminPanel',
      'exportUserData'
    ];
    
    return sensitiveActions.includes(action);
  }
}

// 异常检测结果类型
interface AnomalyResult {
            
  isAnomaly: boolean;
  anomalies?: Array<{
            
    type: string;
    details: string;
  }>;
}

// 用户行为模式类型
interface UserPattern {
            
  userId: string;
  actions: Map<string, number>;
  lastSeen: Date;
  commonIPs: Set<string>;
  timeDistribution: number[]; // 24小时活动分布
}

// 创建MCP服务器和行为分析器
const server = new McpServer();
const behaviorAnalyzer = new BehaviorAnalyzer();

// 将异常检测集成到MCP服务器
server.use(async (context, next) => {
            
  // 跳过未认证的请求
  if (!context.session?.user?.id) {
            
    return next();
  }
  
  const userId = context.session.user.id;
  const action = getActionFromRequest(context.request);
  
  // 分析用户行为
  const result = behaviorAnalyzer.analyzeUserBehavior(userId, action, {
            
    ip: context.request.ip,
    time: new Date(),
    userAgent: context.request.headers?.['user-agent']
  });
  
  // 如果检测到异常
  if (result.isAnomaly) {
            
    // 记录异常
    logger.warn('检测到异常用户行为', {
            
      userId,
      action,
      anomalies: result.anomalies,
      requestId: context.requestId
    });
    
    // 对于高风险异常,可能需要额外验证或阻止
    if (isHighRiskAnomaly(result.anomalies)) {
            
      // 例如,要求额外验证
      if (!context.request.headers?.['secondary-auth-token']) {
            
        throw new Error('检测到异常活动,需要额外验证');
      }
    }
  }
  
  return next();
});

// 从请求中提取操作类型
function getActionFromRequest(request: any): string {
            
  if (request.method === 'tools/call') {
            
    return request.params.name;
  }
  
  return `${
              request.method}`;
}

// 判断是否高风险异常
function isHighRiskAnomaly(anomalies: any[]): boolean {
            
  if (!anomalies) return false;
  
  // 如果有多个异常,视为高风险
  if (anomalies.length >= 2) return true;
  
  // 特定类型的异常视为高风险
  const highRiskTypes = ['unusual_location', 'unusual_action'];
  return anomalies.some(a => highRiskTypes.includes(a.type));
}

4.3 合规性要求适配

不同行业和地区有不同的合规要求,MCP应用需要灵活适配:

import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';

// 合规性配置
interface ComplianceConfig {
            
  dataRetention: {
            
    logRetentionDays: number;
    userDataRetentionDays: number;
  };
  privacy: {
            
    maskPII: boolean;
    allowExport: boolean;
    requireConsent: boolean;
  };
  security: {
            
    mfaRequired: boolean;
    passwordPolicies: {
            
      minLength: number;
      requireSpecialChars: boolean;
      requireNumbers: boolean;
      expiryDays: number;
    };
    sessionTimeout: number; // 分钟
  };
  audit: {
            
    logLevel: 'minimal' | 'standard' | 'verbose';
    sensitiveOperations: string[];
  };
}

// 根据地区获取合规性配置
function getComplianceConfigForRegion(region: string): ComplianceConfig {
            
  // 实际实现中,这些配置应该从配置文件或数据库获取
  switch (region.toUpperCase()) {
            
    case 'EU':
      return {
            
        // GDPR适配配置
        dataRetention: {
            
          logRetentionDays: 90,
          userDataRetentionDays: 730 // 2年
        },
        privacy: {
            
          maskPII: true,
          allowExport: true,
          requireConsent: true
        },
        security: {
            
          mfaRequired: true,
          passwordPolicies: {
            
            minLength: 12,
            requireSpecialChars: true,
            requireNumbers: true,
            expiryDays: 90
          },
          sessionTimeout: 30
        },
        audit: {
            
          logLevel: 'verbose',
          sensitiveOperations: [/* ... */]
        }
      };
    
    case 'US-HEALTHCARE':
      return {
            
        // HIPAA适配配置
        dataRetention: {
            
          logRetentionDays: 365 * 6, // 6年
          userDataRetentionDays: 365 * 6
        },
        privacy: {
            
          maskPII: true,
          allowExport: false,
          requireConsent: true
        },
        security: {
            
          mfaRequired: true,
          passwordPolicies: {
            
            minLength: 14,
            requireSpecialChars: true,
            requireNumbers: true,
            expiryDays: 60
          },
          sessionTimeout: 15
        },
        audit: {
            
          logLevel: 'verbose',
          sensitiveOperations: [/* ... */]
        }
      };
    
    default:
      // 默认配置
      return {
            
        dataRetention: {
            
          logRetentionDays: 365,
          userDataRetentionDays: 365 * 3
        },
        privacy: {
            
          maskPII: false,
          allowExport: true,
          requireConsent: false
        },
        security: {
            
          mfaRequired: false,
          passwordPolicies: {
            
            minLength: 8,
            requireSpecialChars: false,
            requireNumbers: true,
            expiryDays: 120
          },
          sessionTimeout: 60
        },
        audit: {
            
          logLevel: 'standard',
          sensitiveOperations: [/* ... */]
        }
      };
  }
}

// MCP服务器合规性适配器
class ComplianceAdapter {
            
  private config: ComplianceConfig;
  
  constructor(region: string) {
            
    this.config = getComplianceConfigForRegion(region);
  }
  
  // 应用合规设置到MCP服务器
  applyToServer(server: McpServer): void {
            
    // 设置会话超时
    server.setSessionTimeout(this.config.security.sessionTimeout * 60 * 1000);
    
    // 添加合规中间件
    server.use(async (context, next) => {
            
      // 检查会话是否需要MFA
      if (this.config.security.mfaRequired && 
          context.session?.user && 
          !context.session.user.mfaVerified) {
            
        throw new Error('需要多因素认证');
      }
      
      // 记录敏感操作
      if (this.isSensitiveOperation(context.request)) {
            
        this.logCompliance(context, 'sensitive_operation');
      }
      
      // 隐私同意检查
      if (this.config.privacy.requireConsent && 
          this.involvesUserData(context.request) && 
          !context.session?.user?.hasConsented) {
            
        throw new Error('需要用户同意才能处理个人数据');
      }
      
      return next();
    });
  }
  
  // 获取当前合规配置
  getConfig(): ComplianceConfig {
            
    return {
             ...this.config };
  }
  
  // 检查是否为敏感操作
  private isSensitiveOperation(request: any): boolean {
            
    if (request.method !== 'tools/call') return false;
    
    return this.config.audit.sensitiveOperations.includes(request.params.name);
  }
  
  // 检查请求是否涉及用户数据
  private involvesUserData(request: any): boolean {
            
    // 实际实现应基于请求内容和上下文检查
    return request.params?.userData || request.path?.includes('/users/');
  }
  
  // 记录合规日志
  private logCompliance(context: any, eventType: string): void {
            
    const logLevel = this.config.audit.logLevel;
    
    // 根据配置的日志级别决定记录的详细程度
    let logData: any = {
            
      timestamp: new Date(),
      eventType,
      userId: context.session?.user?.id || 'anonymous',
      action: context.request.method
    };
    
    if (logLevel === 'standard' || logLevel === 'verbose') {
            
      logData.requestId = context.requestId;
      logData.ip = context.request.ip;
    }
    
    if (logLevel === 'verbose') {
            
      logData.request = sanitizeLogData(context.request);
      logData.session = sanitizeLogData(context.session);
    }
    
    // 记录合规日志
    logger.info('合规事件', logData);
  }
}

// 使用合规适配器
const region = process.env.COMPLIANCE_REGION || 'DEFAULT';
const complianceAdapter = new ComplianceAdapter(region);
const server = new McpServer();

// 应用合规设置
complianceAdapter.applyToServer(server);

// 数据保留策略实现
const dataRetentionDays = complianceAdapter.getConfig().dataRetention.userDataRetentionDays;
scheduleDataRetentionJob(dataRetentionDays);

4.4 安全事件响应流程

当安全事件发生时,有效的响应流程至关重要:

import {
             McpServer } from '@modelcontextprotocol/typescript-sdk';

// 安全事件类型
enum SecurityEventSeverity {
            
  INFO = 'info',
  LOW = 'low',
  MEDIUM = 'medium',
  HIGH = 'high',
  CRITICAL = 'critical'
}

interface SecurityEvent {
            
  id: string;
  timestamp: Date;
  type: string;
  severity: SecurityEventSeverity;
  source: {
            
    ip?: string;
    userId?: string;
    sessionId?: string;
  };
  details: any;
  status: 'new' | 'investigating' | 'mitigated' | 'resolved' | 'false_positive';
}

// 安全事件处理器
class SecurityEventHandler {
            
  private events: SecurityEvent[] = [];
  
  // 记录安全事件
  recordEvent(event: Omit<SecurityEvent, 'id' | 'timestamp' | 'status'>): SecurityEvent {
            
    const securityEvent: SecurityEvent = {
            
      id: generateEventId(),
      timestamp: new Date(),
      status: 'new',
      ...event
    };
    
    this.events.push(securityEvent);
    
    // 记录事件
    logger.warn('安全事件', securityEvent);
    
    // 对严重事件进行自动响应
    if (event.severity === SecurityEventSeverity.HIGH || 
        event.severity === SecurityEventSeverity.CRITICAL) {
            
      this.triggerAutomatedResponse(securityEvent);
    }
    
    return securityEvent;
  }
  
  // 触发自动响应
  private async triggerAutomatedResponse(event: SecurityEvent): Promise<void> {
            
    logger.info('触发安全事件自动响应', {
             eventId: event.id });
    
    // 基于事件类型执行不同的响应
    switch (event.type) {
            
      case 'brute_force_attempt':
        // 暂时封禁来源IP
        if (event.source.ip) {
            
          await this.banIP(event.source.ip, 30); // 封禁30分钟
        }
        break;
      
      case 'suspicious_access':
        // 锁定账户
        if (event.source.userId) {
            
          await this.lockAccount(event.source.userId);
        }
        break;
        
      case 'data_exfiltration':
        // 终止用户会话
        if (event.source.sessionId) {
            
          await this.terminateSession(event.source.sessionId);
        }
        // 限制用户访问
        if (event.source.userId) {
            
          await this.restrictUserAccess(event.source.userId);
        }
        break;
      
      case 'injection_attempt':
        // 增加来源IP的监控级别
        if (event.source.ip) {
            
          await this.increaseIPMonitoring(event.source.ip);
        }
        break;
    }
    
    // 对关键事件进行通知
    if (event.severity === SecurityEventSeverity.CRITICAL) {
            
      await this.notifySecurityTeam(event);
    }
    
    // 更新事件状态
    this.updateEventStatus(event.id, 'investigating');
  }
  
  // 更新事件状态
  updateEventStatus(eventId: string, status: SecurityEvent['status']): void {
            
    const event = this.events.find(e => e.id === eventId);
    if (event) {
            
      event.status = status;
      logger.info('更新安全事件状态', {
             eventId, status });
    }
  }
  
  // 查询事件
  getEvents(filters: Partial<{
             
    types: string[];
    severities: SecurityEventSeverity[];
    startDate: Date;
    endDate: Date;
    status: SecurityEvent['status']; 
  }> = {
            }): SecurityEvent[] {
            
    let filtered = [...this.events];
    
    if (filters.types) {
            
      filtered = filtered.filter(e => filters.types!.includes(e.type));
    }
    
    if (filters.severities) {
            
      filtered = filtered.filter(e => filters.severities!.includes(e.severity));
    }
    
    if (filters.status) {
            
      filtered = filtered.filter(e => e.status === filters.status);
    }
    
    if (filters.startDate) {
            
      filtered = filtered.filter(e => e.timestamp >= filters.startDate!);
    }
    
    if (filters.endDate) {
            
      filtered = filtered.filter(e => e.timestamp <= filters.endDate!);
    }
    
    return filtered;
  }
  
  // 安全响应方法
  private async banIP(ip: string, minutes: number): Promise<void> {
            
    // 实现IP封禁逻辑
    logger.info(`封禁IP ${
              ip} ${
              minutes}分钟`);
    // ...实现封禁逻辑
  }
  
  private async lockAccount(userId: string): Promise<void> {
            
    // 实现账户锁定逻辑
    logger.info(`锁定用户账户 ${
              userId}`);
    // ...实现锁定逻辑
  }
  
  private async terminateSession(sessionId: string): Promise<void> {
            
    // 实现会话终止逻辑
    logger.info(`终止会话 ${
              sessionId}`);
    // ...实现会话终止逻辑
  }
  
  private async restrictUserAccess(userId: string): Promise<void> {
            
    // 实现用户访问限制逻辑
    logger.info(`限制用户 ${
              userId} 的访问权限`);
    // ...实现访问限制逻辑
  }
  
  private async increaseIPMonitoring(ip: string): Promise<void> {
            
    // 实现增强监控逻辑
    logger.info(`增加对IP ${
              ip} 的监控级别`);
    // ...实现监控升级逻辑
  }
  
  private async notifySecurityTeam(event: SecurityEvent): Promise<void> {
            
    // 实现安全团队通知逻辑
    logger.info(`通知安全团队关于事件 ${
              event.id}`);
    // ...实现通知逻辑
  }
}

// 生成事件ID
function generateEventId(): string {
            
  return `sec-${
              Date.now()}-${
              Math.random().toString(36).substring(2, 10)}`;
}

// 创建MCP服务器和安全事件处理器
const server = new McpServer();
const securityEventHandler = new SecurityEventHandler();

// 添加安全事件检测中间件
server.use(async (context, next) => {
            
  try {
            
    // 处理请求
    return await next();
  } catch (error) {
            
    // 检测安全相关错误
    if (isSecurityError(error)) {
            
      const severity = getErrorSeverity(error);
      const eventType = getSecurityErrorType(error);
      
      // 记录安全事件
      securityEventHandler.recordEvent({
            
        type: eventType,
        severity,
        source: {
            
          ip: context.request.ip,
          userId: context.session?.user?.id,
          sessionId: context.session?.id
        },
        details: {
            
          error: error.message,
          code: error.code,
          request: {
            
            method: context.request.method,
            params: sanitizeLogData(context.request.params)
          }
        }
      });
    }
    
    throw error;
  }
});

// 获取错误的严重性级别
function getErrorSeverity(error: any): SecurityEventSeverity {
            
  const highSeverityErrors = ['INJECTION_ATTEMPT', 'UNAUTHORIZED_TOOL_ACCESS'];
  const mediumSeverityErrors = ['INVALID_TOKEN', 'RATE_LIMIT_EXCEEDED'];
  
  if (highSeverityErrors.includes(error.code)) {
            
    return SecurityEventSeverity.HIGH;
  } else if (mediumSeverityErrors.includes(error.code)) {
            
    return SecurityEventSeverity.MEDIUM;
  }
  
  return SecurityEventSeverity.LOW;
}

通过实施全面的安全审计和合规措施,MCP应用可以有效识别和应对安全威胁,同时满足不同行业和地区的监管要求,确保系统的安全性和合规性。

结语

本文深入探讨了MCP安全体系建设的四个关键方面:威胁模型与风险评估、数据加密与传输安全、注入攻击防御以及安全审计与合规性。通过全面的安全体系建设,MCP应用可以提供高水平的安全保障,保护敏感数据和用户隐私,确保AI模型与外部工具交互的安全性和可靠性。

安全不是一个静态目标,而是一个持续的过程。随着新威胁的出现和技术的发展,MCP安全体系也需要不断演进和完善。开发者应保持警惕,跟踪安全最佳实践的变化,并持续改进MCP应用的安全措施。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容