使用Dify+fastmcp 实现mcp服务,内含详细步骤与源码

文章目录

概要
整体架构流程
技术细节

1.配置dify

1.1 添加节点
1.2 Agent节点配置
1.3 Agent策略安装与配置
1.4 Agent mcp服务配置
1.5 查询配置

2. 基于fastmcp进行mcp服务器搭建

2.1 创建server.py函数
2.2 启动mcp服务器
2.3 进入dify进行测试

小结

概要

使用dify+python-fastmcp完成mcp服务器,以实现运维智能客服免知识库进行运维数据获取与分析。

整体架构流程

问题: 配置本地知识库与dify知识库都是不小的工作量,而机器运维场景需要获取实时信息。

基本流程: 使用mcp服务器实现大模型实时根据接口返回数据进行分析,生成分析结果与建议再返回给用户。

场景详情: 本文给出一个运维使用场景,用户可以实时查询某台机器cpu,内存,磁盘使用情况。通过mcp服务器进行远程机器指令调用,将结果返回给agent,最后大模型根据返回结果自行整理语言进行返回

实现效果如下图:

技术细节

1.配置dify

如下图所示,简单在dify中配置mcp服务器配置信息,将dify作为mcp客户端使用。

1.1 添加节点

在dify中创建一个chatflow,然后添加Agent结点。

1.2 Agent节点配置

从这里进入Marketplace。如果发现自己dify没有Agent或者没有MarketPlace的,可能是dify版本过低。这里使用dify1.2版本。

1.3 Agent策略安装与配置

在Marketplace中安装Agent 策略
图片[1] - 使用Dify+fastmcp 实现mcp服务,内含详细步骤与源码 - 宋马
随后在Agent策略中选择刚刚安装的Agent策略,选择FunctionCalling支持MCP工具

1.4 Agent mcp服务配置

随后配置工具与MCP服务器配置,工具个人测试下来是可选配的。
后续再看看具体是什么功能。因为我把工具无论是删除还是禁用,也不影响mcp服务器的调用。如果有知道的大佬,可以评论区告知一下。

1.5 查询配置

查询这里就和其他的节点配置类似,选择开始的query就行。

2. 基于fastmcp进行mcp服务器搭建

环境:python >= 3.10 已安装fastmcp

2.1 创建server.py函数

使用FastMCP创建一个服务器,使用@mac_server.tool进行工具创建。
填写description进行描述,用来使用dify大模型更容易解析语言去定位使用哪个工具。
RemoteQuickHandler是使用paramiko实现远程调用目标机器类。
main函数中设置transport为sse对应上方dify agent配置中的transport 值
其他几个tool函数,是实现获取内存,cpu,磁盘信息的功能。


from fastmcp import FastMCP
import psutil
from remote_excute import RemoteQuickHandler
# 创建一个MCP服务器实例
mcp_server = FastMCP(name = "My MCP Server",port=8002 , host = "0.0.0.0")
 #path = "/xxx" 这里可以自行配置path,不填默认sse

# 定义一个工具
@mcp_server.tool(description = "这是一个计算两个数加法函数")
def add(a: int, b: int) -> int:
    
    return a + b

@mcp_server.tool(description = "这是一个获取当前服务器CPU使用率以及前10高CPU进程信息的函数")
def get_cpu_usage() -> dict:
    """获取当前服务器CPU总使用率及前10高CPU进程信息"""
    # 1. 获取整体CPU使用率(间隔0.1秒采样)
    total_cpu = psutil.cpu_percent(interval=0.1)
    
    # 2. 获取所有进程的CPU使用率(需处理权限问题)
    processes = []
    for proc in psutil.process_iter(['pid', 'name', 'cpu_percent']):
        try:
            # 进程CPU使用率需要主动更新(部分系统需要)
            proc.cpu_percent()  # 第一次调用初始化
            cpu_pct = proc.cpu_percent(interval=0.1)  # 第二次调用获取实际值
            processes.append({
            
                "pid": proc.info['pid'],
                "name": proc.info['name'],
                "cpu_percent": cpu_pct
            })
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # 跳过已结束或无权限访问的进程
            continue
    
    # 3. 按CPU使用率降序排序,取前10
    top_processes = sorted(processes, key=lambda x: x['cpu_percent'], reverse=True)[:10]
    
    # 返回结构化数据
    return {
            
        "total_cpu_usage": total_cpu,
        "top_10_cpu_processes": top_processes
    }

