统一网络控制器 Func 详解
随着IT基础设施的规模和复杂性不断增长,网络设备的集中管理和自动化配置变得越来越重要。Func (Federated Universal Conductor) 是一个开源的统一网络控制器,它提供了强大的远程执行框架,可以跨多种网络设备和服务器进行集中化管理。本章将详细介绍Func的架构、安装配置和核心功能,帮助读者掌握这一强大的网络自动化工具。
11.1 Func 的安装
Func是一个基于Python的开源项目,它支持多种操作系统平台,并提供了灵活的安装方式。在安装Func之前,让我们先了解其架构和组件。
11.1.1 业务环境说明
Func采用主从架构(Master-Minion),包含以下主要组件:
Func Master:中央控制服务器,负责发送命令并接收结果
Func Minion:运行在被管理设备上的客户端程序,负责执行来自Master的命令
Certmaster:证书管理组件,负责SSL/TLS证书的签发和验证
Overlord:Web界面组件(可选),提供图形化管理界面
Func的主要特点包括:
安全通信:使用SSL/TLS加密所有通信
模块化设计:通过模块扩展功能,支持自定义模块
多设备支持:可管理各种网络设备和服务器
API友好:提供Python API,便于与其他系统集成
分组管理:支持设备分组,便于批量操作
Func的通信机制基于XML-RPC,使用SSL/TLS确保安全性。Minion启动后会监听端口(默认为51234),等待来自Master的命令。
11.1.2 安装 Func
Func可以通过包管理器、pip或从源码安装。以下是各种安装方法的详细说明。
依赖项
首先,确保系统满足以下依赖要求:
Python 2.7或Python 3.6+(推荐Python 3.8+)
OpenSSL
PyYAML
pyOpenSSL
Python setuptools
使用包管理器安装
CentOS/RHEL 7/8:
首先添加EPEL仓库:
bash
# CentOS/RHEL 7
sudo yum install epel-release
# CentOS/RHEL 8
sudo dnf install epel-release
然后安装Func:
bash
# CentOS/RHEL 7
sudo yum install func func-certmaster
# CentOS/RHEL 8
sudo dnf install func func-certmaster
Fedora:
bash
sudo dnf install func func-certmaster
Ubuntu/Debian:
Ubuntu和Debian的官方仓库可能不包含最新版本的Func,可以通过PPA仓库或从源码安装。
使用pip安装
bash
# 安装Func
pip install func-minion
# 安装Certmaster
pip install certmaster
从源码安装
从源码安装可以获得最新的功能和修复:
bash
# 克隆代码库
git clone https://github.com/func/func.git
cd func
# 安装
python setup.py install
对于Certmaster:
bash
git clone https://github.com/func/certmaster.git
cd certmaster
python setup.py install
Docker安装
也可以使用Docker容器运行Func:
bash
# 拉取官方镜像
docker pull funcproject/func-master
docker pull funcproject/func-minion
# 运行Master容器
docker run -d --name func-master -p 51235:51235 -p 51234:51234 funcproject/func-master
# 运行Minion容器(替换YOUR_MINION_NAME和MASTER_IP)
docker run -d --name func-minion-1 -e MINION_NAME=YOUR_MINION_NAME -e MASTER_IP=MASTER_IP funcproject/func-minion
配置Certmaster
Certmaster是Func的证书管理组件,负责生成和分发SSL/TLS证书。安装后,需要进行配置:
编辑Certmaster配置文件:
bash
sudo nano /etc/certmaster/certmaster.conf
配置示例:
ini
[main]
listen_addr = 0.0.0.0
listen_port = 51235
cadir = /etc/pki/certmaster/ca
certroot = /var/lib/certmaster/certmaster/certs
csrroot = /var/lib/certmaster/certmaster/csrs
cert_dir = /etc/pki/certmaster
autosign = true
启动Certmaster服务:
bash
# 对于systemd系统
sudo systemctl enable certmaster
sudo systemctl start certmaster
# 对于SysV init系统
sudo service certmaster start
sudo chkconfig certmaster on
验证Certmaster运行状态:
bash
sudo systemctl status certmaster
配置Func Master
配置Func Master以管理网络设备和服务器:
编辑Func Master配置文件:
bash
sudo nano /etc/func/minion.conf
配置示例:
ini
[main]
certmaster = localhost
cert_dir = /etc/pki/certmaster
acl_dir = /etc/func/acls
listen_addr = 0.0.0.0
listen_port = 51234
method = sha1
启动Func Master服务:
bash
# 对于systemd系统
sudo systemctl enable funcd
sudo systemctl start funcd
# 对于SysV init系统
sudo service funcd start
sudo chkconfig funcd on
验证Func Master运行状态:
bash
sudo systemctl status funcd
配置Func Minion
在所有需要管理的设备上安装和配置Func Minion:
编辑Minion配置文件:
bash
sudo nano /etc/func/minion.conf
配置示例:
ini
[main]
certmaster = MASTER_IP_ADDRESS
cert_dir = /etc/pki/certmaster
acl_dir = /etc/func/acls
listen_addr = 0.0.0.0
listen_port = 51234
method = sha1
# 设置Minion ID(可选,默认使用主机名)
minion_id = device1.example.com
启动Func Minion服务:
bash
# 对于systemd系统
sudo systemctl enable funcd
sudo systemctl start funcd
# 对于SysV init系统
sudo service funcd start
sudo chkconfig funcd on
证书签发和验证
Minion首次启动时会向Certmaster请求证书:
在Minion上生成证书签名请求(CSR)并发送到Certmaster:
bash
sudo certmaster-request
在Master上查看待处理的证书请求:
bash
sudo certmaster-ca --list-pending
在Master上签发证书(如果未启用自动签名):
bash
sudo certmaster-ca --sign hostname.example.com
验证Minion连接:
bash
func 'hostname.example.com' ping
如果返回”1″,则表示连接成功。
高级配置选项
ACL(访问控制列表)
Func支持基于ACL的访问控制,可以限制特定用户或组对特定模块和方法的访问:
bash
sudo mkdir -p /etc/func/acls
sudo nano /etc/func/acls/example.acl
ACL文件示例:
yaml
# 允许admin组访问所有模块和方法
admin:
- .*: .*
# 限制operator组只能访问特定模块和方法
operator:
- system.list_modules: .*
- command.run: .*
- hardware.info: .*
配置Overlord Web界面(可选)
Overlord提供了一个Web界面,方便通过浏览器管理Func:
bash
# 安装Overlord
sudo yum install func-overlord # CentOS/RHEL
sudo dnf install func-overlord # Fedora
# 配置Overlord
sudo nano /etc/func/overlord.conf
# 启动Overlord服务
sudo systemctl enable func-overlord
sudo systemctl start func-overlord
默认情况下,Overlord监听在http://localhost:8080,可以通过浏览器访问。
11.2 Func 常用模块及 API
Func提供了丰富的内置模块,涵盖了网络管理、系统配置和应用部署等多个方面。下面介绍Func的常用模块和API用法。
Python API基础
Func的Python API允许从Python脚本或交互式解释器中调用Func功能:
python
from func.overlord.client import Client
# 创建Client实例
fc = Client("*.example.com")
# 调用ping方法检查连接
results = fc.ping()
for host, result in results.items():
print(f"{host}: {'Online' if result else 'Offline'}")
# 执行shell命令
results = fc.command.run("uptime")
for host, result in results.items():
print(f"{host}: {result}")
命令行工具
除了Python API,Func还提供了命令行工具:
bash
# 基本语法
func [target] module.method [arguments]
# 例如:
func '*' ping
func 'web*' command.run 'uptime'
11.2.1 选择目标主机
Func提供多种方式来选择目标主机:
使用通配符
bash
# 匹配所有Minion
func '*' ping
# 匹配特定前缀的主机
func 'web*' ping
# 匹配特定后缀的主机
func '*.example.com' ping
使用主机列表
bash
# 使用逗号分隔的主机列表
func 'web1.example.com,web2.example.com,db1.example.com' ping
# 在Python API中使用列表
fc = Client(["web1.example.com", "web2.example.com"])
使用组
Func支持将主机组织成组,便于管理:
bash
# 定义组(在Func Master上)
echo "webservers: web1.example.com web2.example.com" > /etc/func/groups/webservers.group
echo "dbservers: db1.example.com db2.example.com" > /etc/func/groups/dbservers.group
# 使用组名
func '@webservers' ping
# 组合使用
func '@webservers,@dbservers' ping
在Python中使用组
python
from func.overlord.client import Client
# 使用组名
fc = Client("@webservers")
results = fc.ping()
# 组合使用
fc = Client("@webservers,@dbservers")
results = fc.ping()
11.2.2 常用模块详解
Func提供了多种内置模块,用于执行不同类型的操作:
1. command模块
用于执行Shell命令:
bash
# 执行单个命令
func '*' command.run 'ls -la /var/log'
# 执行包含管道的命令
func '*' command.run 'ps aux | grep httpd'
# 执行命令并获取状态码
func '*' command.run_status 'service httpd status'
Python API示例:
python
fc = Client("*")
results = fc.command.run("df -h")
for host, result in results.items():
print(f"{host}:
{result}")
# 执行复杂命令
script = """
if [ -f /etc/redhat-release ]; then
echo "RHEL-based system"
cat /etc/redhat-release
else
echo "Non-RHEL system"
cat /etc/os-release
fi
"""
results = fc.command.run(script)
2. file模块
用于管理文件和目录:
bash
# 检查文件是否存在
func '*' file.exists '/etc/passwd'
# 读取文件内容
func 'web1.example.com' file.read '/etc/nginx/nginx.conf'
# 写入文件
func '*' file.write '/tmp/test.txt' 'Hello World'
# 获取文件stat信息
func '*' file.stat '/etc/passwd'
Python API示例:
python
fc = Client("*")
# 上传文件
with open("local_file.txt", "r") as f:
content = f.read()
results = fc.file.write("/tmp/remote_file.txt", content)
# 下载文件
results = fc.file.read("/etc/hostname")
for host, content in results.items():
with open(f"{host}_hostname.txt", "w") as f:
f.write(content)
3. network模块
用于网络配置和诊断:
bash
# 获取网络接口信息
func '*' network.interfaces
# 获取主机名
func '*' network.hostname
# 执行ping测试
func '*' network.ping 'google.com'
# 获取路由表
func '*' network.routes
Python API示例:
python
fc = Client("*")
# 获取所有主机的IP地址
results = fc.network.interfaces()
for host, interfaces in results.items():
print(f"Host: {host}")
for interface, data in interfaces.items():
if 'inet' in data:
print(f" {interface}: {data['inet']}")
4. service模块
用于管理系统服务:
bash
# 获取服务状态
func '*' service.status 'httpd'
# 启动服务
func '*' service.start 'nginx'
# 停止服务
func '*' service.stop 'mysql'
# 重启服务
func '*' service.restart 'postgresql'
Python API示例:
python
fc = Client("*")
# 检查并启动服务
results = fc.service.status("httpd")
for host, status in results.items():
if not status:
print(f"Starting httpd on {host}")
fc.service.start("httpd", [host])
5. yum/apt模块
用于包管理:
bash
# 检查包是否已安装
func '*' yum.check_update 'nginx'
# 安装软件包
func '*' yum.install 'httpd'
# 更新所有包
func '*' yum.update_all
对于Debian/Ubuntu系统:
bash
# 更新包列表
func '*' apt.update
# 安装软件包
func '*' apt.install 'nginx'
Python API示例:
python
fc = Client("*")
# 安装多个包
packages = ["nginx", "httpd", "redis"]
for package in packages:
results = fc.yum.install(package)
for host, result in results.items():
if result[0] == 0: # 检查返回码
print(f"Successfully installed {package} on {host}")
else:
print(f"Failed to install {package} on {host}: {result[1]}")
6. hardware模块
用于获取硬件信息:
bash
# 获取内存信息
func '*' hardware.mem
# 获取CPU信息
func '*' hardware.cpu
# 获取磁盘信息
func '*' hardware.disk
Python API示例:
python
fc = Client("*")
# 获取所有主机的内存信息
results = fc.hardware.mem()
for host, mem_info in results.items():
total_mb = mem_info["total"] // 1024
free_mb = mem_info["free"] // 1024
print(f"{host}: Total Memory: {total_mb}MB, Free: {free_mb}MB")
7. func模块
用于Func自身管理:
bash
# 列出可用模块
func '*' func.list_modules
# 列出模块方法
func '*' func.list_methods 'command'
# 获取方法文档
func '*' func.help 'command.run'
Python API示例:
python
fc = Client("*")
# 获取可用模块列表
results = fc.func.list_modules()
for host, modules in results.items():
print(f"Modules available on {host}:")
for module in sorted(modules):
print(f" - {module}")
8. cron模块
用于管理计划任务:
bash
# 列出cron作业
func '*' cron.list_jobs
# 添加cron作业
func '*' cron.add_job '0 2 * * *' 'root' '/usr/local/bin/backup.sh'
# 删除cron作业
func '*' cron.del_job '0 2 * * *' 'root' '/usr/local/bin/backup.sh'
Python API示例:
python
fc = Client("*")
# 添加多个计划任务
cron_jobs = [
{"minute": "0", "hour": "2", "user": "root", "command": "/usr/local/bin/backup.sh"},
{"minute": "*/5", "hour": "*", "user": "www", "command": "/usr/local/bin/check-web.sh"}
]
for job in cron_jobs:
fc.cron.add_job(job["minute"], job["hour"], "*", "*", "*", job["user"], job["command"])
9. user模块
用于用户管理:
bash
# 添加用户
func '*' user.add 'newuser'
# 设置密码
func '*' user.set_password 'newuser' 'password'
# 将用户添加到组
func '*' user.add_to_group 'newuser' 'wheel'
# 删除用户
func '*' user.delete 'olduser'
Python API示例:
python
fc = Client("*")
# 创建用户并配置
username = "appuser"
results = fc.user.add(username, home="/opt/app", shell="/bin/bash")
# 设置密码(生产环境不推荐直接在脚本中使用明文密码)
import hashlib
import crypt
import random
import string
# 生成随机盐值
salt = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(8))
# 创建加密密码
hashed_pw = crypt.crypt("secure_password", f"$6${salt}$")
fc.user.set_password(username, hashed_pw)
fc.user.add_to_group(username, "app")
10. copy模块
用于文件复制和同步:
bash
# 从Master复制文件到Minion
func '*' copy.from_master '/path/on/master/file.txt' '/path/on/minion/file.txt'
# 从Minion复制文件到Master
func 'minion.example.com' copy.to_master '/path/on/minion/file.txt' '/path/on/master/file.txt'
Python API示例:
python
fc = Client("*")
# 将配置文件分发到所有主机
source_files = [
"/etc/func/master_configs/nginx.conf",
"/etc/func/master_configs/app.conf"
]
dest_dirs = {
"nginx.conf": "/etc/nginx/nginx.conf",
"app.conf": "/etc/app/app.conf"
}
for source in source_files:
filename = os.path.basename(source)
if filename in dest_dirs:
fc.copy.from_master(source, dest_dirs[filename])
11.3 自定义 Func 模块
Func的一个强大特性是能够创建自定义模块,扩展其功能以满足特定需求。本节将介绍如何创建、部署和使用自定义Func模块。
模块基本结构
Func模块是Python类,继承自func.minion.modules.func_module.FuncModule。一个基本模块结构如下:
python
from func.minion.modules.func_module import FuncModule
class MyModule(FuncModule):
# 版本信息
version = "1.0"
# 模块描述
description = "My custom Func module"
# 方法实现
def hello_world(self):
"""
返回Hello World消息
"""
return "Hello World from %s" % self.minion_id
def echo(self, message):
"""
回显传入的消息
@param message: 要回显的消息
@return: 相同的消息
"""
return message
创建自定义模块
下面,我们将创建一个更实用的自定义模块,用于网络诊断:
在Minion上创建模块目录(如果不存在):
bash
sudo mkdir -p /usr/lib/python3/dist-packages/func/minion/modules/custom
创建网络诊断模块:
bash
sudo nano /usr/lib/python3/dist-packages/func/minion/modules/custom/netdiag.py
添加以下代码:
python
from func.minion.modules.func_module import FuncModule
import subprocess
import socket
import json
import re
import os
class NetDiag(FuncModule):
version = "1.0"
description = "Network diagnostics module"
def _run_command(self, command):
"""运行shell命令并返回输出"""
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
stdout, stderr = process.communicate()
return {
'returncode': process.returncode,
'stdout': stdout.decode('utf-8', errors='replace'),
'stderr': stderr.decode('utf-8', errors='replace')
}
def ping_test(self, target, count=4):
"""
执行ping测试
@param target: 目标主机
@param count: ping包数量
@return: ping测试结果
"""
command = f"ping -c {count} {target}"
result = self._run_command(command)
# 解析ping结果
if result['returncode'] == 0:
# 提取ping统计信息
stats_pattern = r"(d+) packets transmitted, (d+) received, (d+)% packet loss, time (d+)ms"
stats_match = re.search(stats_pattern, result['stdout'])
rtt_pattern = r"rtt min/avg/max/mdev = ([d.]+)/([d.]+)/([d.]+)/([d.]+) ms"
rtt_match = re.search(rtt_pattern, result['stdout'])
if stats_match and rtt_match:
return {
'success': True,
'transmitted': int(stats_match.group(1)),
'received': int(stats_match.group(2)),
'packet_loss': int(stats_match.group(3)),
'time_ms': int(stats_match.group(4)),
'rtt': {
'min': float(rtt_match.group(1)),
'avg': float(rtt_match.group(2)),
'max': float(rtt_match.group(3)),
'mdev': float(rtt_match.group(4))
},
'raw_output': result['stdout']
}
return {
'success': False,
'error': result['stderr'] if result['stderr'] else "Ping failed",
'raw_output': result['stdout']
}
def traceroute(self, target, max_hops=30):
"""
执行traceroute
@param target: 目标主机
@param max_hops: 最大跳数
@return: traceroute结果
"""
command = f"traceroute -m {max_hops} {target}"
result = self._run_command(command)
if result['returncode'] == 0:
# 解析traceroute结果
hops = []
lines = result['stdout'].strip().split('
')
# 跳过第一行(标题行)
for line in lines[1:]:
# 提取跳数和信息
hop_match = re.match(r'^s*(d+)s+(.+)$', line)
if hop_match:
hop_num = int(hop_match.group(1))
hop_info = hop_match.group(2).strip()
# 解析IP地址和时间
ip_pattern = r'(d+.d+.d+.d+)'
ips = re.findall(ip_pattern, hop_info)
time_pattern = r'([d.]+) ms'
times = re.findall(time_pattern, hop_info)
hop_data = {
'hop': hop_num,
'ips': ips,
'times': [float(t) for t in times] if times else []
}
# 尝试获取主机名
if ips:
try:
hostname = socket.gethostbyaddr(ips[0])[0]
hop_data['hostname'] = hostname
except:
hop_data['hostname'] = None
hops.append(hop_data)
return {
'success': True,
'hops': hops,
'raw_output': result['stdout']
}
return {
'success': False,
'error': result['stderr'] if result['stderr'] else "Traceroute failed",
'raw_output': result['stdout']
}
def port_scan(self, target, ports):
"""
检查目标主机上的端口是否开放
@param target: 目标主机
@param ports: 要扫描的端口列表
@return: 端口扫描结果
"""
results = {}
for port in ports:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((target, int(port)))
if result == 0:
# 尝试获取服务名称
try:
service = socket.getservbyport(int(port))
except:
service = "unknown"
results[port] = {
'open': True,
'service': service
}
else:
results[port] = {
'open': False
}
sock.close()
except Exception as e:
results[port] = {
'open': False,
'error': str(e)
}
return results
def check_dns(self, domain, dns_server=None):
"""
执行DNS查询
@param domain: 要查询的域名
@param dns_server: 可选的DNS服务器
@return: DNS查询结果
"""
command = f"dig {domain}"
if dns_server:
command += f" @{dns_server}"
result = self._run_command(command)
if result['returncode'] == 0:
# 解析dig结果
answer_section = False
answers = []
for line in result['stdout'].split('
'):
if line.startswith(';; ANSWER SECTION:'):
answer_section = True
continue
if answer_section and line.strip() and not line.startswith(';'):
fields = line.strip().split()
if len(fields) >= 5:
answers.append({
'name': fields[0],
'ttl': fields[1],
'class': fields[2],
'type': fields[3],
'data': ' '.join(fields[4:])
})
if answer_section and line.startswith(';'):
answer_section = False
# 提取查询时间
query_time_match = re.search(r'Query time: (d+) msec', result['stdout'])
query_time = int(query_time_match.group(1)) if query_time_match else None
return {
'success': True,
'answers': answers,
'query_time': query_time,
'raw_output': result['stdout']
}
return {
'success': False,
'error': result['stderr'] if result['stderr'] else "DNS query failed",
'raw_output': result['stdout']
}
def network_stats(self):
"""
获取网络统计信息
@return: 网络接口统计信息
"""
# 读取/proc/net/dev文件获取接口统计信息
stats = {}
try:
with open('/proc/net/dev', 'r') as f:
lines = f.readlines()
# 跳过前两行(标题行)
for line in lines[2:]:
parts = line.strip().split(':')
if len(parts) >= 2:
interface = parts[0].strip()
values = parts[1].strip().split()
if len(values) >= 16:
stats[interface] = {
'rx': {
'bytes': int(values[0]),
'packets': int(values[1]),
'errors': int(values[2]),
'drop': int(values[3])
},
'tx': {
'bytes': int(values[8]),
'packets': int(values[9]),
'errors': int(values[10]),
'drop': int(values[11])
}
}
# 获取接口配置信息
interfaces_info = {}
for interface in stats:
# 获取IP地址
ip_result = self._run_command(f"ip addr show {interface}")
if ip_result['returncode'] == 0:
# 提取IP地址
ip_addresses = []
ip_pattern = r'inets+(d+.d+.d+.d+)/(d+)'
ip_matches = re.finditer(ip_pattern, ip_result['stdout'])
for match in ip_matches:
ip_addresses.append({
'address': match.group(1),
'prefix': match.group(2)
})
# 提取MAC地址
mac_pattern = r'link/ethers+([0-9a-f:]+)'
mac_match = re.search(mac_pattern, ip_result['stdout'])
mac_address = mac_match.group(1) if mac_match else None
interfaces_info[interface] = {
'ip_addresses': ip_addresses,
'mac_address': mac_address
}
return {
'success': True,
'stats': stats,
'interfaces': interfaces_info
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
创建初始化文件,使Func能够发现模块:
bash
sudo nano /usr/lib/python3/dist-packages/func/minion/modules/custom/__init__.py
添加内容:
python
from func.minion.modules.custom.netdiag import NetDiag
重启Func服务以加载新模块:
bash
sudo systemctl restart funcd
部署自定义模块
要在所有Minion上部署自定义模块,可以使用Func本身:
首先,将模块文件复制到所有Minion:
bash
func '*' copy.from_master '/usr/lib/python3/dist-packages/func/minion/modules/custom/netdiag.py' '/usr/lib/python3/dist-packages/func/minion/modules/custom/netdiag.py'
func '*' copy.from_master '/usr/lib/python3/dist-packages/func/minion/modules/custom/__init__.py' '/usr/lib/python3/dist-packages/func/minion/modules/custom/__init__.py'
确保目录存在:
bash
func '*' command.run 'mkdir -p /usr/lib/python3/dist-packages/func/minion/modules/custom'
重启所有Minion上的Func服务:
bash
func '*' service.restart 'funcd'
使用自定义模块
现在,自定义模块已经部署完成,可以开始使用:
bash
# 测试模块是否可用
func '*' func.list_modules | grep netdiag
# 查看模块方法
func '*' func.list_methods 'netdiag'
# 执行ping测试
func '*' netdiag.ping_test 'google.com'
# 执行traceroute
func 'web1.example.com' netdiag.traceroute 'google.com'
# 扫描端口
func 'db1.example.com' netdiag.port_scan 'localhost' '[22, 80, 443, 3306]'
# 检查DNS
func '*' netdiag.check_dns 'example.com' '8.8.8.8'
# 获取网络统计信息
func '*' netdiag.network_stats
Python API中使用自定义模块
在Python脚本中也可以使用自定义模块:
python
from func.overlord.client import Client
# 创建Client实例
fc = Client("*")
# 使用自定义模块
results = fc.netdiag.ping_test("google.com", 5)
for host, result in results.items():
if result['success']:
print(f"{host}: Ping successful, avg={result['rtt']['avg']}ms, loss={result['packet_loss']}%")
else:
print(f"{host}: Ping failed - {result.get('error', 'Unknown error')}")
# 使用端口扫描功能
target_host = "example.com"
ports = [22, 80, 443, 8080, 8443]
results = fc.netdiag.port_scan(target_host, ports)
for host, scan_results in results.items():
print(f"
Port scan from {host} to {target_host}:")
for port, port_info in scan_results.items():
if port_info['open']:
service = port_info.get('service', 'unknown')
print(f" Port {port}: OPEN ({service})")
else:
print(f" Port {port}: CLOSED")
高级自定义模块示例
下面是一个更复杂的自定义模块示例,用于监控和管理Web应用:
python
from func.minion.modules.func_module import FuncModule
import subprocess
import json
import requests
import re
import os
import time
import sqlite3
from datetime import datetime
class WebAppMonitor(FuncModule):
version = "1.0"
description = "Web application monitoring and management module"
def __init__(self):
super(WebAppMonitor, self).__init__()
# 初始化存储目录
self.data_dir = "/var/lib/func/webapp_monitor"
if not os.path.exists(self.data_dir):
os.makedirs(self.data_dir)
# 初始化数据库
self.db_file = os.path.join(self.data_dir, "metrics.db")
self._init_db()
def _init_db(self):
"""初始化SQLite数据库"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
# 创建指标表
cursor.execute('''
CREATE TABLE IF NOT EXISTS metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
app_name TEXT,
endpoint TEXT,
response_time REAL,
status_code INTEGER,
response_size INTEGER
)
''')
# 创建事件表
cursor.execute('''
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
app_name TEXT,
event_type TEXT,
description TEXT
)
''')
conn.commit()
conn.close()
def check_endpoint(self, url, method="GET", headers=None, data=None, timeout=10):
"""
检查Web端点并记录指标
@param url: 要检查的URL
@param method: HTTP方法
@param headers: 请求头
@param data: 请求数据
@param timeout: 超时时间(秒)
@return: 检查结果
"""
headers = headers or {}
data = data or {}
try:
# 提取应用名称
app_name = re.search(r'https?://([^/]+)', url).group(1)
# 记录开始时间
start_time = time.time()
# 发送请求
response = requests.request(
method,
url,
headers=headers,
json=data if method in ["POST", "PUT", "PATCH"] else None,
timeout=timeout
)
# 计算响应时间
response_time = time.time() - start_time
# 准备结果
result = {
'success': True,
'url': url,
'status_code': response.status_code,
'response_time': response_time,
'response_size': len(response.content),
'headers': dict(response.headers)
}
# 存储指标
self._store_metric(app_name, url, response_time, response.status_code, len(response.content))
# 检查是否有错误
if response.status_code >= 400:
result['success'] = False
result['error'] = f"HTTP error: {response.status_code}"
# 记录错误事件
self._store_event(app_name, "ERROR", f"HTTP error {response.status_code} for {url}")
return result
except requests.exceptions.Timeout:
self._store_event(app_name, "ERROR", f"Timeout connecting to {url}")
return {
'success': False,
'url': url,
'error': "Request timed out"
}
except requests.exceptions.ConnectionError:
self._store_event(app_name, "ERROR", f"Connection error for {url}")
return {
'success': False,
'url': url,
'error': "Connection error"
}
except Exception as e:
return {
'success': False,
'url': url,
'error': str(e)
}
def _store_metric(self, app_name, endpoint, response_time, status_code, response_size):
"""存储指标到数据库"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO metrics (timestamp, app_name, endpoint, response_time, status_code, response_size)
VALUES (?, ?, ?, ?, ?, ?)
''', (
datetime.now().isoformat(),
app_name,
endpoint,
response_time,
status_code,
response_size
))
conn.commit()
conn.close()
def _store_event(self, app_name, event_type, description):
"""存储事件到数据库"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO events (timestamp, app_name, event_type, description)
VALUES (?, ?, ?, ?)
''', (
datetime.now().isoformat(),
app_name,
event_type,
description
))
conn.commit()
conn.close()
def monitor_endpoints(self, endpoints):
"""
监控多个端点
@param endpoints: 端点配置列表
@return: 监控结果
"""
results = {}
for endpoint in endpoints:
url = endpoint['url']
method = endpoint.get('method', 'GET')
headers = endpoint.get('headers', {})
data = endpoint.get('data', {})
timeout = endpoint.get('timeout', 10)
results[url] = self.check_endpoint(url, method, headers, data, timeout)
return results
def get_metrics(self, app_name=None, hours=24):
"""
获取应用指标
@param app_name: 应用名称(可选)
@param hours: 过去的小时数
@return: 指标数据
"""
conn = sqlite3.connect(self.db_file)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
query = '''
SELECT * FROM metrics
WHERE timestamp >= datetime('now', '-' || ? || ' hours')
'''
params = [hours]
if app_name:
query += " AND app_name = ?"
params.append(app_name)
query += " ORDER BY timestamp DESC"
cursor.execute(query, params)
rows = cursor.fetchall()
# 转换为字典列表
metrics = []
for row in rows:
metrics.append(dict(row))
conn.close()
return metrics
def get_events(self, app_name=None, event_type=None, hours=24):
"""
获取事件
@param app_name: 应用名称(可选)
@param event_type: 事件类型(可选)
@param hours: 过去的小时数
@return: 事件数据
"""
conn = sqlite3.connect(self.db_file)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
query = '''
SELECT * FROM events
WHERE timestamp >= datetime('now', '-' || ? || ' hours')
'''
params = [hours]
if app_name:
query += " AND app_name = ?"
params.append(app_name)
if event_type:
query += " AND event_type = ?"
params.append(event_type)
query += " ORDER BY timestamp DESC"
cursor.execute(query, params)
rows = cursor.fetchall()
# 转换为字典列表
events = []
for row in rows:
events.append(dict(row))
conn.close()
return events
def analyze_performance(self, app_name=None, hours=24):
"""
分析应用性能
@param app_name: 应用名称(可选)
@param hours: 分析时间范围(小时)
@return: 性能分析结果
"""
metrics = self.get_metrics(app_name, hours)
if not metrics:
return {
'success': False,
'error': "No metrics found for the specified criteria"
}
# 按应用名称分组指标
apps = {}
for metric in metrics:
app = metric['app_name']
if app not in apps:
apps[app] = {
'response_times': [],
'status_codes': {},
'endpoints': {},
'total_requests': 0
}
# 添加响应时间
apps[app]['response_times'].append(metric['response_time'])
# 统计状态码
status_code = str(metric['status_code'])
apps[app]['status_codes'][status_code] = apps[app]['status_codes'].get(status_code, 0) + 1
# 统计端点
endpoint = metric['endpoint']
if endpoint not in apps[app]['endpoints']:
apps[app]['endpoints'][endpoint] = {
'response_times': [],
'status_codes': {},
'total_requests': 0
}
apps[app]['endpoints'][endpoint]['response_times'].append(metric['response_time'])
apps[app]['endpoints'][endpoint]['status_codes'][status_code] = apps[app]['endpoints'][endpoint]['status_codes'].get(status_code, 0) + 1
apps[app]['endpoints'][endpoint]['total_requests'] += 1
# 增加总请求数
apps[app]['total_requests'] += 1
# 计算统计信息
analysis = {}
for app, data in apps.items():
response_times = data['response_times']
# 跳过没有足够数据的应用
if len(response_times) < 2:
continue
# 计算响应时间统计
avg_response_time = sum(response_times) / len(response_times)
response_times.sort()
median_response_time = response_times[len(response_times) // 2]
p95_response_time = response_times[int(len(response_times) * 0.95)]
# 计算错误率
error_count = sum(data['status_codes'].get(code, 0) for code in data['status_codes'] if int(code) >= 400)
error_rate = error_count / data['total_requests'] if data['total_requests'] > 0 else 0
# 端点分析
endpoints_analysis = {}
for endpoint, endpoint_data in data['endpoints'].items():
endpoint_response_times = endpoint_data['response_times']
if len(endpoint_response_times) < 2:
continue
endpoint_response_times.sort()
endpoints_analysis[endpoint] = {
'avg_response_time': sum(endpoint_response_times) / len(endpoint_response_times),
'median_response_time': endpoint_response_times[len(endpoint_response_times) // 2],
'p95_response_time': endpoint_response_times[int(len(endpoint_response_times) * 0.95)],
'min_response_time': endpoint_response_times[0],
'max_response_time': endpoint_response_times[-1],
'request_count': endpoint_data['total_requests'],
'error_rate': sum(endpoint_data['status_codes'].get(code, 0) for code in endpoint_data['status_codes'] if int(code) >= 400) / endpoint_data['total_requests'] if endpoint_data['total_requests'] > 0 else 0
}
# 保存应用分析结果
analysis[app] = {
'avg_response_time': avg_response_time,
'median_response_time': median_response_time,
'p95_response_time': p95_response_time,
'min_response_time': response_times[0],
'max_response_time': response_times[-1],
'request_count': data['total_requests'],
'error_rate': error_rate,
'status_codes': data['status_codes'],
'endpoints': endpoints_analysis
}
return {
'success': True,
'analysis': analysis
}
11.4 非 Python API 接口支持
Func除了提供Python API外,还支持其他接口方式,便于与不同语言和平台集成。
XMLRPC接口
Func的通信基于XMLRPC协议,这意味着任何支持XMLRPC的语言都可以与Func通信:
PHP示例
php
<?php
// 连接到Func服务器
$client = new xmlrpc_client("https://func-master.example.com:51234/");
$client->setSSLVerifyPeer(true);
$client->setSSLVerifyHost(2);
// 设置证书
$client->setSSLCertificate("/etc/pki/certmaster/certs/client.pem");
$client->setSSLKey("/etc/pki/certmaster/certs/client.pem");
$client->setSSLCAFile("/etc/pki/certmaster/ca/certmaster.crt");
// 创建请求
$request = new xmlrpcmsg("system.listMethods", array());
// 发送请求
$response = $client->send($request);
// 检查结果
if ($response->faultCode()) {
echo "Error: " . $response->faultString() . "
";
} else {
$methods = $response->value();
echo "Available methods:
";
foreach ($methods as $method) {
echo "- " . $method->scalarval() . "
";
}
}
?>
Ruby示例
ruby
require 'xmlrpc/client'
# 禁用证书验证(仅用于测试)
XMLRPC::Client.prepend(Module.new do
def set_auth(user, password)
@http.verify_mode = OpenSSL::SSL::VERIFY_NONE
super
end
end)
# 连接到Func服务器
client = XMLRPC::Client.new2("https://func-master.example.com:51234/")
# 设置证书
client.cert = File.read("/etc/pki/certmaster/certs/client.pem")
client.key = File.read("/etc/pki/certmaster/certs/client.pem")
client.ca_file = "/etc/pki/certmaster/ca/certmaster.crt"
# 调用方法
begin
result = client.call("system.listMethods")
puts "Available methods:"
result.each do |method|
puts "- #{method}"
end
rescue XMLRPC::FaultException => e
puts "Error: #{e.faultCode} - #{e.faultString}"
rescue => e
puts "Error: #{e.message}"
end
Java示例
java
import org.apache.xmlrpc.client.XmlRpcClient;
import org.apache.xmlrpc.client.XmlRpcClientConfigImpl;
import java.net.URL;
import java.security.cert.X509Certificate;
import javax.net.ssl.*;
import java.io.File;
import java.io.FileInputStream;
import java.security.KeyStore;
public class FuncClient {
public static void main(String[] args) {
try {
// 设置SSL上下文
KeyStore keyStore = KeyStore.getInstance("JKS");
keyStore.load(new FileInputStream("/path/to/keystore.jks"), "password".toCharArray());
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keyStore, "password".toCharArray());
KeyStore trustStore = KeyStore.getInstance("JKS");
trustStore.load(new FileInputStream("/path/to/truststore.jks"), "password".toCharArray());
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(trustStore);
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
SSLContext.setDefault(sslContext);
// 设置XMLRPC客户端
XmlRpcClientConfigImpl config = new XmlRpcClientConfigImpl();
config.setServerURL(new URL("https://func-master.example.com:51234/"));
config.setEnabledForExtensions(true);
config.setConnectionTimeout(60 * 1000);
config.setReplyTimeout(60 * 1000);
XmlRpcClient client = new XmlRpcClient();
client.setConfig(config);
// 调用方法
Object[] params = new Object[] {};
Object[] result = (Object[]) client.execute("system.listMethods", params);
System.out.println("Available methods:");
for (Object method : result) {
System.out.println("- " + method.toString());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
RESTful API(使用Func Overlord)
Func Overlord提供了RESTful API接口,可以通过HTTP请求与Func交互:
bash
# 安装Func Overlord
sudo yum install func-overlord # CentOS/RHEL
sudo apt-get install func-overlord # Ubuntu/Debian
配置Overlord:
bash
sudo nano /etc/func/overlord.conf
ini
[overlord]
listen_addr = 0.0.0.0
listen_port = 8080
ssl_cert = /etc/pki/func/certs/web.pem
ssl_key = /etc/pki/func/certs/web.pem
debug = false
启动Overlord服务:
bash
sudo systemctl enable func-overlord
sudo systemctl start func-overlord
使用RESTful API:
bash
# 获取可用模块列表
curl -k -X GET https://func-master:8080/api/modules
# 执行命令
curl -k -X POST https://func-master:8080/api/run
-H "Content-Type: application/json"
-d '{
"target": "*",
"module": "command",
"method": "run",
"args": ["uptime"]
}'
# 获取任务结果
curl -k -X GET https://func-master:8080/api/results/12345
11.5 Func 的 Facts 支持
Func支持Facts系统,用于收集和管理主机信息,类似于Ansible的Facts或SaltStack的Grains。Facts可以用于目标选择、条件执行和配置管理。
内置Facts
Func自动收集一系列系统信息作为Facts:
bash
# 查看所有Facts
func '*' facts.get_all
# 查看特定Fact
func '*' facts.get 'system.os'
主要内置Facts包括:
system.hostname: 主机名
system.os: 操作系统名称
system.os_version: 操作系统版本
system.kernel: 内核版本
system.arch: 系统架构
hardware.cpu_model: CPU型号
hardware.cpu_count: CPU数量
hardware.memory_total: 总内存
network.interfaces: 网络接口
network.ip_addresses: IP地址
自定义Facts
可以创建自定义Facts扩展Func的信息收集能力:
在Minion上创建Facts目录:
bash
sudo mkdir -p /etc/func/facts.d
创建自定义Facts脚本:
bash
sudo nano /etc/func/facts.d/app_info.fact
python
#!/usr/bin/env python3
"""
自定义Fact脚本:收集应用信息
"""
import json
import os
import subprocess
# 初始化结果字典
facts = {
'applications': {}
}
# 检查Java版本
try:
java_output = subprocess.check_output(['java', '-version'], stderr=subprocess.STDOUT, universal_newlines=True)
if 'version' in java_output:
version_line = java_output.split('
')[0]
version = version_line.split('"')[1] if '"' in version_line else 'unknown'
facts['applications']['java'] = {
'installed': True,
'version': version
}
except:
facts['applications']['java'] = {
'installed': False
}
# 检查MySQL版本
try:
mysql_output = subprocess.check_output(['mysql', '--version'], universal_newlines=True)
if 'Distrib' in mysql_output:
version = mysql_output.split('Distrib')[1].split(',')[0].strip()
facts['applications']['mysql'] = {
'installed': True,
'version': version
}
except:
facts['applications']['mysql'] = {
'installed': False
}
# 检查Nginx版本
try:
nginx_output = subprocess.check_output(['nginx', '-v'], stderr=subprocess.STDOUT, universal_newlines=True)
if 'nginx version' in nginx_output:
version = nginx_output.split('nginx version:')[1].strip()
facts['applications']['nginx'] = {
'installed': True,
'version': version
}
except:
facts['applications']['nginx'] = {
'installed': False
}
# 检查自定义应用配置
app_config_path = '/etc/myapp/config.json'
if os.path.exists(app_config_path):
try:
with open(app_config_path, 'r') as f:
app_config = json.load(f)
facts['applications']['myapp'] = {
'installed': True,
'version': app_config.get('version', 'unknown'),
'environment': app_config.get('environment', 'unknown'),
'config_path': app_config_path
}
except:
facts['applications']['myapp'] = {
'installed': True,
'error': 'Failed to parse config'
}
# 输出JSON格式的Facts
print(json.dumps(facts))
设置执行权限:
bash
sudo chmod +x /etc/func/facts.d/app_info.fact
刷新Facts:
bash
func '*' facts.refresh
使用自定义Facts:
bash
# 获取应用信息
func '*' facts.get 'applications'
# 基于Facts进行目标选择
func '@facts.applications.nginx.installed=true' command.run 'nginx -t'
# 条件执行
func '*' command.run 'if [ "{
{ facts.applications.java.installed }}" = "true" ]; then java -version; else echo "Java not installed"; fi'
在Python API中使用Facts
python
from func.overlord.client import Client
# 创建Client实例
fc = Client("*")
# 获取所有Facts
facts = fc.facts.get_all()
# 处理Facts数据
for host, host_facts in facts.items():
print(f"Host: {host}")
# 检查操作系统
os_name = host_facts.get('system', {}).get('os', 'unknown')
os_version = host_facts.get('system', {}).get('os_version', 'unknown')
print(f" OS: {os_name} {os_version}")
# 检查应用
apps = host_facts.get('applications', {})
print(" Installed applications:")
for app_name, app_info in apps.items():
if app_info.get('installed', False):
version = app_info.get('version', 'unknown')
print(f" - {app_name} (version: {version})")
# 基于Facts执行操作
if apps.get('nginx', {}).get('installed', False):
print(" Checking Nginx configuration...")
result = fc.command.run("nginx -t", [host])
print(f" Result: {result[host]}")
通过Func的Facts系统,可以收集丰富的系统和应用信息,并基于这些信息进行智能的目标选择和条件执行,实现更加精确和高效的系统管理。
Func的灵活架构、可扩展的模块系统和多种接口支持,使其成为现代IT基础设施管理的强大工具。无论是传统的物理服务器、虚拟化环境还是网络设备,Func都能提供统一的管理接口,简化运维工作,提高自动化水平。















暂无评论内容