文章目录
概要
整体架构流程
技术细节
1.配置dify
1.1 添加节点
1.2 Agent节点配置
1.3 Agent策略安装与配置
1.4 Agent mcp服务配置
1.5 查询配置
2. 基于fastmcp进行mcp服务器搭建
2.1 创建server.py函数
2.2 启动mcp服务器
2.3 进入dify进行测试
小结
概要
使用dify+python-fastmcp完成mcp服务器,以实现运维智能客服免知识库进行运维数据获取与分析。
整体架构流程
问题: 配置本地知识库与dify知识库都是不小的工作量,而机器运维场景需要获取实时信息。
基本流程: 使用mcp服务器实现大模型实时根据接口返回数据进行分析,生成分析结果与建议再返回给用户。
场景详情: 本文给出一个运维使用场景,用户可以实时查询某台机器cpu,内存,磁盘使用情况。通过mcp服务器进行远程机器指令调用,将结果返回给agent,最后大模型根据返回结果自行整理语言进行返回
实现效果如下图:
技术细节
1.配置dify
如下图所示,简单在dify中配置mcp服务器配置信息,将dify作为mcp客户端使用。
1.1 添加节点
在dify中创建一个chatflow,然后添加Agent结点。
1.2 Agent节点配置
从这里进入Marketplace。如果发现自己dify没有Agent或者没有MarketPlace的,可能是dify版本过低。这里使用dify1.2版本。
1.3 Agent策略安装与配置
在Marketplace中安装Agent 策略
随后在Agent策略中选择刚刚安装的Agent策略,选择FunctionCalling支持MCP工具
1.4 Agent mcp服务配置
随后配置工具与MCP服务器配置,工具个人测试下来是可选配的。
后续再看看具体是什么功能。因为我把工具无论是删除还是禁用,也不影响mcp服务器的调用。如果有知道的大佬,可以评论区告知一下。
1.5 查询配置
查询这里就和其他的节点配置类似,选择开始的query就行。
2. 基于fastmcp进行mcp服务器搭建
环境:python >= 3.10 已安装fastmcp
2.1 创建server.py函数
使用FastMCP创建一个服务器,使用@mac_server.tool进行工具创建。
填写description进行描述,用来使用dify大模型更容易解析语言去定位使用哪个工具。
RemoteQuickHandler是使用paramiko实现远程调用目标机器类。
main函数中设置transport为sse对应上方dify agent配置中的transport 值
其他几个tool函数,是实现获取内存,cpu,磁盘信息的功能。
from fastmcp import FastMCP
import psutil
from remote_excute import RemoteQuickHandler
# 创建一个MCP服务器实例
mcp_server = FastMCP(name = "My MCP Server",port=8002 , host = "0.0.0.0")
#path = "/xxx" 这里可以自行配置path,不填默认sse
# 定义一个工具
@mcp_server.tool(description = "这是一个计算两个数加法函数")
def add(a: int, b: int) -> int:
return a + b
@mcp_server.tool(description = "这是一个获取当前服务器CPU使用率以及前10高CPU进程信息的函数")
def get_cpu_usage() -> dict:
"""获取当前服务器CPU总使用率及前10高CPU进程信息"""
# 1. 获取整体CPU使用率(间隔0.1秒采样)
total_cpu = psutil.cpu_percent(interval=0.1)
# 2. 获取所有进程的CPU使用率(需处理权限问题)
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']):
try:
# 进程CPU使用率需要主动更新(部分系统需要)
proc.cpu_percent() # 第一次调用初始化
cpu_pct = proc.cpu_percent(interval=0.1) # 第二次调用获取实际值
processes.append({
"pid": proc.info['pid'],
"name": proc.info['name'],
"cpu_percent": cpu_pct
})
except (psutil.NoSuchProcess, psutil.AccessDenied):
# 跳过已结束或无权限访问的进程
continue
# 3. 按CPU使用率降序排序,取前10
top_processes = sorted(processes, key=lambda x: x['cpu_percent'], reverse=True)[:10]
# 返回结构化数据
return {
"total_cpu_usage": total_cpu,
"top_10_cpu_processes": top_processes
}
@mcp_server.tool(description = "这是一个查询目标服务器CPU使用率以及前10高CPU进程信息的函数")
def get_host_cpu_usage(host_ip: str) -> dict:
"""获取目标服务器CPU总使用率及前10高CPU进程信息"""
print('host_id:', host_ip)
handle = RemoteQuickHandler(ip=host_ip,username = 'dxadm', password = 'DMXXG-djdl@168168%11')
# 构造获取CPU信息的Linux命令(组合mpstat和ps命令)
command = """
ps -eo user,comm,pcpu --sort=-pcpu | head -n 11
"""
command_total = """
vmstat 1 2 | tail -1 | awk '{print 100 - $15}'
"""
# 执行命令并获取结构化结果(根据remote_excute.py的command_with_result返回值调整)
status, result_data = handle.command_with_result(command)
status, result_total = handle.command_with_result(command_total)
# 错误处理:如果有执行失败信息
if result_data["FAIL"]:
return {
"error": f"执行命令失败: {
result_data['FAIL'][0]['msg']}"}
if result_total["FAIL"]:
return {
"error": f"执行命令失败: {
result_total['FAIL'][0]['msg']}"}
# 解析成功输出(从SUCCESS列表中提取msg字段)
output = result_data["SUCCESS"][0]["msg"]
lines = output.strip().split("
")
# 提取CPU总使用率(第一行)
print(result_total['SUCCESS'][0]['msg'].strip())
cpu_usage = float(result_total['SUCCESS'][0]['msg'].strip())
# 提取前10进程信息(跳过标题行,取后续10行)
top_processes = []
for line in lines[1:11]: # ps命令输出前11行包含标题,所以从第2行开始取10行
if line.strip():
parts = line.strip().split()
if len(parts) >= 3: # 至少包含user、comm、pcpu三个字段
top_processes.append({
"user": parts[0],
"command": " ".join(parts[1:-1]), # 合并命令名(可能含空格)
"cpu_usage": parts[-1] # 最后一个字段是CPU使用率
})
return {
"cpu_usage": cpu_usage,
"top_processes": top_processes
}
@mcp_server.tool(description = "这是一个查询目标服务器内存使用情况的函数")
def get_host_memory_usage(host_ip: str) -> dict:
"""获取目标服务器内存总容量(GB)、已用内存(GB)、可用内存(GB)、内存使用率及前5内存占用进程信息(含PID)"""
print('查询内存的目标IP:', host_ip)
handle = RemoteQuickHandler(ip=host_ip, username='dxadm', password='DMXXG-djdl@168168%11')
# 构造获取内存信息的Linux命令(总体内存+进程内存)
command_total = """
free -b | grep Mem
"""
command_process = """
ps -eo pid,user,comm,%mem --sort=-%mem | head -n 6 # 前5进程+标题行(含PID)
"""
# 执行两个命令并获取结果(原有逻辑不变)
status, result_total = handle.command_with_result(command_total)
status, result_process = handle.command_with_result(command_process)
# 错误处理(原有逻辑不变)
if result_total["FAIL"]:
return {
"error": f"执行总体内存查询失败: {
result_total['FAIL'][0]['msg']}"}
if result_process["FAIL"]:
return {
"error": f"执行进程内存查询失败: {
result_process['FAIL'][0]['msg']}"}
# 解析总体内存信息(新增单位转换)
total_output = result_total["SUCCESS"][0]["msg"].strip()
total_parts = total_output.split()
if len(total_parts) < 7:
return {
"error": f"总体内存解析失败,无效格式: {
total_output}"}
try:
# 字节转GB(1GB=1024^3字节)
total_memory_gb = round(int(total_parts[1]) / (1024 ** 3), 2)
used_memory_gb = round(int(total_parts[2]) / (1024 ** 3), 2)
available_memory_gb = round(int(total_parts[6]) / (1024 ** 3), 2)
memory_usage_percent = round(used_memory_gb / total_memory_gb * 100, 2) if total_memory_gb != 0 else 0.0
except (IndexError, ValueError) as e:
return {
"error": f"总体内存解析错误: {
str(e)}"}
# 解析进程内存信息(原有逻辑,已包含PID)
process_output = result_process["SUCCESS"][0]["msg"].strip()
process_lines = process_output.split("
")
top_processes = []
if len(process_lines) >= 2: # 至少包含标题行和一个数据行
for line in process_lines[1:6]: # 跳过标题行,取前5数据行
line = line.strip()
if not line:
continue
parts = line.split()
if len(parts) >= 4: # 至少包含pid,user,comm,%mem四个字段
try:
top_processes.append({
"pid": parts[0], # PID已保留,用于区分同名进程
"user": parts[1],
"command": " ".join(parts[2:-1]), # 合并命令名(可能含空格)
"memory_percent": float(parts[-1]) # 内存使用率(%)
})
except (IndexError, ValueError):
continue # 跳过解析失败的行
return {
"total_memory_gb": total_memory_gb,
"used_memory_gb": used_memory_gb,
"available_memory_gb": available_memory_gb,
"memory_usage_percent": memory_usage_percent,
"top_5_memory_processes": top_processes # 包含PID的进程信息
}
@mcp_server.tool(description = "这是一个查询目标服务器磁盘使用情况的函数(按GB显示,返回所有目录信息)")
def get_host_disk_usage(host_ip: str) -> dict:
"""获取目标服务器所有目录的磁盘使用情况(按GB显示),包含总容量、已用空间、可用空间及磁盘使用率"""
print('查询硬盘的目标IP:', host_ip)
handle = RemoteQuickHandler(ip=host_ip, username='dxadm', password='DMXXG-djdl@168168%11')
# 构造获取磁盘信息的Linux命令(使用df -h获取人类可读格式,包含所有目录)
command = """
df -h
"""
# 执行命令并获取结果
status, result = handle.command_with_result(command)
# 错误处理
if result["FAIL"]:
return {
"error": f"执行硬盘查询命令失败: {
result['FAIL'][0]['msg']}"}
# 解析输出(df -h的格式:Filesystem Size Used Avail Use% Mounted on)
output = result["SUCCESS"][0]["msg"].strip()
lines = output.split("
")
if len(lines) < 2: # 至少包含标题行和一个数据行
return {
"error": f"硬盘信息解析失败,无效输出格式: {
output}"}
# 跳过标题行(第一行),处理所有数据行
disk_info = []
for line in lines[1:]:
line = line.strip()
if not line:
continue # 跳过空行
parts = line.split()
if len(parts) < 6: # 确保至少有6个字段(Filesystem, Size, Used, Avail, Use%, Mounted)
continue # 跳过格式异常的行
try:
# 解析各字段(单位可能是G/M/T,转换为GB)
size = _convert_to_gb(parts[1])
used = _convert_to_gb(parts[2])
avail = _convert_to_gb(parts[3])
usage_percent = float(parts[4].strip('%'))
mount_point = parts[5]
disk_info.append({
"filesystem": parts[0],
"total_gb": size,
"used_gb": used,
"available_gb": avail,
"usage_percent": usage_percent,
"mount_point": mount_point
})
except (IndexError, ValueError):
continue # 跳过解析失败的行
return {
"disk_usage": disk_info}
def _convert_to_gb(size_str: str) -> float:
"""辅助函数:将df -h的大小字符串(如'5.5G'、'100M'、'2T')转换为GB数值"""
size_str = size_str.upper()
if 'T' in size_str:
return float(size_str.replace('T', '')) * 1024 # 1T=1024GB
elif 'G' in size_str:
return float(size_str.replace('G', ''))
elif 'M' in size_str:
return float(size_str.replace('M', '')) / 1024 # 1M=0.0009765625GB
elif 'K' in size_str:
return float(size_str.replace('K', '')) / (1024 * 1024) # 1K=0.00000095367GB
else:
return float(size_str) # 假设是GB单位(如直接显示数字)
if __name__ == "__main__":
mcp_server.run(transport='sse')
2.2 启动mcp服务器
最简单的启动即可。
python3 server.py
2.3 进入dify进行测试
在dify中点击发布更新后,点击预览即可进入聊天页面,然后发送需要的内容即可。
小结
本文介绍了使用dify + python-fastmcp 实现MCP服务器。
在此demo上,还可接入前后端,进行功能扩展。
暂无评论内容