@mcp_server.tool(description = "这是一个查询目标服务器CPU使用率以及前10高CPU进程信息的函数")
def get_host_cpu_usage(host_ip: str) -> dict:
    """获取目标服务器CPU总使用率及前10高CPU进程信息"""
    print('host_id:', host_ip)
    handle = RemoteQuickHandler(ip=host_ip,username = 'dxadm', password = 'DMXXG-djdl@168168%11')
    
    # 构造获取CPU信息的Linux命令(组合mpstat和ps命令)
    command =  """
    ps -eo user,comm,pcpu --sort=-pcpu | head -n 11
    """
    command_total = """
    vmstat 1 2 | tail -1 | awk '{print 100 - $15}'
    """
    # 执行命令并获取结构化结果(根据remote_excute.py的command_with_result返回值调整)
    status, result_data = handle.command_with_result(command)
    status, result_total = handle.command_with_result(command_total)
    # 错误处理:如果有执行失败信息
    if result_data["FAIL"]:
        return {
            "error": f"执行命令失败: {
              result_data['FAIL'][0]['msg']}"}
    if result_total["FAIL"]:
        return {
            "error": f"执行命令失败: {
              result_total['FAIL'][0]['msg']}"}
    # 解析成功输出(从SUCCESS列表中提取msg字段)
    output = result_data["SUCCESS"][0]["msg"]
    lines = output.strip().split("
")
    
    
    # 提取CPU总使用率(第一行)
    print(result_total['SUCCESS'][0]['msg'].strip())
    cpu_usage = float(result_total['SUCCESS'][0]['msg'].strip())
    
    # 提取前10进程信息(跳过标题行,取后续10行)
    top_processes = []
    for line in lines[1:11]:  # ps命令输出前11行包含标题,所以从第2行开始取10行
        if line.strip():
            parts = line.strip().split()
            if len(parts) >= 3:  # 至少包含user、comm、pcpu三个字段
                top_processes.append({
            
                    "user": parts[0],
                    "command": " ".join(parts[1:-1]),  # 合并命令名(可能含空格)
                    "cpu_usage": parts[-1]  # 最后一个字段是CPU使用率
                })
    
    return {
            
        "cpu_usage": cpu_usage,
        "top_processes": top_processes
    }

@mcp_server.tool(description = "这是一个查询目标服务器内存使用情况的函数")
def get_host_memory_usage(host_ip: str) -> dict:
    """获取目标服务器内存总容量(GB)、已用内存(GB)、可用内存(GB)、内存使用率及前5内存占用进程信息(含PID)"""
    print('查询内存的目标IP:', host_ip)
    handle = RemoteQuickHandler(ip=host_ip, username='dxadm', password='DMXXG-djdl@168168%11')
    
    # 构造获取内存信息的Linux命令(总体内存+进程内存)
    command_total = """
    free -b | grep Mem
    """
    command_process = """
    ps -eo pid,user,comm,%mem --sort=-%mem | head -n 6  # 前5进程+标题行(含PID)
    """
    
    # 执行两个命令并获取结果(原有逻辑不变)
    status, result_total = handle.command_with_result(command_total)
    status, result_process = handle.command_with_result(command_process)
    
    # 错误处理(原有逻辑不变)
    if result_total["FAIL"]:
        return {
            "error": f"执行总体内存查询失败: {
              result_total['FAIL'][0]['msg']}"}
    if result_process["FAIL"]:
        return {
            "error": f"执行进程内存查询失败: {
              result_process['FAIL'][0]['msg']}"}
    
    # 解析总体内存信息(新增单位转换)
    total_output = result_total["SUCCESS"][0]["msg"].strip()
    total_parts = total_output.split()
    if len(total_parts) < 7:
        return {
            "error": f"总体内存解析失败,无效格式: {
              total_output}"}
    try:
        # 字节转GB(1GB=1024^3字节)
        total_memory_gb = round(int(total_parts[1]) / (1024 ** 3), 2)
        used_memory_gb = round(int(total_parts[2]) / (1024 ** 3), 2)
        available_memory_gb = round(int(total_parts[6]) / (1024 ** 3), 2)
        memory_usage_percent = round(used_memory_gb / total_memory_gb * 100, 2) if total_memory_gb != 0 else 0.0
    except (IndexError, ValueError) as e:
        return {
            "error": f"总体内存解析错误: {
              str(e)}"}
    
    # 解析进程内存信息(原有逻辑,已包含PID)
    process_output = result_process["SUCCESS"][0]["msg"].strip()
    process_lines = process_output.split("
")
    top_processes = []
    if len(process_lines) >= 2:  # 至少包含标题行和一个数据行
        for line in process_lines[1:6]:  # 跳过标题行,取前5数据行
            line = line.strip()
            if not line:
                continue
            parts = line.split()
            if len(parts) >= 4:  # 至少包含pid,user,comm,%mem四个字段
                try:
                    top_processes.append({
            
                        "pid": parts[0],  # PID已保留,用于区分同名进程
                        "user": parts[1],
                        "command": " ".join(parts[2:-1]),  # 合并命令名(可能含空格)
                        "memory_percent": float(parts[-1])  # 内存使用率(%)
                    })
                except (IndexError, ValueError):
                    continue  # 跳过解析失败的行
    
    return {
            
        "total_memory_gb": total_memory_gb,
        "used_memory_gb": used_memory_gb,
        "available_memory_gb": available_memory_gb,
        "memory_usage_percent": memory_usage_percent,
        "top_5_memory_processes": top_processes  # 包含PID的进程信息
    }

@mcp_server.tool(description = "这是一个查询目标服务器磁盘使用情况的函数(按GB显示,返回所有目录信息)")
def get_host_disk_usage(host_ip: str) -> dict:
    """获取目标服务器所有目录的磁盘使用情况(按GB显示),包含总容量、已用空间、可用空间及磁盘使用率"""
    print('查询硬盘的目标IP:', host_ip)
    handle = RemoteQuickHandler(ip=host_ip, username='dxadm', password='DMXXG-djdl@168168%11')
    
    # 构造获取磁盘信息的Linux命令(使用df -h获取人类可读格式,包含所有目录)
    command = """
    df -h
    """
    
    # 执行命令并获取结果
    status, result = handle.command_with_result(command)
    
    # 错误处理
    if result["FAIL"]:
        return {
            "error": f"执行硬盘查询命令失败: {
              result['FAIL'][0]['msg']}"}
    
    # 解析输出(df -h的格式:Filesystem Size Used Avail Use% Mounted on)
    output = result["SUCCESS"][0]["msg"].strip()
    lines = output.split("
")
    if len(lines) < 2:  # 至少包含标题行和一个数据行
        return {
            "error": f"硬盘信息解析失败,无效输出格式: {
              output}"}
    
    # 跳过标题行(第一行),处理所有数据行
    disk_info = []
    for line in lines[1:]:
        line = line.strip()
        if not line:
            continue  # 跳过空行
        
        parts = line.split()
        if len(parts) < 6:  # 确保至少有6个字段(Filesystem, Size, Used, Avail, Use%, Mounted)
            continue  # 跳过格式异常的行
        
        try:
            # 解析各字段(单位可能是G/M/T,转换为GB)
            size = _convert_to_gb(parts[1])
            used = _convert_to_gb(parts[2])
            avail = _convert_to_gb(parts[3])
            usage_percent = float(parts[4].strip('%'))
            mount_point = parts[5]
            
            disk_info.append({
            
                "filesystem": parts[0],
                "total_gb": size,
                "used_gb": used,
                "available_gb": avail,
                "usage_percent": usage_percent,
                "mount_point": mount_point
            })
        except (IndexError, ValueError):
            continue  # 跳过解析失败的行
    
    return {
            "disk_usage": disk_info}

def _convert_to_gb(size_str: str) -> float:
    """辅助函数:将df -h的大小字符串(如'5.5G'、'100M'、'2T')转换为GB数值"""
    size_str = size_str.upper()
    if 'T' in size_str:
        return float(size_str.replace('T', '')) * 1024  # 1T=1024GB
    elif 'G' in size_str:
        return float(size_str.replace('G', ''))
    elif 'M' in size_str:
        return float(size_str.replace('M', '')) / 1024  # 1M=0.0009765625GB
    elif 'K' in size_str:
        return float(size_str.replace('K', '')) / (1024 * 1024)  # 1K=0.00000095367GB
    else:
        return float(size_str)  # 假设是GB单位(如直接显示数字)

    
if __name__ == "__main__":
    mcp_server.run(transport='sse')

2.2 启动mcp服务器

最简单的启动即可。

python3 server.py
2.3 进入dify进行测试

在dify中点击发布更新后,点击预览即可进入聊天页面,然后发送需要的内容即可。

小结

本文介绍了使用dify + python-fastmcp 实现MCP服务器。
在此demo上,还可接入前后端,进行功能扩展。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容