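011 - Func, the Unified Network Controller: Theory and Practice

Func, the Unified Network Controller, in Depth

As IT infrastructure grows in scale and complexity, centralized management and automated configuration of network devices matter more and more. Func (Fedora Unified Network Controller) is an open-source unified network controller. It provides a remote execution framework for managing many network devices and servers from one place. This chapter covers Func's architecture, installation and configuration, and core features.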

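11.1 Installing Func

Func is an open-source project written in Python. It runs on several operating system platforms and offers flexible installation options. Before installing it, let's look at its architecture and components.

11.1.1 Environment Overview

Func uses a master-minion architecture with the following main components:

Func Master: the central control server, which sends commands and collects results
Func Minion: the client program running on each managed device, which executes commands sent by the Master
Certmaster: the certificate management component, which issues and verifies SSL/TLS certificates
Overlord: an optional web interface component that provides graphical management

Func's main features include:

Secure communication: all traffic is encrypted with SSL/TLS
Modular design: functionality is extended through modules, including custom ones
Multi-device support: manages a wide range of network devices and servers
API friendly: a Python API makes integration with other systems straightforward
Group management: devices can be grouped for batch operations

Func communicates over XML-RPC and relies on SSL/TLS for security. Once started, a Minion listens on a port (51234 by default) and waits for commands from the Master.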
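
To make the XML-RPC exchange concrete, here is a minimal sketch that uses only Python's standard xmlrpc.client module. It serializes a call, dispatches it to a handler, and decodes the reply, all inside one process; the TLS layer and the network socket are deliberately left out. The method names test.ping and test.echo and the HANDLERS table are assumptions made for illustration and are not Func's actual dispatch code.

```python
import xmlrpc.client


def handle_request(payload, handlers):
    """Decode an XML-RPC request, run the named handler, encode the reply."""
    params, method_name = xmlrpc.client.loads(payload)
    result = handlers[method_name](*params)
    return xmlrpc.client.dumps((result,), methodresponse=True)


# Hypothetical minion-side handlers, keyed by "module.method".
HANDLERS = {
    "test.ping": lambda: 1,
    "test.echo": lambda message: message,
}

# Master side: serialize the call into an XML-RPC request body.
request_body = xmlrpc.client.dumps(("hello",), methodname="test.echo")

# Minion side: decode the request, dispatch it, and serialize the return value.
response_body = handle_request(request_body, HANDLERS)

# Master side: decode the response.
(reply,), _ = xmlrpc.client.loads(response_body)
print(reply)
```

In a real deployment the request body would travel over an SSL/TLS connection to the Minion's listening port rather than being passed as a local variable.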

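11.1.2 Installing Func

Func can be installed through a package manager, with pip, or from source. Each method is described below.

Dependencies

First, make sure the system meets the following requirements:

Python 2.7 or Python 3.6+ (Python 3.8+ recommended)
OpenSSL
PyYAML
pyOpenSSL
Python setuptools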
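
Before installing, it can help to check which of the Python-level dependencies are already importable. The sketch below is one possible way to do that with the standard library; the helper name missing_dependencies is made up for this example. The mapping uses the import names of the packages listed above (PyYAML is imported as yaml, pyOpenSSL as OpenSSL).

```python
import importlib.util

# Package name -> importable module name for the dependencies listed above.
REQUIRED_MODULES = {
    "PyYAML": "yaml",
    "pyOpenSSL": "OpenSSL",
    "setuptools": "setuptools",
}


def missing_dependencies(required=REQUIRED_MODULES):
    """Return the sorted package names whose module cannot be found."""
    return sorted(
        package
        for package, module in required.items()
        if importlib.util.find_spec(module) is None
    )


missing = missing_dependencies()
print("Missing packages:", ", ".join(missing) if missing else "none")
```

The check only looks for importable modules; it does not verify versions or the system OpenSSL library.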

Installing with a package manager

CentOS/RHEL 7/8

First add the EPEL repository:

bash

# CentOS/RHEL 7
sudo yum install epel-release

# CentOS/RHEL 8
sudo dnf install epel-release

Then install Func:

bash

# CentOS/RHEL 7
sudo yum install func func-certmaster

# CentOS/RHEL 8
sudo dnf install func func-certmaster

Fedora

bash

sudo dnf install func func-certmaster

Ubuntu/Debian

The official Ubuntu and Debian repositories may not carry the latest version of Func; install it from a PPA or from source instead.

Installing with pip

bash

# Install Func
pip install func-minion

# Install Certmaster
pip install certmaster

Installing from source

Installing from source gives you the latest features and fixes:

bash

# Clone the repository
git clone https://github.com/func/func.git
cd func

# Install
python setup.py install

For Certmaster:

bash

git clone https://github.com/func/certmaster.git
cd certmaster
python setup.py install

Installing with Docker

Func can also run in Docker containers:

bash

# Pull the official images
docker pull funcproject/func-master
docker pull funcproject/func-minion

# Run the Master container
docker run -d --name func-master -p 51235:51235 -p 51234:51234 funcproject/func-master

# Run a Minion container (replace YOUR_MINION_NAME and MASTER_IP)
docker run -d --name func-minion-1 -e MINION_NAME=YOUR_MINION_NAME -e MASTER_IP=MASTER_IP funcproject/func-minion

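Configuring Certmaster

Certmaster is Func's certificate management component; it generates and distributes SSL/TLS certificates. After installation it needs to be configured.

Edit the Certmaster configuration file:

bash

sudo nano /etc/certmaster/certmaster.conf

Example configuration: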

ini

[main]
listen_addr = 0.0.0.0
listen_port = 51235
cadir = /etc/pki/certmaster/ca
certroot = /var/lib/certmaster/certmaster/certs
csrroot = /var/lib/certmaster/certmaster/csrs
cert_dir = /etc/pki/certmaster
autosign = true
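
The example enables autosign, which means incoming certificate requests are signed without manual review; that is convenient in a lab but worth a second look elsewhere. As a rough sketch, a configuration in this format can be read with Python's configparser and checked for that setting. The function name review_certmaster_conf and the warning text are illustrative assumptions, not part of Certmaster.

```python
import configparser

# A shortened copy of the example configuration above.
SAMPLE_CONF = """
[main]
listen_addr = 0.0.0.0
listen_port = 51235
autosign = true
"""


def review_certmaster_conf(text):
    """Parse the [main] section and return (settings, warnings)."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    main = parser["main"]
    settings = {
        "listen_addr": main.get("listen_addr", fallback="0.0.0.0"),
        "listen_port": main.getint("listen_port", fallback=51235),
        "autosign": main.getboolean("autosign", fallback=False),
    }
    warnings = []
    if settings["autosign"]:
        warnings.append("autosign is enabled: requests are signed without review")
    return settings, warnings


settings, warnings = review_certmaster_conf(SAMPLE_CONF)
print(settings)
for warning in warnings:
    print("WARNING:", warning)
```

With autosign set to false, certificate requests wait until they are signed by hand on the Master, as described in the certificate signing steps later in this section.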

Start the Certmaster service:

bash

# On systemd systems
sudo systemctl enable certmaster
sudo systemctl start certmaster

# On SysV init systems
sudo service certmaster start
sudo chkconfig certmaster on

Check that Certmaster is running:

bash

sudo systemctl status certmaster

Configuring the Func Master

Configure the Func Master so it can manage network devices and servers.

Edit the Func Master configuration file:

bash

sudo nano /etc/func/minion.conf

Example configuration:

ini

[main]
certmaster = localhost
cert_dir = /etc/pki/certmaster
acl_dir = /etc/func/acls
listen_addr = 0.0.0.0
listen_port = 51234
method = sha1

Start the Func Master service:

bash

# On systemd systems
sudo systemctl enable funcd
sudo systemctl start funcd

# On SysV init systems
sudo service funcd start
sudo chkconfig funcd on

Check that the Func Master is running:

bash

sudo systemctl status funcd

Configuring the Func Minion

Install and configure the Func Minion on every device you want to manage.

Edit the Minion configuration file:

bash

sudo nano /etc/func/minion.conf

Example configuration:

ini

[main]
certmaster = MASTER_IP_ADDRESS
cert_dir = /etc/pki/certmaster
acl_dir = /etc/func/acls
listen_addr = 0.0.0.0
listen_port = 51234
method = sha1

# Set the Minion ID (optional; defaults to the hostname)
minion_id = device1.example.com

Start the Func Minion service:

bash

# On systemd systems
sudo systemctl enable funcd
sudo systemctl start funcd

# On SysV init systems
sudo service funcd start
sudo chkconfig funcd on

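Certificate signing and verification

On first start, a Minion requests a certificate from Certmaster.

On the Minion, generate a certificate signing request (CSR) and send it to Certmaster:

bash

sudo certmaster-request

On the Master, list the pending certificate requests: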

bash

sudo certmaster-ca --list

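On the Master, sign the certificate (only needed when autosign is disabled):

bash

sudo certmaster-ca --sign hostname.example.com

Verify the connection to the Minion:

bash

func 'hostname.example.com' ping

A return value of "1" means the connection succeeded.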

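Advanced configuration options

ACL (access control lists)

Func supports ACL-based access control, which can restrict specific users or groups to specific modules and methods:

bash

sudo mkdir -p /etc/func/acls
sudo nano /etc/func/acls/example.acl

Example ACL file: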

yaml

# Allow the admin group to access all modules and methods
admin:
  - .*: .*

# Restrict the operator group to specific modules and methods
operator:
  - system.list_modules: .*
  - command.run: .*
  - hardware.info: .*
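
To illustrate how pattern-based rules like these could be evaluated, the following sketch matches a group's rules against a requested method. It assumes that the left-hand side of each rule is a regular expression for "module.method" and the right-hand side a regular expression for the argument; the text above does not spell out these semantics, and the function is_allowed is hypothetical rather than Func's real ACL check.

```python
import re

# The ACL from the example above, expressed as a Python mapping of
# group -> list of (method pattern, argument pattern) pairs.
ACL = {
    "admin": [(".*", ".*")],
    "operator": [
        ("system.list_modules", ".*"),
        ("command.run", ".*"),
        ("hardware.info", ".*"),
    ],
}


def is_allowed(acl, group, method, argument=""):
    """Return True if any rule of the group matches both method and argument."""
    for method_pattern, argument_pattern in acl.get(group, []):
        if re.fullmatch(method_pattern, method) and re.fullmatch(argument_pattern, argument):
            return True
    return False


print(is_allowed(ACL, "operator", "command.run", "uptime"))
print(is_allowed(ACL, "operator", "service.restart", "httpd"))
```

Using re.fullmatch rather than re.search keeps a rule such as command.run from also matching longer method names that merely contain it.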

Configuring the Overlord web interface (optional)

Overlord provides a web interface for managing Func from a browser:

bash

# Install Overlord
sudo yum install func-overlord  # CentOS/RHEL
sudo dnf install func-overlord  # Fedora

# Configure Overlord
sudo nano /etc/func/overlord.conf

# Start the Overlord service
sudo systemctl enable func-overlord
sudo systemctl start func-overlord

By default, Overlord listens on http://localhost:8080 and can be opened in a browser.

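11.2 Common Func Modules and the API

Func ships with a rich set of built-in modules covering network management, system configuration, and application deployment. This section introduces the commonly used modules and how to call them.

Python API basics

Func's Python API lets you call Func from Python scripts or from an interactive interpreter: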

python

from func.overlord.client import Client

# Create a Client instance
fc = Client("*.example.com")

# Call ping to check connectivity
results = fc.ping()
for host, result in results.items():
    print(f"{host}: {'Online' if result else 'Offline'}")

# Run a shell command
results = fc.command.run("uptime")
for host, result in results.items():
    print(f"{host}: {result}")
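
Because every call returns a mapping from host name to return value, results are easy to post-process in plain Python. The sketch below splits a ping result into online and offline hosts. The sample data and the helper summarize_ping are invented for illustration; the sketch assumes that a return value of 1 means the host answered, as in the ping check described earlier.

```python
# Hypothetical results in the shape used above: {hostname: return value}.
sample_results = {
    "web1.example.com": 1,
    "web2.example.com": 1,
    "db1.example.com": 0,
}


def summarize_ping(results):
    """Split hosts into (online, offline); only a return value of 1 counts as online."""
    online = sorted(host for host, value in results.items() if value == 1)
    offline = sorted(host for host, value in results.items() if value != 1)
    return online, offline


online, offline = summarize_ping(sample_results)
print(f"{len(online)} online, {len(offline)} offline: {', '.join(offline) or 'none'}")
```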

Command-line tool

Besides the Python API, Func also provides a command-line tool:

bash

# Basic syntax
func [target] module.method [arguments]

# For example:
func '*' ping
func 'web*' command.run 'uptime'

11.2.1 Selecting Target Hosts

Func offers several ways to select target hosts:

Using wildcards

bash

# Match all Minions
func '*' ping

# Match hosts with a given prefix
func 'web*' ping

# Match hosts with a given suffix
func '*.example.com' ping

Using host lists

bash

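# Comma-separated list of hosts
func 'web1.example.com,web2.example.com,db1.example.com' ping

# In the Python API, pass a list instead:
# fc = Client(["web1.example.com", "web2.example.com"])

Using groups

Func lets you organize hosts into groups, which simplifies management: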

bash

# Define groups (on the Func Master)
echo "webservers: web1.example.com web2.example.com" > /etc/func/groups/webservers.group
echo "dbservers: db1.example.com db2.example.com" > /etc/func/groups/dbservers.group

# Use a group name
func '@webservers' ping

# Combine groups
func '@webservers,@dbservers' ping

Using groups in Python

python

from func.overlord.client import Client

# Use a group name
fc = Client("@webservers")
results = fc.ping()

# Combine groups
fc = Client("@webservers,@dbservers")
results = fc.ping()
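
The three selection styles (wildcards, comma-separated lists, and @group names) can be pictured as one expansion step that turns a target string into a concrete host list. The following is a simplified sketch of that idea using the standard fnmatch module. The function expand_targets and the sample inventory are assumptions for illustration, not Func's own resolver.

```python
import fnmatch

# Hypothetical inventory of known Minions and group definitions.
MINIONS = ["web1.example.com", "web2.example.com", "db1.example.com", "db2.example.com"]
GROUPS = {
    "webservers": ["web1.example.com", "web2.example.com"],
    "dbservers": ["db1.example.com", "db2.example.com"],
}


def expand_targets(spec, minions, groups):
    """Expand a target string into an ordered host list without duplicates."""
    selected = []
    for part in spec.split(","):
        part = part.strip()
        if part.startswith("@"):
            # "@name" refers to a group definition
            candidates = groups.get(part[1:], [])
        else:
            # Anything else is treated as a shell-style wildcard pattern
            candidates = fnmatch.filter(minions, part)
        for host in candidates:
            if host not in selected:
                selected.append(host)
    return selected


print(expand_targets("web*", MINIONS, GROUPS))
print(expand_targets("@webservers,@dbservers", MINIONS, GROUPS))
```

A plain host name is simply a pattern without wildcard characters, so the same code path covers explicit host lists.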

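11.2.2 Common Modules in Detail

Func provides a range of built-in modules for different kinds of operations:

1. command module

Runs shell commands:

bash

# Run a single command
func '*' command.run 'ls -la /var/log'

# Run a command that contains a pipe
func '*' command.run 'ps aux | grep httpd'

# Run a command and get its exit status
func '*' command.run_status 'service httpd status'

Python API example: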

python

from func.overlord.client import Client

fc = Client("*")
results = fc.command.run("df -h")
for host, result in results.items():
    print(f"{host}:\n{result}")

# Run a more complex command
script = """
if [ -f /etc/redhat-release ]; then
    echo "RHEL-based system"
    cat /etc/redhat-release
else
    echo "Non-RHEL system"
    cat /etc/os-release
fi
"""
results = fc.command.run(script)

2. file module

Manages files and directories:

bash

# Check whether a file exists
func '*' file.exists '/etc/passwd'

# Read a file's contents
func 'web1.example.com' file.read '/etc/nginx/nginx.conf'

# Write a file
func '*' file.write '/tmp/test.txt' 'Hello World'

# Get a file's stat information
func '*' file.stat '/etc/passwd'

Python API example:

python

from func.overlord.client import Client

fc = Client("*")

# Upload a file
with open("local_file.txt", "r") as f:
    content = f.read()
results = fc.file.write("/tmp/remote_file.txt", content)

# Download a file from each host
results = fc.file.read("/etc/hostname")
for host, content in results.items():
    with open(f"{host}_hostname.txt", "w") as f:
        f.write(content)

3. network module

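Handles network configuration and diagnostics:

bash

# Get network interface information
func '*' network.interfaces

# Get the hostname
func '*' network.hostname

# Run a ping test
func '*' network.ping 'google.com'

# Get the routing table
func '*' network.routes

Python API example: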

python

from func.overlord.client import Client

fc = Client("*")

# Print the IP addresses of every host
results = fc.network.interfaces()
for host, interfaces in results.items():
    print(f"Host: {host}")
    for interface, data in interfaces.items():
        if 'inet' in data:
            print(f"  {interface}: {data['inet']}")

4. service module

Manages system services:

bash

# Get a service's status
func '*' service.status 'httpd'

# Start a service
func '*' service.start 'nginx'

# Stop a service
func '*' service.stop 'mysql'

# Restart a service
func '*' service.restart 'postgresql'

Python API example:

python

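from func.overlord.client import Client

fc = Client("*")

# Check the service and start it where it is not running
results = fc.service.status("httpd")
for host, status in results.items():
    if not status:
        print(f"Starting httpd on {host}")
        # Target only the affected host with its own Client
        Client(host).service.start("httpd")

5. yum/apt modules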

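Handle package management:

bash

# Check whether an update is available for a package
func '*' yum.check_update 'nginx'

# Install a package
func '*' yum.install 'httpd'

# Update all packages
func '*' yum.update_all

For Debian/Ubuntu systems:

bash

# Refresh the package lists
func '*' apt.update

# Install a package
func '*' apt.install 'nginx'

Python API example: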

python

from func.overlord.client import Client

fc = Client("*")

# Install several packages
packages = ["nginx", "httpd", "redis"]
for package in packages:
    results = fc.yum.install(package)
    for host, result in results.items():
        if result[0] == 0:  # check the return code
            print(f"Successfully installed {package} on {host}")
        else:
            print(f"Failed to install {package} on {host}: {result[1]}")

6. hardware module

Retrieves hardware information:

bash

# Get memory information
func '*' hardware.mem

# Get CPU information
func '*' hardware.cpu

# Get disk information
func '*' hardware.disk

Python API example:

python

from func.overlord.client import Client

fc = Client("*")

# Print memory information for every host
results = fc.hardware.mem()
for host, mem_info in results.items():
    total_mb = mem_info["total"] // 1024
    free_mb = mem_info["free"] // 1024
    print(f"{host}: Total Memory: {total_mb}MB, Free: {free_mb}MB")

7. func module

Manages Func itself:

bash

# List available modules
func '*' func.list_modules

# List a module's methods
func '*' func.list_methods 'command'

# Show a method's documentation
func '*' func.help 'command.run'

Python API example:

python

from func.overlord.client import Client

fc = Client("*")

# List the modules available on each host
results = fc.func.list_modules()
for host, modules in results.items():
    print(f"Modules available on {host}:")
    for module in sorted(modules):
        print(f"  - {module}")

8. cron module

Manages scheduled jobs:

bash

# List cron jobs
func '*' cron.list_jobs

# Add a cron job
func '*' cron.add_job '0 2 * * *' 'root' '/usr/local/bin/backup.sh'

# Delete a cron job
func '*' cron.del_job '0 2 * * *' 'root' '/usr/local/bin/backup.sh'

Python API example:

python

from func.overlord.client import Client

fc = Client("*")

# Add several scheduled jobs
cron_jobs = [
    {"schedule": "0 2 * * *", "user": "root", "command": "/usr/local/bin/backup.sh"},
    {"schedule": "*/5 * * * *", "user": "www", "command": "/usr/local/bin/check-web.sh"}
]

for job in cron_jobs:
    # Same argument order as the command-line example: schedule, user, command
    fc.cron.add_job(job["schedule"], job["user"], job["command"])

9. user module

Manages users:

bash

# Add a user
func '*' user.add 'newuser'

# Set a password
func '*' user.set_password 'newuser' 'password'

# Add a user to a group
func '*' user.add_to_group 'newuser' 'wheel'

# Delete a user
func '*' user.delete 'olduser'

Python API example:

python

# Note: the crypt module was removed in Python 3.13
import crypt

from func.overlord.client import Client

fc = Client("*")

# Create and configure a user
username = "appuser"
results = fc.user.add(username, home="/opt/app", shell="/bin/bash")

# Set the password (avoid hard-coding plaintext passwords in production scripts)
# crypt.mksalt generates a random salt for SHA-512 crypt ($6$...)
salt = crypt.mksalt(crypt.METHOD_SHA512)
hashed_pw = crypt.crypt("secure_password", salt)

fc.user.set_password(username, hashed_pw)
fc.user.add_to_group(username, "app")

10. copy module

Copies and synchronizes files:

bash

# Copy a file from the Master to the Minions
func '*' copy.from_master '/path/on/master/file.txt' '/path/on/minion/file.txt'

# Copy a file from a Minion to the Master
func 'minion.example.com' copy.to_master '/path/on/minion/file.txt' '/path/on/master/file.txt'

Python API example:

python

import os

from func.overlord.client import Client

fc = Client("*")

# Distribute configuration files to all hosts
source_files = [
    "/etc/func/master_configs/nginx.conf",
    "/etc/func/master_configs/app.conf"
]

# Destination path for each file name
dest_paths = {
    "nginx.conf": "/etc/nginx/nginx.conf",
    "app.conf": "/etc/app/app.conf"
}

for source in source_files:
    filename = os.path.basename(source)
    if filename in dest_paths:
        fc.copy.from_master(source, dest_paths[filename])

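11.3 Custom Func Modules

One of Func's most powerful features is the ability to create custom modules that extend it for specific needs. This section shows how to create, deploy, and use custom Func modules.

Basic module structure

A Func module is a Python class that inherits from func.minion.modules.func_module.FuncModule. A basic module looks like this: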

python

from func.minion.modules.func_module import FuncModule

class MyModule(FuncModule):
    
    # Version information
    version = "1.0"

    # Module description
    description = "My custom Func module"

    # Method implementations
    def hello_world(self):
        """
        Return a Hello World message.
        """
        return "Hello World from %s" % self.minion_id

    def echo(self, message):
        """
        Echo back the message that was passed in.

        @param message: the message to echo
        @return: the same message
        """
        return message
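
One way to picture how a class like this becomes remotely callable is a dispatcher that exposes only public methods. The sketch below uses a stand-in base class so it runs without Func installed. FuncModuleStub, list_public_methods, and dispatch are hypothetical names, and the rule that underscore-prefixed methods stay private is an assumption of this sketch rather than documented Func behaviour.

```python
import inspect


class FuncModuleStub:
    """Stand-in for the real base class so the sketch runs without Func."""
    version = "0.0"
    description = ""


class MyModule(FuncModuleStub):
    version = "1.0"
    description = "My custom Func module"

    def hello_world(self):
        return "Hello World"

    def echo(self, message):
        return message

    def _helper(self):
        return "not exported"


def list_public_methods(module):
    """Return the names of bound methods that do not start with an underscore."""
    return sorted(
        name
        for name, _ in inspect.getmembers(module, inspect.ismethod)
        if not name.startswith("_")
    )


def dispatch(module, method, *args):
    """Call a public method by name, rejecting anything not exported."""
    if method not in list_public_methods(module):
        raise AttributeError(f"unknown method: {method}")
    return getattr(module, method)(*args)


module = MyModule()
print(list_public_methods(module))
print(dispatch(module, "echo", "hi"))
```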

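Creating a custom module

Next, we create a more practical custom module for network diagnostics.

Create the module directory on the Minion (if it does not exist):

bash

sudo mkdir -p /usr/lib/python3/dist-packages/func/minion/modules/custom

Create the network diagnostics module:

bash

sudo nano /usr/lib/python3/dist-packages/func/minion/modules/custom/netdiag.py

Add the following code: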

python

from func.minion.modules.func_module import FuncModule
import subprocess
import socket
import json
import re
import os

class NetDiag(FuncModule):
    
    version = "1.0"
    description = "Network diagnostics module"
    
    def _run_command(self, command):
        """Run a shell command and return its exit code, stdout, and stderr."""
        process = subprocess.Popen(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        stdout, stderr = process.communicate()
        return {
            'returncode': process.returncode,
            'stdout': stdout.decode('utf-8', errors='replace'),
            'stderr': stderr.decode('utf-8', errors='replace')
        }
    
    def ping_test(self, target, count=4):
        """
        Run a ping test.

        @param target: target host
        @param count: number of ping packets
        @return: ping test result
        """
        command = f"ping -c {count} {target}"
        result = self._run_command(command)

        # Parse the ping output
        if result['returncode'] == 0:
            # Extract the ping statistics
            stats_pattern = r"(\d+) packets transmitted, (\d+) received, (\d+)% packet loss, time (\d+)ms"
            stats_match = re.search(stats_pattern, result['stdout'])

            rtt_pattern = r"rtt min/avg/max/mdev = ([\d.]+)/([\d.]+)/([\d.]+)/([\d.]+) ms"
            rtt_match = re.search(rtt_pattern, result['stdout'])
            
            if stats_match and rtt_match:
                return {
                    'success': True,
                    'transmitted': int(stats_match.group(1)),
                    'received': int(stats_match.group(2)),
                    'packet_loss': int(stats_match.group(3)),
                    'time_ms': int(stats_match.group(4)),
                    'rtt': {
                        'min': float(rtt_match.group(1)),
                        'avg': float(rtt_match.group(2)),
                        'max': float(rtt_match.group(3)),
                        'mdev': float(rtt_match.group(4))
                    },
                    'raw_output': result['stdout']
                }
        
        return {
            'success': False,
            'error': result['stderr'] if result['stderr'] else "Ping failed",
            'raw_output': result['stdout']
        }
    
    def traceroute(self, target, max_hops=30):
        """
        Run traceroute.

        @param target: target host
        @param max_hops: maximum number of hops
        @return: traceroute result
        """
        command = f"traceroute -m {max_hops} {target}"
        result = self._run_command(command)

        if result['returncode'] == 0:
            # Parse the traceroute output
            hops = []
            lines = result['stdout'].strip().split('\n')

            # Skip the first line (header)
            for line in lines[1:]:
                # Extract the hop number and the rest of the line
                hop_match = re.match(r'^\s*(\d+)\s+(.+)$', line)
                if hop_match:
                    hop_num = int(hop_match.group(1))
                    hop_info = hop_match.group(2).strip()

                    # Parse IP addresses and round-trip times
                    ip_pattern = r'(\d+\.\d+\.\d+\.\d+)'
                    ips = re.findall(ip_pattern, hop_info)

                    time_pattern = r'([\d.]+) ms'
                    times = re.findall(time_pattern, hop_info)
                    
                    hop_data = {
                        'hop': hop_num,
                        'ips': ips,
                        'times': [float(t) for t in times] if times else []
                    }
                    
                    # Try to resolve the hostname
                    if ips:
                        try:
                            hostname = socket.gethostbyaddr(ips[0])[0]
                            hop_data['hostname'] = hostname
                        except OSError:
                            hop_data['hostname'] = None
                    
                    hops.append(hop_data)
            
            return {
                'success': True,
                'hops': hops,
                'raw_output': result['stdout']
            }
        
        return {
            'success': False,
            'error': result['stderr'] if result['stderr'] else "Traceroute failed",
            'raw_output': result['stdout']
        }
    
    def port_scan(self, target, ports):
        """
        Check whether ports on the target host are open.

        @param target: target host
        @param ports: list of ports to scan
        @return: scan results keyed by port number (as a string)
        """
        results = {}

        for port in ports:
            # XML-RPC structs require string keys
            key = str(port)
            try:
                # The context manager closes the socket even if connect_ex raises
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(1)
                    result = sock.connect_ex((target, int(port)))
                if result == 0:
                    # Try to look up the service name
                    try:
                        service = socket.getservbyport(int(port))
                    except OSError:
                        service = "unknown"

                    results[key] = {
                        'open': True,
                        'service': service
                    }
                else:
                    results[key] = {
                        'open': False
                    }
            except Exception as e:
                results[key] = {
                    'open': False,
                    'error': str(e)
                }

        return results
    
    def check_dns(self, domain, dns_server=None):
        """
        Run a DNS query.

        @param domain: domain name to query
        @param dns_server: optional DNS server
        @return: DNS query result
        """
        command = f"dig {domain}"
        if dns_server:
            command += f" @{dns_server}"

        result = self._run_command(command)

        if result['returncode'] == 0:
            # Parse the dig output
            answer_section = False
            answers = []

            for line in result['stdout'].split('\n'):
                if line.startswith(';; ANSWER SECTION:'):
                    answer_section = True
                    continue

                if answer_section and line.strip() and not line.startswith(';'):
                    fields = line.strip().split()
                    if len(fields) >= 5:
                        answers.append({
                            'name': fields[0],
                            'ttl': fields[1],
                            'class': fields[2],
                            'type': fields[3],
                            'data': ' '.join(fields[4:])
                        })

                if answer_section and line.startswith(';'):
                    answer_section = False

            # Extract the query time
            query_time_match = re.search(r'Query time: (\d+) msec', result['stdout'])
            query_time = int(query_time_match.group(1)) if query_time_match else None
            
            return {
                'success': True,
                'answers': answers,
                'query_time': query_time,
                'raw_output': result['stdout']
            }
        
        return {
            'success': False,
            'error': result['stderr'] if result['stderr'] else "DNS query failed",
            'raw_output': result['stdout']
        }
    
    def network_stats(self):
        """
        Collect network statistics.

        @return: per-interface network statistics
        """
        # Read per-interface counters from /proc/net/dev
        stats = {}

        try:
            with open('/proc/net/dev', 'r') as f:
                lines = f.readlines()

                # Skip the first two lines (headers)
                for line in lines[2:]:
                    parts = line.strip().split(':')
                    if len(parts) >= 2:
                        interface = parts[0].strip()
                        values = parts[1].strip().split()
                        
                        if len(values) >= 16:
                            stats[interface] = {
                                'rx': {
                                    'bytes': int(values[0]),
                                    'packets': int(values[1]),
                                    'errors': int(values[2]),
                                    'drop': int(values[3])
                                },
                                'tx': {
                                    'bytes': int(values[8]),
                                    'packets': int(values[9]),
                                    'errors': int(values[10]),
                                    'drop': int(values[11])
                                }
                            }
            
            # Collect interface configuration
            interfaces_info = {}
            for interface in stats:
                # Query the interface addresses
                ip_result = self._run_command(f"ip addr show {interface}")

                if ip_result['returncode'] == 0:
                    # Extract IPv4 addresses
                    ip_addresses = []
                    ip_pattern = r'inet\s+(\d+\.\d+\.\d+\.\d+)/(\d+)'
                    ip_matches = re.finditer(ip_pattern, ip_result['stdout'])

                    for match in ip_matches:
                        ip_addresses.append({
                            'address': match.group(1),
                            'prefix': match.group(2)
                        })

                    # Extract the MAC address
                    mac_pattern = r'link/ether\s+([0-9a-f:]+)'
                    mac_match = re.search(mac_pattern, ip_result['stdout'])
                    mac_address = mac_match.group(1) if mac_match else None
                    
                    interfaces_info[interface] = {
                        'ip_addresses': ip_addresses,
                        'mac_address': mac_address
                    }
            
            return {
                'success': True,
                'stats': stats,
                'interfaces': interfaces_info
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
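
The output parsing in ping_test is the part most likely to break silently, so it is worth exercising on its own, without a Minion or a network. The sketch below pulls that logic into a standalone function and runs it against a made-up sample of Linux iputils ping output. The function name parse_ping_output and the sample text are illustrative, and the statistics pattern is slightly more tolerant than the one in the module (it also accepts an errors field and fractional loss percentages).

```python
import re

# Hypothetical sample of Linux iputils ping output.
SAMPLE_OUTPUT = """\
PING example.com (192.0.2.10) 56(84) bytes of data.
64 bytes from 192.0.2.10: icmp_seq=1 ttl=56 time=11.0 ms
64 bytes from 192.0.2.10: icmp_seq=2 ttl=56 time=11.7 ms

--- example.com ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 1001ms
rtt min/avg/max/mdev = 11.012/11.357/11.702/0.345 ms
"""

STATS_RE = re.compile(
    r"(\d+) packets transmitted, (\d+) (?:packets )?received,.*?([\d.]+)% packet loss"
)
RTT_RE = re.compile(
    r"rtt min/avg/max/mdev = ([\d.]+)/([\d.]+)/([\d.]+)/([\d.]+) ms"
)


def parse_ping_output(text):
    """Return ping statistics as a dict, or None if the summary lines are missing."""
    stats = STATS_RE.search(text)
    rtt = RTT_RE.search(text)
    if not (stats and rtt):
        return None
    return {
        "transmitted": int(stats.group(1)),
        "received": int(stats.group(2)),
        "packet_loss": float(stats.group(3)),
        "rtt": dict(zip(("min", "avg", "max", "mdev"), map(float, rtt.groups()))),
    }


print(parse_ping_output(SAMPLE_OUTPUT))
```

Note that the regular expressions depend on the backslashes in \d; without them the patterns would look for literal letters and never match.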

Create an initialization file so that Func can discover the module:

bash

sudo nano /usr/lib/python3/dist-packages/func/minion/modules/custom/__init__.py

Add the following content:

python

from func.minion.modules.custom.netdiag import NetDiag

Restart the Func service to load the new module:

bash

sudo systemctl restart funcd

Deploying the custom module

To deploy the custom module to all Minions, you can use Func itself.

First, make sure the target directory exists on every Minion:

bash

func '*' command.run 'mkdir -p /usr/lib/python3/dist-packages/func/minion/modules/custom'

Then copy the module files to all Minions:

bash

func '*' copy.from_master '/usr/lib/python3/dist-packages/func/minion/modules/custom/netdiag.py' '/usr/lib/python3/dist-packages/func/minion/modules/custom/netdiag.py'

func '*' copy.from_master '/usr/lib/python3/dist-packages/func/minion/modules/custom/__init__.py' '/usr/lib/python3/dist-packages/func/minion/modules/custom/__init__.py'

Restart the Func service on all Minions:

bash

func '*' service.restart 'funcd'

Using the custom module

With the custom module deployed, it is ready to use:

bash

# Check that the module is available
func '*' func.list_modules | grep netdiag

# List the module's methods
func '*' func.list_methods 'netdiag'

# Run a ping test
func '*' netdiag.ping_test 'google.com'

# Run traceroute
func 'web1.example.com' netdiag.traceroute 'google.com'

# Scan ports
func 'db1.example.com' netdiag.port_scan 'localhost' '[22, 80, 443, 3306]'

# Check DNS
func '*' netdiag.check_dns 'example.com' '8.8.8.8'

# Get network statistics
func '*' netdiag.network_stats
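
The counters returned by network_stats come from /proc/net/dev, where each interface line carries sixteen numbers: eight receive fields followed by eight transmit fields. The sketch below shows that column mapping on a made-up sample so it can be checked on any machine. parse_proc_net_dev and the sample text are illustrative names and data, not part of the module above.

```python
# Hypothetical /proc/net/dev contents: two header lines, then one line per interface.
SAMPLE_PROC_NET_DEV = """\
Inter-|   Receive                                                |  Transmit
 face |bytes    packets errs drop fifo frame compressed multicast|bytes    packets errs drop fifo colls carrier compressed
    lo:    1000      10    0    0    0     0          0         0     1000      10    0    0    0     0       0          0
  eth0:    2048      20    1    2    0     0          0         0     4096      40    3    4    0     0       0          0
"""


def parse_proc_net_dev(text):
    """Map each interface name to its receive (rx) and transmit (tx) counters."""
    stats = {}
    for line in text.splitlines()[2:]:
        name, sep, counters = line.partition(":")
        values = counters.split()
        if not sep or len(values) < 16:
            continue  # skip malformed lines
        stats[name.strip()] = {
            # Columns 0-7 are receive counters, columns 8-15 transmit counters
            "rx": {"bytes": int(values[0]), "packets": int(values[1]),
                   "errors": int(values[2]), "drop": int(values[3])},
            "tx": {"bytes": int(values[8]), "packets": int(values[9]),
                   "errors": int(values[10]), "drop": int(values[11])},
        }
    return stats


for interface, counters in parse_proc_net_dev(SAMPLE_PROC_NET_DEV).items():
    print(interface, counters["rx"]["bytes"], counters["tx"]["bytes"])
```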

Using the custom module from the Python API

The custom module can also be used from Python scripts:

python

from func.overlord.client import Client

# Create a Client instance
fc = Client("*")

# Call the custom module
results = fc.netdiag.ping_test("google.com", 5)
for host, result in results.items():
    if result['success']:
        print(f"{host}: Ping successful, avg={result['rtt']['avg']}ms, loss={result['packet_loss']}%")
    else:
        print(f"{host}: Ping failed - {result.get('error', 'Unknown error')}")

# Use the port scan method
target_host = "example.com"
ports = [22, 80, 443, 8080, 8443]
results = fc.netdiag.port_scan(target_host, ports)

for host, scan_results in results.items():
    print(f"\nPort scan from {host} to {target_host}:")
    for port, port_info in scan_results.items():
        if port_info['open']:
            service = port_info.get('service', 'unknown')
            print(f"  Port {port}: OPEN ({service})")
        else:
            print(f"  Port {port}: CLOSED")

Advanced custom module example

The following is a more complex custom module for monitoring and managing web applications:

python

from func.minion.modules.func_module import FuncModule
import subprocess
import json
import requests
import re
import os
import time
import sqlite3
from datetime import datetime

class WebAppMonitor(FuncModule):
    
    version = "1.0"
    description = "Web application monitoring and management module"
    
    def __init__(self):
        super(WebAppMonitor, self).__init__()
        
        # Initialize the storage directory
        self.data_dir = "/var/lib/func/webapp_monitor"
        os.makedirs(self.data_dir, exist_ok=True)

        # Initialize the database
        self.db_file = os.path.join(self.data_dir, "metrics.db")
        self._init_db()

    def _init_db(self):
        """Initialize the SQLite database."""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()

        # Create the metrics table
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS metrics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT,
            app_name TEXT,
            endpoint TEXT,
            response_time REAL,
            status_code INTEGER,
            response_size INTEGER
        )
        ''')
        
        # Create the events table
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp TEXT,
            app_name TEXT,
            event_type TEXT,
            description TEXT
        )
        ''')
        
        conn.commit()
        conn.close()
    
    def check_endpoint(self, url, method="GET", headers=None, data=None, timeout=10):
        """
        Check a web endpoint and record metrics.

        @param url: URL to check
        @param method: HTTP method
        @param headers: request headers
        @param data: request data
        @param timeout: timeout in seconds
        @return: check result
        """
        headers = headers or {}
        data = data or {}

        try:
            # Extract the application name (the host part of the URL)
            app_name = re.search(r'https?://([^/]+)', url).group(1)

            # Record the start time
            start_time = time.time()

            # Send the request
            response = requests.request(
                method,
                url,
                headers=headers,
                json=data if method in ["POST", "PUT", "PATCH"] else None,
                timeout=timeout
            )

            # Compute the response time
            response_time = time.time() - start_time

            # Build the result
            result = {
                'success': True,
                'url': url,
                'status_code': response.status_code,
                'response_time': response_time,
                'response_size': len(response.content),
                'headers': dict(response.headers)
            }

            # Store the metric
            self._store_metric(app_name, url, response_time, response.status_code, len(response.content))

            # Check for an HTTP error
            if response.status_code >= 400:
                result['success'] = False
                result['error'] = f"HTTP error: {response.status_code}"

                # Record an error event
                self._store_event(app_name, "ERROR", f"HTTP error {response.status_code} for {url}")
            
            return result
            
        except requests.exceptions.Timeout:
            self._store_event(app_name, "ERROR", f"Timeout connecting to {url}")
            return {
                'success': False,
                'url': url,
                'error': "Request timed out"
            }
        except requests.exceptions.ConnectionError:
            self._store_event(app_name, "ERROR", f"Connection error for {url}")
            return {
                'success': False,
                'url': url,
                'error': "Connection error"
            }
        except Exception as e:
            return {
                'success': False,
                'url': url,
                'error': str(e)
            }
    
    def _store_metric(self, app_name, endpoint, response_time, status_code, response_size):
        """Store a metric row in the database."""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        cursor.execute('''
        INSERT INTO metrics (timestamp, app_name, endpoint, response_time, status_code, response_size)
        VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            datetime.now().isoformat(),
            app_name,
            endpoint,
            response_time,
            status_code,
            response_size
        ))
        
        conn.commit()
        conn.close()
    
    def _store_event(self, app_name, event_type, description):
        """Store an event row in the database."""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        cursor.execute('''
        INSERT INTO events (timestamp, app_name, event_type, description)
        VALUES (?, ?, ?, ?)
        ''', (
            datetime.now().isoformat(),
            app_name,
            event_type,
            description
        ))
        
        conn.commit()
        conn.close()
    
    def monitor_endpoints(self, endpoints):
        """
        Monitor several endpoints.

        @param endpoints: list of endpoint configurations
        @return: monitoring results keyed by URL
        """
        results = {}
        
        for endpoint in endpoints:
            url = endpoint['url']
            method = endpoint.get('method', 'GET')
            headers = endpoint.get('headers', {})
            data = endpoint.get('data', {})
            timeout = endpoint.get('timeout', 10)
            
            results[url] = self.check_endpoint(url, method, headers, data, timeout)
        
        return results
    
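    def get_metrics(self, app_name=None, hours=24):
        """
        Fetch application metrics.

        @param app_name: application name (optional)
        @param hours: how many hours back to look
        @return: metric rows
        """
        conn = sqlite3.connect(self.db_file)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        # Timestamps are stored as local-time ISO 8601 strings, so normalize
        # them with datetime() and compare against local time
        query = '''
        SELECT * FROM metrics
        WHERE datetime(timestamp) >= datetime('now', 'localtime', '-' || ? || ' hours')
        '''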
        
        params = [hours]
        
        if app_name:
            query += " AND app_name = ?"
            params.append(app_name)
        
        query += " ORDER BY timestamp DESC"
        
        cursor.execute(query, params)
        rows = cursor.fetchall()
        
        # Convert the rows to a list of dictionaries
        metrics = []
        for row in rows:
            metrics.append(dict(row))
        
        conn.close()
        
        return metrics
    
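    def get_events(self, app_name=None, event_type=None, hours=24):
        """
        Fetch events.

        @param app_name: application name (optional)
        @param event_type: event type (optional)
        @param hours: how many hours back to look
        @return: event rows
        """
        conn = sqlite3.connect(self.db_file)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        # Timestamps are stored as local-time ISO 8601 strings, so normalize
        # them with datetime() and compare against local time
        query = '''
        SELECT * FROM events
        WHERE datetime(timestamp) >= datetime('now', 'localtime', '-' || ? || ' hours')
        '''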
        
        params = [hours]
        
        if app_name:
            query += " AND app_name = ?"
            params.append(app_name)
        
        if event_type:
            query += " AND event_type = ?"
            params.append(event_type)
        
        query += " ORDER BY timestamp DESC"
        
        cursor.execute(query, params)
        rows = cursor.fetchall()
        
        # Convert the rows to a list of dicts
        events = [dict(row) for row in rows]
        
        conn.close()
        
        return events
    
    def analyze_performance(self, app_name=None, hours=24):
        """
        Analyze application performance.
        
        @param app_name: application name (optional)
        @param hours: analysis window in hours
        @return: performance analysis result
        """
        metrics = self.get_metrics(app_name, hours)
        
        if not metrics:
            return {
                'success': False,
                'error': "No metrics found for the specified criteria"
            }
        
        # Group metrics by application name
        apps = {}
        for metric in metrics:
            app = metric['app_name']
            if app not in apps:
                apps[app] = {
                    'response_times': [],
                    'status_codes': {},
                    'endpoints': {},
                    'total_requests': 0
                }
            app_data = apps[app]
            
            # Record the response time
            app_data['response_times'].append(metric['response_time'])
            
            # Count occurrences of each status code
            status_code = str(metric['status_code'])
            app_data['status_codes'][status_code] = app_data['status_codes'].get(status_code, 0) + 1
            
            # Track the same statistics per endpoint
            endpoint = metric['endpoint']
            if endpoint not in app_data['endpoints']:
                app_data['endpoints'][endpoint] = {
                    'response_times': [],
                    'status_codes': {},
                    'total_requests': 0
                }
            endpoint_data = app_data['endpoints'][endpoint]
            
            endpoint_data['response_times'].append(metric['response_time'])
            endpoint_data['status_codes'][status_code] = endpoint_data['status_codes'].get(status_code, 0) + 1
            endpoint_data['total_requests'] += 1
            
            # Increment the application's total request count
            app_data['total_requests'] += 1
        
        # Compute summary statistics
        analysis = {}
        for app, data in apps.items():
            response_times = data['response_times']
            
            # Skip applications with fewer than two samples
            if len(response_times) < 2:
                continue
            
            # Response-time statistics; median and p95 are picked by index
            # from the sorted list (upper median when the count is even)
            avg_response_time = sum(response_times) / len(response_times)
            response_times.sort()
            median_response_time = response_times[len(response_times) // 2]
            p95_response_time = response_times[int(len(response_times) * 0.95)]
            
            # Error rate: share of responses with a status code of 400 or above
            error_count = sum(count for code, count in data['status_codes'].items() if int(code) >= 400)
            error_rate = error_count / data['total_requests'] if data['total_requests'] > 0 else 0
            
            # Per-endpoint analysis
            endpoints_analysis = {}
            for endpoint, endpoint_data in data['endpoints'].items():
                endpoint_response_times = endpoint_data['response_times']
                
                if len(endpoint_response_times) < 2:
                    continue
                
                endpoint_response_times.sort()
                endpoint_total = endpoint_data['total_requests']
                endpoint_errors = sum(count for code, count in endpoint_data['status_codes'].items() if int(code) >= 400)
                endpoints_analysis[endpoint] = {
                    'avg_response_time': sum(endpoint_response_times) / len(endpoint_response_times),
                    'median_response_time': endpoint_response_times[len(endpoint_response_times) // 2],
                    'p95_response_time': endpoint_response_times[int(len(endpoint_response_times) * 0.95)],
                    'min_response_time': endpoint_response_times[0],
                    'max_response_time': endpoint_response_times[-1],
                    'request_count': endpoint_total,
                    'error_rate': endpoint_errors / endpoint_total if endpoint_total > 0 else 0
                }
            
            # Store the application's analysis result
            analysis[app] = {
                'avg_response_time': avg_response_time,
                'median_response_time': median_response_time,
                'p95_response_time': p95_response_time,
                'min_response_time': response_times[0],
                'max_response_time': response_times[-1],
                'request_count': data['total_requests'],
                'error_rate': error_rate,
                'status_codes': data['status_codes'],
                'endpoints': endpoints_analysis
            }
        
        return {
            'success': True,
            'analysis': analysis
        }
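
To make the statistics in analyze_performance concrete, the sketch below applies the same index-based rules to a short list of made-up response times. The helper name summarize and the sample values are illustrative assumptions and are not part of the module above.

```python
def summarize(response_times):
    """Summarize samples with the same index-based rules as analyze_performance."""
    ordered = sorted(response_times)
    n = len(ordered)
    return {
        'avg': sum(ordered) / n,
        'median': ordered[n // 2],       # upper median when n is even
        'p95': ordered[int(n * 0.95)],   # index-based 95th percentile
        'min': ordered[0],
        'max': ordered[-1],
    }


# Ten made-up response times in seconds, including one slow outlier
samples = [0.12, 0.10, 0.50, 0.11, 0.13, 0.09, 0.14, 0.10, 0.12, 2.00]
stats = summarize(samples)
for name, value in stats.items():
    print(f"{name}: {value:.3f}")
```

With only ten samples, the p95 index is 9, so the single outlier becomes the reported 95th percentile. For small sample counts the percentile is therefore best read as a rough indicator.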

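11.4 Non-Python API Interfaces

Besides the Python API, Func can be reached through other interfaces, which makes it easier to integrate with other languages and platforms.

XML-RPC Interface

Func communicates over the XML-RPC protocol, so any language with an XML-RPC client library can talk to Func:

PHP example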

php

<?php
// Assumes the phpxmlrpc library (legacy xmlrpc.inc API) is on the include path
require_once "xmlrpc.inc";

// Connect to the Func server
$client = new xmlrpc_client("https://func-master.example.com:51234/");
$client->setSSLVerifyPeer(true);
$client->setSSLVerifyHost(2);

// Set the client certificate, private key, and CA certificate
$client->setCertificate("/etc/pki/certmaster/certs/client.pem", "");
$client->setKey("/etc/pki/certmaster/certs/client.pem", "");
$client->setCaCertificate("/etc/pki/certmaster/ca/certmaster.crt");

// Build the request
$request = new xmlrpcmsg("system.listMethods", array());

// Send the request
$response = $client->send($request);

// Check the result
if ($response->faultCode()) {
    echo "Error: " . $response->faultString() . "\n";
} else {
    $methods = $response->value();
    echo "Available methods:\n";
    foreach ($methods as $method) {
        echo "- " . $method->scalarval() . "\n";
    }
}
?>

Ruby example

ruby

require 'xmlrpc/client'
require 'openssl'

# Connect to the Func server
client = XMLRPC::Client.new2("https://func-master.example.com:51234/")

# Configure the client certificate, key, and CA on the underlying Net::HTTP object
pem = File.read("/etc/pki/certmaster/certs/client.pem")
client.http.cert = OpenSSL::X509::Certificate.new(pem)
client.http.key = OpenSSL::PKey.read(pem)
client.http.ca_file = "/etc/pki/certmaster/ca/certmaster.crt"
client.http.verify_mode = OpenSSL::SSL::VERIFY_PEER

# Call a method
begin
  result = client.call("system.listMethods")
  puts "Available methods:"
  result.each do |method|
    puts "- #{method}"
  end
rescue XMLRPC::FaultException => e
  puts "Error: #{e.faultCode} - #{e.faultString}"
rescue => e
  puts "Error: #{e.message}"
end

Java example

java

import org.apache.xmlrpc.client.XmlRpcClient;
import org.apache.xmlrpc.client.XmlRpcClientConfigImpl;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URL;
import java.security.KeyStore;
import javax.net.ssl.*;

public class FuncClient {
    public static void main(String[] args) {
        try {
            // Load the client key store (certificate and private key)
            KeyStore keyStore = KeyStore.getInstance("JKS");
            try (InputStream in = new FileInputStream("/path/to/keystore.jks")) {
                keyStore.load(in, "password".toCharArray());
            }
            
            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            kmf.init(keyStore, "password".toCharArray());
            
            // Load the trust store (CA certificate)
            KeyStore trustStore = KeyStore.getInstance("JKS");
            try (InputStream in = new FileInputStream("/path/to/truststore.jks")) {
                trustStore.load(in, "password".toCharArray());
            }
            
            TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init(trustStore);
            
            // Build the SSL context and make it the default for HTTPS connections
            SSLContext sslContext = SSLContext.getInstance("TLS");
            sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
            SSLContext.setDefault(sslContext);
            HttpsURLConnection.setDefaultSSLSocketFactory(sslContext.getSocketFactory());
            
            // Configure the XML-RPC client
            XmlRpcClientConfigImpl config = new XmlRpcClientConfigImpl();
            config.setServerURL(new URL("https://func-master.example.com:51234/"));
            config.setEnabledForExtensions(true);
            config.setConnectionTimeout(60 * 1000);
            config.setReplyTimeout(60 * 1000);
            
            XmlRpcClient client = new XmlRpcClient();
            client.setConfig(config);
            
            // Call a method
            Object[] params = new Object[] {};
            Object[] result = (Object[]) client.execute("system.listMethods", params);
            
            System.out.println("Available methods:");
            for (Object method : result) {
                System.out.println("- " + method.toString());
            }
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
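
The PHP, Ruby, and Java clients all send the same kind of XML document over HTTPS. As a minimal sketch using only Python's standard xmlrpc.client module, the following prints the request body for system.listMethods and then decodes a sample response. The method names in sample_methods are made up for illustration, and no connection to a Func server is made.

```python
import xmlrpc.client

# Serialize a call to system.listMethods with no parameters
request_body = xmlrpc.client.dumps((), methodname="system.listMethods")
print(request_body)

# Serialize a sample response (illustrative method names) and decode it again
sample_methods = ["command.run", "system.listMethods"]
response_body = xmlrpc.client.dumps((sample_methods,), methodresponse=True)
params, method_name = xmlrpc.client.loads(response_body)
decoded_methods = params[0]

for name in decoded_methods:
    print("-", name)
```

Because the payload is plain XML, a client in any language only needs an HTTP library with TLS client-certificate support and an XML-RPC encoder to produce the same request.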

RESTful API (via Func Overlord)

Func Overlord exposes a RESTful API, so you can interact with Func through plain HTTP requests:

bash

# Install Func Overlord
sudo yum install func-overlord  # CentOS/RHEL
sudo apt-get install func-overlord  # Ubuntu/Debian

Configure Overlord:

bash

sudo nano /etc/func/overlord.conf

ini

[overlord]
listen_addr = 0.0.0.0
listen_port = 8080
ssl_cert = /etc/pki/func/certs/web.pem
ssl_key = /etc/pki/func/certs/web.pem
debug = false
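
Before starting the service, it can help to confirm that the file parses and that the typed values come out as expected. The sketch below reads the same settings with Python's standard configparser module. The function name load_overlord_settings is hypothetical, and the configuration text is inlined so the example runs without touching /etc/func/overlord.conf.

```python
import configparser

# Same content as the overlord.conf shown above, inlined for the example
SAMPLE_CONF = """
[overlord]
listen_addr = 0.0.0.0
listen_port = 8080
ssl_cert = /etc/pki/func/certs/web.pem
ssl_key = /etc/pki/func/certs/web.pem
debug = false
"""


def load_overlord_settings(text):
    """Parse the [overlord] section and convert values to Python types."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    section = parser["overlord"]
    return {
        "listen_addr": section.get("listen_addr"),
        "listen_port": section.getint("listen_port"),
        "ssl_cert": section.get("ssl_cert"),
        "ssl_key": section.get("ssl_key"),
        "debug": section.getboolean("debug"),
    }


settings = load_overlord_settings(SAMPLE_CONF)
print(settings)
```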

Start the Overlord service:

bash

sudo systemctl enable func-overlord
sudo systemctl start func-overlord

Use the RESTful API:

bash

# List the available modules
# (-k skips certificate verification; use it only with self-signed test certificates)
curl -k -X GET https://func-master:8080/api/modules

# Run a command
curl -k -X POST https://func-master:8080/api/run \
  -H "Content-Type: application/json" \
  -d '{
    "target": "*",
    "module": "command",
    "method": "run",
    "args": ["uptime"]
  }'

# Fetch a task's result
curl -k -X GET https://func-master:8080/api/results/12345
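
The same POST request can be assembled from a script. The sketch below builds it with Python's standard urllib and json modules, assuming the /api/run endpoint and JSON fields shown in the curl example. The helper name build_run_request is hypothetical. The request is only constructed and printed, not sent, so the example runs without a live Overlord.

```python
import json
import urllib.request


def build_run_request(base_url, target, module, method, args):
    """Build (but do not send) a POST request for the /api/run endpoint."""
    payload = {
        "target": target,
        "module": module,
        "method": method,
        "args": list(args),
    }
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/api/run",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_run_request("https://func-master:8080", "*", "command", "run", ["uptime"])
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```

To send the request, pass it to urllib.request.urlopen together with an ssl.SSLContext that trusts the Overlord certificate, rather than disabling verification as curl -k does.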

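11.5 Facts Support in Func

Func includes a Facts system for collecting and managing host information, similar to Ansible's facts or SaltStack's grains. Facts can be used for target selection, conditional execution, and configuration management.

Built-in Facts

Func automatically collects a set of system information as Facts: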

bash

# Show all Facts
func '*' facts.get_all

# Show a specific Fact
func '*' facts.get 'system.os'

The main built-in Facts are:

system.hostname: host name
system.os: operating system name
system.os_version: operating system version
system.kernel: kernel version
system.arch: system architecture
hardware.cpu_model: CPU model
hardware.cpu_count: number of CPUs
hardware.memory_total: total memory
network.interfaces: network interfaces
network.ip_addresses: IP addresses
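
The dotted names suggest a nested structure, which matches how the Python API example later in this section reads Facts as nested dictionaries. As a minimal sketch under that assumption, a dotted name can be resolved by walking the dictionary one key at a time. The helper get_fact and the sample values are hypothetical.

```python
def get_fact(facts, dotted_name, default=None):
    """Resolve a dotted Fact name such as 'system.os' in a nested dict."""
    node = facts
    for part in dotted_name.split("."):
        if not isinstance(node, dict) or part not in node:
            return default
        node = node[part]
    return node


# Made-up Facts for a single host
sample_facts = {
    "system": {"hostname": "web01", "os": "CentOS", "os_version": "7.9"},
    "hardware": {"cpu_count": 4},
}

print(get_fact(sample_facts, "system.os"))
print(get_fact(sample_facts, "hardware.cpu_count"))
print(get_fact(sample_facts, "network.interfaces", default=[]))
```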

Custom Facts

You can extend what Func collects by writing custom Facts:

Create the Facts directory on the Minion:

bash

sudo mkdir -p /etc/func/facts.d

Create a custom Facts script:

bash

sudo nano /etc/func/facts.d/app_info.fact

python

#!/usr/bin/env python3
"""
Custom Fact script: collect application information
"""
import json
import os
import subprocess

# Initialize the result dictionary
facts = {
    'applications': {}
}

# Check the Java version (java -version writes to stderr)
try:
    java_output = subprocess.check_output(['java', '-version'], stderr=subprocess.STDOUT, universal_newlines=True)
    if 'version' in java_output:
        version_line = java_output.split('\n')[0]
        version = version_line.split('"')[1] if '"' in version_line else 'unknown'
        facts['applications']['java'] = {
            'installed': True,
            'version': version
        }
except (OSError, subprocess.CalledProcessError):
    # The binary is missing or exited with an error
    facts['applications']['java'] = {
        'installed': False
    }

# Check the MySQL version
try:
    mysql_output = subprocess.check_output(['mysql', '--version'], universal_newlines=True)
    if 'Distrib' in mysql_output:
        version = mysql_output.split('Distrib')[1].split(',')[0].strip()
        facts['applications']['mysql'] = {
            'installed': True,
            'version': version
        }
except (OSError, subprocess.CalledProcessError):
    facts['applications']['mysql'] = {
        'installed': False
    }

# Check the Nginx version (nginx -v writes to stderr)
try:
    nginx_output = subprocess.check_output(['nginx', '-v'], stderr=subprocess.STDOUT, universal_newlines=True)
    if 'nginx version' in nginx_output:
        version = nginx_output.split('nginx version:')[1].strip()
        facts['applications']['nginx'] = {
            'installed': True,
            'version': version
        }
except (OSError, subprocess.CalledProcessError):
    facts['applications']['nginx'] = {
        'installed': False
    }

# Check the configuration of a custom application
app_config_path = '/etc/myapp/config.json'
if os.path.exists(app_config_path):
    try:
        with open(app_config_path, 'r') as f:
            app_config = json.load(f)
        facts['applications']['myapp'] = {
            'installed': True,
            'version': app_config.get('version', 'unknown'),
            'environment': app_config.get('environment', 'unknown'),
            'config_path': app_config_path
        }
    except (OSError, ValueError, AttributeError):
        # Unreadable file, invalid JSON, or a top-level value that is not an object
        facts['applications']['myapp'] = {
            'installed': True,
            'error': 'Failed to parse config'
        }

# Print the Facts as JSON
print(json.dumps(facts))
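
The version parsing in the script is easier to check when it is separated from the subprocess calls. The sketch below follows the same string-splitting approach as pure functions and runs them on sample strings modeled on typical tool output. The function names and the sample strings are assumptions for illustration.

```python
def parse_java_version(output):
    """Return the quoted version on the first line, e.g. '17.0.9'."""
    first_line = output.split("\n")[0]
    parts = first_line.split('"')
    return parts[1] if len(parts) >= 3 else "unknown"


def parse_mysql_version(output):
    """Return the text between 'Distrib' and the next comma."""
    if "Distrib" not in output:
        return "unknown"
    return output.split("Distrib")[1].split(",")[0].strip()


def parse_nginx_version(output):
    """Return everything after 'nginx version:'."""
    marker = "nginx version:"
    if marker not in output:
        return "unknown"
    return output.split(marker)[1].strip()


# Sample strings modeled on typical output; real output varies by version
java_sample = 'openjdk version "17.0.9" 2023-10-17\nOpenJDK Runtime Environment'
mariadb_sample = "mysql  Ver 15.1 Distrib 10.5.22-MariaDB, for Linux (x86_64)"
mysql8_sample = "mysql  Ver 8.0.36 for Linux on x86_64 (MySQL Community Server - GPL)"
nginx_sample = "nginx version: nginx/1.24.0\n"

print(parse_java_version(java_sample))
print(parse_mysql_version(mariadb_sample))
print(parse_mysql_version(mysql8_sample))
print(parse_nginx_version(nginx_sample))
```

The third sample shows a limitation of matching on Distrib: a version string without that word yields no version, so the script above would record nothing for such a client.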

Make the script executable:

bash

sudo chmod +x /etc/func/facts.d/app_info.fact

Refresh the Facts:

bash

func '*' facts.refresh

Use the custom Facts:

bash

# Get application information
func '*' facts.get 'applications'

# Select targets based on Facts
func '@facts.applications.nginx.installed=true' command.run 'nginx -t'

# Conditional execution
func '*' command.run 'if [ "{{ facts.applications.java.installed }}" = "true" ]; then java -version; else echo "Java not installed"; fi'
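
This chapter does not describe how Func evaluates a selector such as @facts.applications.nginx.installed=true internally. As a purely hypothetical model of the idea, the sketch below parses a selector of that shape and tests it against per-host Facts held in nested dictionaries. The function names and the inventory data are assumptions.

```python
def parse_selector(selector):
    """Split '@facts.a.b.c=value' into (['a', 'b', 'c'], 'value')."""
    prefix = "@facts."
    if not selector.startswith(prefix) or "=" not in selector:
        raise ValueError(f"unsupported selector: {selector!r}")
    path, expected = selector[len(prefix):].split("=", 1)
    return path.split("."), expected


def matches(host_facts, selector):
    """Return True if the Fact named by the selector equals the expected value."""
    path, expected = parse_selector(selector)
    node = host_facts
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return False
        node = node[key]
    # Compare as lowercase text so the Python value True matches "true"
    return str(node).lower() == expected.lower()


# Made-up Facts for three hosts
inventory = {
    "web01": {"applications": {"nginx": {"installed": True}}},
    "db01": {"applications": {"nginx": {"installed": False}}},
    "app01": {"applications": {}},
}

selector = "@facts.applications.nginx.installed=true"
selected = sorted(host for host, facts in inventory.items() if matches(facts, selector))
print(selected)
```

Hosts that lack the Fact entirely are treated as non-matching here, which is one reasonable choice for a selector but is not confirmed Func behavior.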

Using Facts from the Python API

python

from func.overlord.client import Client

# Create a Client instance
fc = Client("*")

# Fetch all Facts
facts = fc.facts.get_all()

# Process the Facts data
for host, host_facts in facts.items():
    print(f"Host: {host}")
    
    # Check the operating system
    os_name = host_facts.get('system', {}).get('os', 'unknown')
    os_version = host_facts.get('system', {}).get('os_version', 'unknown')
    print(f"  OS: {os_name} {os_version}")
    
    # Check the installed applications
    apps = host_facts.get('applications', {})
    print("  Installed applications:")
    for app_name, app_info in apps.items():
        if app_info.get('installed', False):
            version = app_info.get('version', 'unknown')
            print(f"    - {app_name} (version: {version})")
    
    # Act on the Facts
    if apps.get('nginx', {}).get('installed', False):
        print("  Checking Nginx configuration...")
        result = fc.command.run("nginx -t", [host])
        print(f"    Result: {result[host]}")
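
Once the Facts are in a dictionary keyed by host, ordinary Python is enough to build reports from them. As a small sketch, the function below groups hosts by operating system. It runs on made-up data shaped like the return value used in the loop above, so it needs no live Func installation, and the name group_hosts_by_os is hypothetical.

```python
from collections import defaultdict


def group_hosts_by_os(all_facts):
    """Map 'os os_version' labels to sorted lists of host names."""
    groups = defaultdict(list)
    for host, host_facts in all_facts.items():
        system = host_facts.get("system", {})
        label = f"{system.get('os', 'unknown')} {system.get('os_version', 'unknown')}"
        groups[label].append(host)
    return {label: sorted(hosts) for label, hosts in groups.items()}


# Made-up Facts keyed by host name
sample_facts = {
    "web02": {"system": {"os": "CentOS", "os_version": "7.9"}},
    "web01": {"system": {"os": "CentOS", "os_version": "7.9"}},
    "db01": {"system": {"os": "Ubuntu", "os_version": "22.04"}},
    "legacy": {},
}

groups = group_hosts_by_os(sample_facts)
for label, hosts in sorted(groups.items()):
    print(f"{label}: {', '.join(hosts)}")
```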

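With the Facts system, Func can collect rich system and application information and use it for targeted host selection and conditional execution, which makes system management more precise and efficient.

Func's flexible architecture, extensible module system, and multiple interfaces make it a capable tool for managing modern IT infrastructure. Whether the targets are physical servers, virtualized environments, or network devices, Func offers a single management interface that simplifies operations work and raises the level of automation.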
