一、概述
1.1 背景介绍
在运维工作中,大量重复性任务占据了工程师60%-80%的工作时间:日志分析、批量操作、监控告警、资源清理等。这些任务虽然简单,但手工执行效率低且易出错。Python凭借其简洁的语法、丰富的标准库和第三方模块,成为运维自动化的首选语言。本文将分享10个经过生产环境验证的Python脚本,帮助运维工程师从重复劳动中解放出来。
1.2 技术特点
- • 开发效率高:Python语法简洁,开发周期仅为Shell脚本的1/3,适合快速实现运维需求
- • 生态系统完善:提供paramiko、requests、psutil等成熟运维库,避免重复造轮子
- • 跨平台兼容:同一脚本可在Linux、Windows、macOS上运行,减少维护成本
- • 易于维护扩展:代码可读性强,便于团队协作和功能迭代
1.3 适用场景
- • 场景一:需要批量管理100台以上服务器的运维团队,如配置分发、命令执行、文件同步
- • 场景二:日均处理GB级日志的业务系统,需要自动化分析异常、统计访问量、生成报表
- • 场景三:多云环境资源管理,包括虚拟机、容器、存储的自动清理和成本优化
- • 场景四:7x24小时监控场景,需要自动化健康检查、告警处理、故障自愈
1.4 环境要求
- • Python 3.6及以上版本(脚本中使用了f-string语法)
- • pip3包管理工具,用于安装第三方依赖
- • 对目标服务器的SSH访问权限(批量操作类脚本需要)
二、详细步骤
2.1 准备工作
◆ 2.1.1 系统检查
# 检查Python版本
python3 --version
# 检查pip版本
pip3 --version
# 检查系统资源
free -h
df -h
◆ 2.1.2 安装依赖
# 升级pip
pip3 install --upgrade pip
# 安装常用运维库
pip3 install pyyaml paramiko requests psutil schedule pymysql redis elasticsearch prometheus-client
# 验证安装
pip3 list | grep -E "paramiko|requests|psutil"
2.2 核心配置
◆ 2.2.1 配置SSH密钥认证
# 生成SSH密钥对
ssh-keygen -t rsa -b 4096 -f ~/.ssh/ops_rsa -N ""
# 分发公钥到目标服务器(示例)
ssh-copy-id -i ~/.ssh/ops_rsa.pub root@192.168.1.100
说明:使用密钥认证替代密码登录,提高安全性并支持批量操作。建议为运维脚本单独创建密钥对,便于权限管理和审计。
◆ 2.2.2 配置文件示例
# 配置文件:config.yml
servers:
  - host: 192.168.1.100
    port: 22
    user: root
    key_file: ~/.ssh/ops_rsa
  - host: 192.168.1.101
    port: 22
    user: root
    key_file: ~/.ssh/ops_rsa
mysql:
  host: 192.168.1.200
  port: 3306
  user: monitor
  password: your_password
  database: ops
redis:
  host: 192.168.1.201
  port: 6379
  password: your_redis_password
  db: 0
log:
  level: INFO
  file: /var/log/ops/automation.log
  max_size: 100  # MB
  backup_count: 10
参数说明:
- • mysql/redis:数据库连接信息,用于存储执行结果和状态
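配置文件就绪后,可以用下面的示意代码加载并校验config.yml。校验逻辑与YAML加载解耦,便于单元测试;字段名基于上文示例配置,函数名为示例而非固定实现:

```python
# config_loader.py —— 加载并校验config.yml的示意实现
REQUIRED_SERVER_FIELDS = {'host', 'port', 'user', 'key_file'}

def validate_servers(servers):
    """返回每台服务器缺失的必填字段列表,全部合法时返回空列表"""
    problems = []
    for i, server in enumerate(servers):
        missing = REQUIRED_SERVER_FIELDS - set(server)
        if missing:
            problems.append((i, sorted(missing)))
    return problems

def load_config(path='config.yml'):
    """加载YAML配置并校验(需要 pip3 install pyyaml)"""
    import yaml
    with open(path) as f:
        config = yaml.safe_load(f)
    problems = validate_servers(config.get('servers', []))
    if problems:
        raise ValueError(f"配置缺少必填字段: {problems}")
    return config
```

在脚本启动时先调用load_config,可以把"配置漏写字段"这类问题提前到启动阶段暴露,而不是在批量执行到某台机器时才报错。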
◆ 2.2.3 日志配置
# logging_config.py
import logging
from logging.handlers import RotatingFileHandler

def setup_logger(log_file='/var/log/ops/automation.log', level=logging.INFO):
    logger = logging.getLogger('ops_automation')
    logger.setLevel(level)
    # 轮转文件处理器
    handler = RotatingFileHandler(
        log_file,
        maxBytes=100*1024*1024,  # 100MB
        backupCount=10
    )
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
2.3 启动和验证
◆ 2.3.1 基础测试
# 测试SSH连接
python3 -c "import paramiko; print('paramiko OK')"
# 测试配置文件读取
python3 -c "import yaml; print(yaml.safe_load(open('config.yml')))"
◆ 2.3.2 功能验证
# 验证SSH批量执行(示例脚本1)
python3 batch_ssh_executor.py "uptime"
# 预期输出
# [192.168.1.100] SUCCESS: 10:30:23 up 45 days, 2:15, 1 user, load average: 0.15, 0.10, 0.08
# [192.168.1.101] SUCCESS: 10:30:24 up 30 days, 5:20, 1 user, load average: 0.25, 0.20, 0.18
三、示例代码和配置
3.1 完整配置示例
◆ 3.1.1 脚本1:批量SSH命令执行器
#!/usr/bin/env python3
# 文件路径:batch_ssh_executor.py
"""
批量SSH命令执行器
支持并发执行、结果收集、异常处理
"""
import os
import paramiko
import yaml
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from logging_config import setup_logger

logger = setup_logger()

class SSHExecutor:
    def __init__(self, config_file='config.yml'):
        with open(config_file) as f:
            self.config = yaml.safe_load(f)
        self.servers = self.config['servers']

    def execute_on_host(self, server, command, timeout=30):
        """在单个主机上执行命令"""
        host = server['host']
        try:
            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            # 使用密钥认证,expanduser用于展开配置中的~路径
            key = paramiko.RSAKey.from_private_key_file(
                os.path.expanduser(server['key_file']))
            client.connect(
                hostname=host,
                port=server['port'],
                username=server['user'],
                pkey=key,
                timeout=10
            )
            stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
            exit_code = stdout.channel.recv_exit_status()
            result = {
                'host': host,
                'success': exit_code == 0,
                'stdout': stdout.read().decode('utf-8', errors='ignore').strip(),
                'stderr': stderr.read().decode('utf-8', errors='ignore').strip(),
                'exit_code': exit_code
            }
            client.close()
            logger.info(f"[{host}] Command executed, exit_code={exit_code}")
            return result
        except Exception as e:
            logger.error(f"[{host}] Error: {str(e)}")
            return {
                'host': host,
                'success': False,
                'stdout': '',
                'stderr': str(e),
                'exit_code': -1
            }

    def execute_parallel(self, command, max_workers=10):
        """并发执行命令"""
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(self.execute_on_host, server, command): server
                for server in self.servers
            }
            for future in as_completed(futures):
                results.append(future.result())
        return results

    def print_results(self, results):
        """格式化输出结果"""
        success_count = sum(1 for r in results if r['success'])
        print(f"\n执行完成: 成功 {success_count}/{len(results)}\n")
        for result in sorted(results, key=lambda x: x['host']):
            status = "SUCCESS" if result['success'] else "FAILED"
            print(f"[{result['host']}] {status}")
            if result['stdout']:
                print(f"  输出: {result['stdout']}")
            if result['stderr']:
                print(f"  错误: {result['stderr']}")
            print()

if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("用法: python3 batch_ssh_executor.py '<command>'")
        sys.exit(1)
    command = sys.argv[1]
    executor = SSHExecutor()
    results = executor.execute_parallel(command)
    executor.print_results(results)
◆ 3.1.2 脚本2:日志分析与告警
#!/usr/bin/env python3
# 文件名:log_analyzer.py
"""
日志分析工具
功能:错误统计、异常检测、自动告警
"""
import re
import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta
import requests
from logging_config import setup_logger

logger = setup_logger()

class LogAnalyzer:
    def __init__(self, log_file):
        self.log_file = log_file
        self.error_patterns = {
            'http_5xx': r'HTTP/\d\.\d"\s5\d{2}',
            'exception': r'(Exception|Error|Fatal)',
            'timeout': r'(timeout|timed out)',
            'connection_refused': r'Connection refused',
            'out_of_memory': r'(OutOfMemory|OOM|Cannot allocate memory)'
        }

    def parse_nginx_log(self, line):
        """解析Nginx日志格式"""
        pattern = r'(\S+) - - \[(.*?)\] "(.*?)" (\d{3}) (\d+) "(.*?)" "(.*?)"'
        match = re.match(pattern, line)
        if match:
            return {
                'ip': match.group(1),
                'time': match.group(2),
                'request': match.group(3),
                'status': int(match.group(4)),
                'size': int(match.group(5)),
                'referer': match.group(6),
                'user_agent': match.group(7)
            }
        return None

    def analyze(self, time_window=60):
        """分析最近N分钟的日志"""
        now = datetime.now()
        cutoff_time = now - timedelta(minutes=time_window)
        stats = {
            'total_requests': 0,
            'error_count': defaultdict(int),
            'status_codes': Counter(),
            'top_ips': Counter(),
            'slow_requests': []
        }
        with open(self.log_file, 'r') as f:
            for line in f:
                entry = self.parse_nginx_log(line)
                if not entry:
                    continue
                # 时间过滤
                log_time = datetime.strptime(entry['time'], '%d/%b/%Y:%H:%M:%S %z')
                if log_time.replace(tzinfo=None) < cutoff_time:
                    continue
                stats['total_requests'] += 1
                stats['status_codes'][entry['status']] += 1
                stats['top_ips'][entry['ip']] += 1
                # 错误检测
                for error_type, pattern in self.error_patterns.items():
                    if re.search(pattern, line):
                        stats['error_count'][error_type] += 1
                # 5xx错误记录
                if 500 <= entry['status'] < 600:
                    stats['slow_requests'].append({
                        'time': entry['time'],
                        'request': entry['request'],
                        'status': entry['status']
                    })
        return stats

    def check_alert_conditions(self, stats):
        """检查告警条件"""
        alerts = []
        # 5xx错误率超过5%
        if stats['total_requests'] > 0:
            error_5xx = sum(count for code, count in stats['status_codes'].items()
                            if 500 <= code < 600)
            error_rate = error_5xx / stats['total_requests']
            if error_rate > 0.05:
                alerts.append({
                    'level': 'critical',
                    'message': f'5xx错误率: {error_rate*100:.2f}% ({error_5xx}/{stats["total_requests"]})'
                })
        # OOM错误
        if stats['error_count']['out_of_memory'] > 0:
            alerts.append({
                'level': 'critical',
                'message': f'检测到OOM错误: {stats["error_count"]["out_of_memory"]}次'
            })
        # 连接超时
        if stats['error_count']['timeout'] > 100:
            alerts.append({
                'level': 'warning',
                'message': f'超时错误异常: {stats["error_count"]["timeout"]}次'
            })
        return alerts

    def send_alert(self, alerts, webhook_url):
        """发送告警到企业微信/钉钉"""
        if not alerts:
            return
        message = "【日志告警】\n" + "\n".join(
            f"[{a['level'].upper()}] {a['message']}" for a in alerts
        )
        payload = {
            "msgtype": "text",
            "text": {"content": message}
        }
        try:
            response = requests.post(webhook_url, json=payload, timeout=5)
            if response.status_code == 200:
                logger.info("告警发送成功")
            else:
                logger.error(f"告警发送失败: {response.status_code}")
        except Exception as e:
            logger.error(f"告警发送异常: {str(e)}")

if __name__ == '__main__':
    analyzer = LogAnalyzer('/var/log/nginx/access.log')
    stats = analyzer.analyze(time_window=5)
    print(f"总请求数: {stats['total_requests']}")
    print(f"状态码分布: {dict(stats['status_codes'])}")
    print(f"Top 10 IP: {stats['top_ips'].most_common(10)}")
    print(f"错误统计: {dict(stats['error_count'])}")
    alerts = analyzer.check_alert_conditions(stats)
    if alerts:
        print("\n触发告警:")
        for alert in alerts:
            print(f"  [{alert['level']}] {alert['message']}")
        # 发送告警(替换为实际webhook地址)
        # analyzer.send_alert(alerts, 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx')
◆ 3.1.3 脚本3:系统资源监控
#!/usr/bin/env python3
# 文件名:system_monitor.py
"""
系统资源监控
监控CPU、内存、磁盘、网络,支持Prometheus集成
"""
import psutil
import time
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from logging_config import setup_logger

logger = setup_logger()

class SystemMonitor:
    def __init__(self, pushgateway_url='localhost:9091', job_name='system_monitor'):
        self.pushgateway_url = pushgateway_url
        self.job_name = job_name
        self.registry = CollectorRegistry()
        # 定义指标
        self.cpu_gauge = Gauge('system_cpu_percent', 'CPU使用率', registry=self.registry)
        self.memory_gauge = Gauge('system_memory_percent', '内存使用率', registry=self.registry)
        self.disk_gauge = Gauge('system_disk_percent', '磁盘使用率',
                                ['mountpoint'], registry=self.registry)
        self.network_gauge = Gauge('system_network_bytes', '网络流量',
                                   ['interface', 'direction'], registry=self.registry)

    def collect_metrics(self):
        """采集系统指标"""
        metrics = {}
        # CPU
        cpu_percent = psutil.cpu_percent(interval=1)
        metrics['cpu'] = cpu_percent
        self.cpu_gauge.set(cpu_percent)
        # 内存
        memory = psutil.virtual_memory()
        metrics['memory'] = {
            'percent': memory.percent,
            'total': memory.total,
            'available': memory.available,
            'used': memory.used
        }
        self.memory_gauge.set(memory.percent)
        # 磁盘
        metrics['disk'] = {}
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
                metrics['disk'][partition.mountpoint] = {
                    'percent': usage.percent,
                    'total': usage.total,
                    'used': usage.used,
                    'free': usage.free
                }
                self.disk_gauge.labels(mountpoint=partition.mountpoint).set(usage.percent)
            except PermissionError:
                continue
        # 网络
        net_io = psutil.net_io_counters(pernic=True)
        metrics['network'] = {}
        for interface, stats in net_io.items():
            metrics['network'][interface] = {
                'bytes_sent': stats.bytes_sent,
                'bytes_recv': stats.bytes_recv
            }
            self.network_gauge.labels(interface=interface, direction='sent').set(stats.bytes_sent)
            self.network_gauge.labels(interface=interface, direction='recv').set(stats.bytes_recv)
        return metrics

    def check_thresholds(self, metrics):
        """检查阈值告警"""
        alerts = []
        if metrics['cpu'] > 80:
            alerts.append(f"CPU使用率过高: {metrics['cpu']:.1f}%")
        if metrics['memory']['percent'] > 85:
            alerts.append(f"内存使用率过高: {metrics['memory']['percent']:.1f}%")
        for mount, stats in metrics['disk'].items():
            if stats['percent'] > 90:
                alerts.append(f"磁盘空间不足: {mount} ({stats['percent']:.1f}%)")
        return alerts

    def push_metrics(self):
        """推送指标到Pushgateway"""
        try:
            push_to_gateway(self.pushgateway_url, job=self.job_name, registry=self.registry)
            logger.info("指标推送成功")
        except Exception as e:
            logger.error(f"指标推送失败: {str(e)}")

    def run(self, interval=60):
        """持续监控"""
        logger.info(f"开始监控,采集间隔: {interval}秒")
        while True:
            try:
                metrics = self.collect_metrics()
                alerts = self.check_thresholds(metrics)
                if alerts:
                    logger.warning("触发告警: " + "; ".join(alerts))
                self.push_metrics()
                time.sleep(interval)
            except KeyboardInterrupt:
                logger.info("监控停止")
                break
            except Exception as e:
                logger.error(f"监控异常: {str(e)}")
                time.sleep(interval)

if __name__ == '__main__':
    monitor = SystemMonitor()
    # 单次采集
    metrics = monitor.collect_metrics()
    print(f"CPU: {metrics['cpu']:.1f}%")
    print(f"内存: {metrics['memory']['percent']:.1f}%")
    print("磁盘:")
    for mount, stats in metrics['disk'].items():
        print(f"  {mount}: {stats['percent']:.1f}%")
    # 持续监控(取消注释启用)
    # monitor.run(interval=60)
◆ 3.1.4 脚本4:MySQL慢查询分析
#!/usr/bin/env python3
# 文件名:mysql_slow_query_analyzer.py
"""
MySQL慢查询分析
解析慢查询日志,生成优化建议
"""
import re
import pymysql
from collections import defaultdict
from logging_config import setup_logger

logger = setup_logger()

class SlowQueryAnalyzer:
    def __init__(self, slow_log_file, db_config):
        self.slow_log_file = slow_log_file
        self.db_config = db_config
        self.queries = []

    def parse_slow_log(self):
        """解析慢查询日志"""
        current_query = {}
        with open(self.slow_log_file, 'r') as f:
            for line in f:
                # Time行
                if line.startswith('# Time:'):
                    if current_query:
                        self.queries.append(current_query)
                    current_query = {'time': line.split(':', 1)[1].strip()}
                # User@Host行
                elif line.startswith('# User@Host:'):
                    match = re.search(r'(\w+)\[(\w+)\] @ (\S+)', line)
                    if match:
                        current_query['user'] = match.group(1)
                        current_query['host'] = match.group(3)
                # Query_time行
                elif line.startswith('# Query_time:'):
                    match = re.search(
                        r'Query_time: ([\d.]+)\s+Lock_time: ([\d.]+)\s+Rows_sent: (\d+)\s+Rows_examined: (\d+)',
                        line
                    )
                    if match:
                        current_query['query_time'] = float(match.group(1))
                        current_query['lock_time'] = float(match.group(2))
                        current_query['rows_sent'] = int(match.group(3))
                        current_query['rows_examined'] = int(match.group(4))
                # SQL语句
                elif not line.startswith('#') and line.strip():
                    current_query['sql'] = current_query.get('sql', '') + line.strip() + ' '
        if current_query:
            self.queries.append(current_query)
        logger.info(f"解析完成,共 {len(self.queries)} 条慢查询")

    def analyze(self):
        """分析慢查询"""
        stats = {
            'total': len(self.queries),
            'avg_query_time': 0,
            'max_query_time': 0,
            'top_queries': [],
            'table_scan': []
        }
        if not self.queries:
            return stats
        # 基础统计
        total_time = sum(q['query_time'] for q in self.queries)
        stats['avg_query_time'] = total_time / len(self.queries)
        stats['max_query_time'] = max(q['query_time'] for q in self.queries)
        # Top 10耗时查询
        sorted_queries = sorted(self.queries, key=lambda x: x['query_time'], reverse=True)
        stats['top_queries'] = sorted_queries[:10]
        # 全表扫描检测(rows_examined > 10000)
        stats['table_scan'] = [
            q for q in self.queries
            if q.get('rows_examined', 0) > 10000
        ]
        return stats

    def get_explain_plan(self, sql):
        """获取EXPLAIN执行计划"""
        try:
            conn = pymysql.connect(**self.db_config)
            cursor = conn.cursor()
            cursor.execute(f"EXPLAIN {sql}")
            result = cursor.fetchall()
            cursor.close()
            conn.close()
            return result
        except Exception as e:
            logger.error(f"EXPLAIN失败: {str(e)}")
            return None

    def generate_report(self, stats):
        """生成分析报告"""
        report = []
        report.append("=" * 80)
        report.append("MySQL慢查询分析报告")
        report.append("=" * 80)
        report.append(f"总慢查询数: {stats['total']}")
        report.append(f"平均查询时间: {stats['avg_query_time']:.2f}秒")
        report.append(f"最大查询时间: {stats['max_query_time']:.2f}秒")
        report.append("")
        report.append("Top 10耗时查询:")
        for i, query in enumerate(stats['top_queries'], 1):
            report.append(f"\n{i}. 查询时间: {query['query_time']:.2f}秒")
            report.append(f"   扫描行数: {query.get('rows_examined', 0)}")
            report.append(f"   SQL: {query.get('sql', '')[:200]}")
        if stats['table_scan']:
            report.append(f"\n发现 {len(stats['table_scan'])} 个全表扫描查询")
            for query in stats['table_scan'][:5]:
                report.append(f"  - {query.get('sql', '')[:100]}")
        return "\n".join(report)

if __name__ == '__main__':
    db_config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'your_password',
        'database': 'test'
    }
    analyzer = SlowQueryAnalyzer('/var/lib/mysql/slow.log', db_config)
    analyzer.parse_slow_log()
    stats = analyzer.analyze()
    print(analyzer.generate_report(stats))
◆ 3.1.5 脚本5:文件同步工具
#!/usr/bin/env python3
# 文件名:file_sync.py
"""
文件同步工具
支持增量同步、断点续传、校验
"""
import os
import hashlib
import paramiko
from pathlib import Path
from logging_config import setup_logger

logger = setup_logger()

class FileSync:
    def __init__(self, ssh_config):
        self.ssh_config = ssh_config
        self.client = None
        self.sftp = None

    def connect(self):
        """建立SSH连接"""
        try:
            self.client = paramiko.SSHClient()
            self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            # expanduser展开~,paramiko不会自动处理家目录路径
            key_path = os.path.expanduser(self.ssh_config['key_file'])
            key = paramiko.RSAKey.from_private_key_file(key_path)
            self.client.connect(
                hostname=self.ssh_config['host'],
                port=self.ssh_config['port'],
                username=self.ssh_config['user'],
                pkey=key
            )
            self.sftp = self.client.open_sftp()
            logger.info(f"连接成功: {self.ssh_config['host']}")
        except Exception as e:
            logger.error(f"连接失败: {str(e)}")
            raise

    def disconnect(self):
        """关闭连接"""
        if self.sftp:
            self.sftp.close()
        if self.client:
            self.client.close()

    def calculate_md5(self, file_path):
        """计算文件MD5"""
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def remote_file_exists(self, remote_path):
        """检查远程文件是否存在"""
        try:
            self.sftp.stat(remote_path)
            return True
        except FileNotFoundError:
            return False

    def sync_file(self, local_path, remote_path, check_md5=True):
        """同步单个文件"""
        try:
            # 确保远程目录存在
            remote_dir = os.path.dirname(remote_path)
            try:
                self.sftp.stat(remote_dir)
            except FileNotFoundError:
                self._create_remote_dir(remote_dir)
            # MD5校验
            need_upload = True
            if check_md5 and self.remote_file_exists(remote_path):
                local_md5 = self.calculate_md5(local_path)
                # 远程MD5计算(需要执行命令)
                stdin, stdout, stderr = self.client.exec_command(f"md5sum {remote_path}")
                remote_md5 = stdout.read().decode().split()[0]
                if local_md5 == remote_md5:
                    logger.info(f"文件未变化,跳过: {local_path}")
                    need_upload = False
            if need_upload:
                self.sftp.put(local_path, remote_path)
                logger.info(f"上传成功: {local_path} -> {remote_path}")
                return True
            return False
        except Exception as e:
            logger.error(f"同步失败 {local_path}: {str(e)}")
            return False

    def _create_remote_dir(self, remote_dir):
        """递归创建远程目录"""
        dirs = []
        while remote_dir != '/':
            dirs.append(remote_dir)
            remote_dir = os.path.dirname(remote_dir)
        for dir_path in reversed(dirs):
            try:
                self.sftp.stat(dir_path)
            except FileNotFoundError:
                self.sftp.mkdir(dir_path)
                logger.info(f"创建目录: {dir_path}")

    def sync_directory(self, local_dir, remote_dir, exclude_patterns=None):
        """同步整个目录"""
        exclude_patterns = exclude_patterns or []
        synced_count = 0
        skipped_count = 0
        for root, dirs, files in os.walk(local_dir):
            # 计算相对路径
            rel_path = os.path.relpath(root, local_dir)
            remote_root = os.path.join(remote_dir, rel_path).replace('\\', '/')
            for file in files:
                # 排除规则
                if any(pattern in file for pattern in exclude_patterns):
                    continue
                local_file = os.path.join(root, file)
                remote_file = os.path.join(remote_root, file).replace('\\', '/')
                if self.sync_file(local_file, remote_file):
                    synced_count += 1
                else:
                    skipped_count += 1
        logger.info(f"同步完成: 上传{synced_count}个文件,跳过{skipped_count}个")

if __name__ == '__main__':
    ssh_config = {
        'host': '192.168.1.100',
        'port': 22,
        'user': 'root',
        'key_file': '~/.ssh/ops_rsa'
    }
    sync = FileSync(ssh_config)
    sync.connect()
    # 同步单个文件
    # sync.sync_file('/local/config.yml', '/remote/config.yml')
    # 同步目录
    sync.sync_directory(
        '/local/app',
        '/remote/app',
        exclude_patterns=['.git', '.pyc', '__pycache__']
    )
    sync.disconnect()
3.2 实际应用案例
◆ 案例一:自动化证书续期检查
场景描述:管理100+个域名的SSL证书,需要提前30天发现即将过期的证书并告警。
实现代码:
#!/usr/bin/env python3
# 文件名:ssl_cert_checker.py
import ssl
import socket
from datetime import datetime, timedelta
import requests

class SSLCertChecker:
    def __init__(self, domains, alert_days=30):
        self.domains = domains
        self.alert_days = alert_days

    def check_cert_expiry(self, domain, port=443):
        """检查证书过期时间"""
        try:
            context = ssl.create_default_context()
            with socket.create_connection((domain, port), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()
            # 解析过期时间
            expire_date = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z')
            days_left = (expire_date - datetime.now()).days
            return {
                'domain': domain,
                'expire_date': expire_date,
                'days_left': days_left,
                'issuer': dict(x[0] for x in cert['issuer'])
            }
        except Exception as e:
            return {
                'domain': domain,
                'error': str(e)
            }

    def check_all(self):
        """检查所有域名"""
        results = []
        alerts = []
        for domain in self.domains:
            result = self.check_cert_expiry(domain)
            results.append(result)
            if 'days_left' in result and result['days_left'] <= self.alert_days:
                alerts.append(f"{domain} 证书将在 {result['days_left']} 天后过期")
        return results, alerts

# 使用示例
domains = ['example.com', 'api.example.com', 'www.example.com']
checker = SSLCertChecker(domains)
results, alerts = checker.check_all()
for result in results:
    if 'days_left' in result:
        print(f"{result['domain']}: 剩余 {result['days_left']} 天")
    else:
        print(f"{result['domain']}: 检查失败 - {result['error']}")
if alerts:
    print("\n告警:")
    for alert in alerts:
        print(f"  - {alert}")
运行结果:
example.com: 剩余 85 天
api.example.com: 剩余 12 天
www.example.com: 剩余 45 天
告警:
- api.example.com 证书将在 12 天后过期
◆ 案例二:Docker容器资源清理
场景描述:定期清理停止超过7天的容器、未使用的镜像和volume,释放磁盘空间。
实现代码:
#!/usr/bin/env python3
# 文件名:docker_cleanup.py
import subprocess
import json
from datetime import datetime, timedelta

class DockerCleaner:
    def __init__(self, dry_run=True):
        self.dry_run = dry_run

    def get_stopped_containers(self, days=7):
        """获取停止超过N天的容器"""
        cutoff_time = datetime.now() - timedelta(days=days)
        cmd = "docker ps -a --format '{{json .}}'"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
        stopped_containers = []
        for line in result.stdout.strip().split('\n'):
            if not line:
                continue
            container = json.loads(line)
            if container['State'] != 'exited':
                continue
            # 获取容器详细信息
            inspect_cmd = f"docker inspect {container['ID']}"
            inspect_result = subprocess.run(inspect_cmd, shell=True, capture_output=True, text=True)
            detail = json.loads(inspect_result.stdout)[0]
            finished_at = datetime.fromisoformat(detail['State']['FinishedAt'].split('.')[0])
            if finished_at < cutoff_time:
                stopped_containers.append({
                    'id': container['ID'],
                    'name': container['Names'],
                    'finished_at': finished_at
                })
        return stopped_containers

    def remove_containers(self, containers):
        """删除容器"""
        for container in containers:
            cmd = f"docker rm {container['id']}"
            if self.dry_run:
                print(f"[DRY RUN] {cmd}")
            else:
                subprocess.run(cmd, shell=True)
                print(f"已删除容器: {container['name']}")

    def prune_images(self):
        """清理未使用的镜像"""
        cmd = "docker image prune -a -f"
        if self.dry_run:
            print(f"[DRY RUN] {cmd}")
        else:
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            print(result.stdout)

    def prune_volumes(self):
        """清理未使用的volume"""
        cmd = "docker volume prune -f"
        if self.dry_run:
            print(f"[DRY RUN] {cmd}")
        else:
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            print(result.stdout)

    def cleanup(self, container_days=7):
        """执行清理"""
        print(f"开始清理(DRY RUN: {self.dry_run})")
        # 清理容器
        containers = self.get_stopped_containers(container_days)
        print(f"\n发现 {len(containers)} 个停止超过{container_days}天的容器")
        self.remove_containers(containers)
        # 清理镜像
        print("\n清理未使用的镜像...")
        self.prune_images()
        # 清理volume
        print("\n清理未使用的volume...")
        self.prune_volumes()

# 使用示例
cleaner = DockerCleaner(dry_run=False)
cleaner.cleanup(container_days=7)
◆ 案例三:定时数据库备份
场景描述:每天凌晨2点自动备份MySQL数据库,保留最近30天的备份,自动清理过期文件。
实现步骤:
#!/usr/bin/env python3
# 文件名:mysql_backup.py
import os
import subprocess
from datetime import datetime, timedelta
import gzip
import shutil

class MySQLBackup:
    def __init__(self, config):
        self.host = config['host']
        self.user = config['user']
        self.password = config['password']
        self.databases = config['databases']
        self.backup_dir = config['backup_dir']
        self.retention_days = config.get('retention_days', 30)

    def backup_database(self, database):
        """备份单个数据库"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = f"{self.backup_dir}/{database}_{timestamp}.sql"
        # mysqldump命令
        cmd = [
            'mysqldump',
            f'--host={self.host}',
            f'--user={self.user}',
            f'--password={self.password}',
            '--single-transaction',
            '--routines',
            '--triggers',
            '--events',
            database
        ]
        try:
            with open(backup_file, 'w') as f:
                subprocess.run(cmd, stdout=f, check=True)
            # 压缩
            with open(backup_file, 'rb') as f_in:
                with gzip.open(f"{backup_file}.gz", 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            os.remove(backup_file)
            print(f"备份成功: {database} -> {backup_file}.gz")
            return f"{backup_file}.gz"
        except subprocess.CalledProcessError as e:
            print(f"备份失败: {database} - {str(e)}")
            return None

    def cleanup_old_backups(self):
        """清理过期备份"""
        cutoff_time = datetime.now() - timedelta(days=self.retention_days)
        for filename in os.listdir(self.backup_dir):
            if not filename.endswith('.sql.gz'):
                continue
            file_path = os.path.join(self.backup_dir, filename)
            file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
            if file_time < cutoff_time:
                os.remove(file_path)
                print(f"删除过期备份: {filename}")

    def run(self):
        """执行备份"""
        print(f"开始备份,时间: {datetime.now()}")
        for database in self.databases:
            self.backup_database(database)
        self.cleanup_old_backups()
        print("备份完成")

# 配置
config = {
    'host': 'localhost',
    'user': 'backup',
    'password': 'your_password',
    'databases': ['app_db', 'user_db'],
    'backup_dir': '/data/mysql_backups',
    'retention_days': 30
}
backup = MySQLBackup(config)
backup.run()
# 编辑crontab
crontab -e
# 添加定时任务(每天凌晨2点执行)
0 2 * * * /usr/bin/python3 /opt/scripts/mysql_backup.py >> /var/log/mysql_backup.log 2>&1
# 查看备份文件
ls -lh /data/mysql_backups/
# 测试恢复
gunzip < app_db_20250115_020001.sql.gz | mysql -u root -p app_db_test
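除crontab外,也可以在Python进程内自行调度每日任务。下面是一个仅用标准库的简化示意(生产环境仍建议优先使用crontab或systemd timer;run_daily、seconds_until均为示例函数名):

```python
import time
from datetime import datetime, timedelta

def seconds_until(hour, minute=0):
    """计算距离下一次 hour:minute 的秒数"""
    now = datetime.now()
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=1)  # 今天的时间点已过,顺延到明天
    return (target - now).total_seconds()

def run_daily(task, hour=2):
    """阻塞式循环,每天在指定小时执行一次task"""
    while True:
        time.sleep(seconds_until(hour))
        task()

# 使用示例(对应上文的MySQLBackup):
# run_daily(backup.run, hour=2)
```

这种方式的好处是调度逻辑随脚本一起部署、便于加日志和异常处理;代价是需要保证进程常驻(如配合systemd或supervisor),否则可靠性不如crontab。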
四、最佳实践和注意事项
4.1 最佳实践
◆ 4.1.1 性能优化
- • 并发控制:使用ThreadPoolExecutor时合理设置max_workers,避免过多并发导致系统负载过高
# 根据CPU核心数动态调整
import os
max_workers = min(32, (os.cpu_count() or 1) * 4)
- • 连接池复用:对于MySQL、Redis等数据库连接,使用连接池减少建立连接的开销
from dbutils.pooled_db import PooledDB
import pymysql
pool = PooledDB(
creator=pymysql,
maxconnections=10,
host='localhost',
user='root',
password='password'
)
- • 批量操作:批量执行数据库写入或API调用,减少网络往返次数
# 批量插入
cursor.executemany(
"INSERT INTO logs (message, level) VALUES (%s, %s)",
[(msg, level) for msg, level in log_entries]
)
◆ 4.1.2 安全加固
- • 敏感信息管理:使用环境变量或密钥管理系统(如HashiCorp Vault)存储密码
import os
from dotenv import load_dotenv
load_dotenv()
DB_PASSWORD = os.getenv('DB_PASSWORD')
- • SSH密钥权限:确保私钥文件权限为600,防止被其他用户读取
chmod 600 ~/.ssh/ops_rsa
- • 输入验证:对用户输入进行严格校验,防止命令注入
import shlex
# 安全的命令参数处理
safe_command = shlex.quote(user_input)
◆ 4.1.3 高可用配置
- • 异常重试机制:网络操作添加重试逻辑,提高脚本鲁棒性
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def api_call():
    response = requests.get('https://api.example.com')
    response.raise_for_status()
    return response.json()
- • 健康检查:长时间运行的脚本应定期检查依赖服务的健康状态
- • 备份策略:关键操作前先备份,如修改配置文件、删除数据等
import shutil
shutil.copy2('/etc/nginx/nginx.conf', '/etc/nginx/nginx.conf.backup')
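对于"健康检查"一条,一个最小的TCP探活示意如下(主机、端口仅为示例,函数名为假设):

```python
import socket

def check_tcp(host, port, timeout=3):
    """尝试建立TCP连接判断服务端口是否存活,返回True/False"""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# 使用示例:长时间运行的脚本在每轮任务前探测依赖服务
# if not check_tcp('192.168.1.200', 3306):
#     logger.warning("MySQL不可达,本轮任务跳过")
```

TCP探活只能说明端口可连接,不代表服务逻辑正常;对HTTP服务可以进一步请求其健康检查接口并校验返回内容。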
4.2 注意事项
◆ 4.2.1 配置注意事项
- • 生产环境执行脚本前务必在测试环境充分验证,特别是涉及删除、修改操作的脚本
- • SSH操作时避免使用root用户,应创建专用运维账号并限制sudo权限
- • 定时任务的脚本应使用绝对路径,避免因环境变量差异导致执行失败
◆ 4.2.2 常见错误
| 常见错误 | 解决方法 |
|---|---|
| Too many connections | 检查防火墙规则,增加max_connections |
◆ 4.2.3 兼容性问题
- • 版本兼容:Python 3.6+引入了f-string,3.8+支持海象运算符,编写脚本时需考虑目标环境的Python版本
- • 平台兼容:路径处理使用os.path或pathlib,避免硬编码Windows或Linux路径分隔符
- • 组件依赖:paramiko 2.7+要求cryptography 3.0+,升级时需注意依赖版本兼容性
五、故障排查和监控
5.1 故障排查
◆ 5.1.1 日志查看
# 查看脚本执行日志
tail -f /var/log/ops/automation.log
# 查看crontab执行记录
grep CRON /var/log/syslog | tail -20
# 查看Python异常堆栈
grep -A 20 "Traceback" /var/log/ops/automation.log
◆ 5.1.2 常见问题排查
问题一:SSH连接超时
# 测试SSH连接
ssh -vvv -i ~/.ssh/ops_rsa root@192.168.1.100
# 检查防火墙
sudo iptables -L -n | grep 22
解决方案:
- 1. 检查目标服务器SSH服务是否运行:systemctl status sshd
- 2. 验证防火墙规则是否允许22端口:firewall-cmd --list-all
- 3. 确认网络连通性:ping 192.168.1.100
- 4. 检查/etc/hosts.allow和/etc/hosts.deny配置
问题二:内存占用持续增长
# 监控Python进程内存
ps aux | grep python | sort -k4 -nr | head -5
# 使用memory_profiler分析
pip3 install memory-profiler
python3 -m memory_profiler script.py
解决方案:
- • 检查是否存在未关闭的文件句柄或数据库连接,使用with语句确保资源释放
- • 处理大文件时改用生成器逐行读取,避免一次性加载到内存
- • 长期运行的脚本可定期重启,或在关键节点主动释放不再使用的大对象
问题三:脚本执行缓慢
- • 排查:使用cProfile分析性能瓶颈
import cProfile
cProfile.run('main()', sort='cumulative')
- • 解决:优化数据库查询(添加索引)、使用并发加速I/O密集型任务、减少不必要的日志输出
◆ 5.1.3 调试模式
# 开启详细日志
import logging
logging.basicConfig(level=logging.DEBUG)
# paramiko开启调试
paramiko.util.log_to_file('/tmp/paramiko.log', level=logging.DEBUG)
# requests显示HTTP请求详情
import http.client
http.client.HTTPConnection.debuglevel = 1
5.2 性能监控
◆ 5.2.1 关键指标监控
# 脚本执行时间监控
import time
from functools import wraps

def timing_decorator(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        logger.info(f"{func.__name__} 执行耗时: {duration:.2f}秒")
        return result
    return wrapper

@timing_decorator
def batch_operation():
    # 批量操作逻辑
    pass
# 监控脚本资源占用
top -p $(pgrep -f "python.*automation")
# 监控网络流量
iftop -i eth0
# 监控磁盘IO
iostat -x 1
◆ 5.2.2 监控指标说明
◆ 5.2.3 监控告警配置
# Prometheus告警规则示例
groups:
  - name: python_automation_alerts
    interval: 30s
    rules:
      - alert: ScriptExecutionTimeout
        expr: script_duration_seconds > 600
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "脚本执行超时: {{ $labels.script_name }}"
          description: "执行时间 {{ $value }}秒"
      - alert: HighErrorRate
        expr: rate(script_errors_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "脚本错误率过高"
          description: "错误率 {{ $value | humanizePercentage }}"
5.3 备份与恢复
◆ 5.3.1 备份策略
#!/bin/bash
# 脚本文件备份脚本
BACKUP_DIR="/data/backups/scripts"
SOURCE_DIR="/opt/ops_scripts"
DATE=$(date +%Y%m%d)
# 创建备份目录
mkdir -p $BACKUP_DIR
# 打包备份
tar -czf $BACKUP_DIR/scripts_$DATE.tar.gz \
-C $SOURCE_DIR \
--exclude='*.pyc' \
--exclude='__pycache__' \
--exclude='.git' \
.
# 保留最近30天备份
find $BACKUP_DIR -name "scripts_*.tar.gz" -mtime +30 -delete
echo "备份完成: $BACKUP_DIR/scripts_$DATE.tar.gz"
◆ 5.3.2 恢复流程
- 1. 停止相关服务:
# 停止crontab任务
crontab -r
- 2. 恢复数据:
# 解压备份文件
tar -xzf /data/backups/scripts/scripts_20250115.tar.gz -C /opt/ops_scripts_restore
- 3. 验证完整性:
# 检查文件数量
diff -r /opt/ops_scripts /opt/ops_scripts_restore
# 测试脚本语法
python3 -m py_compile /opt/ops_scripts_restore/*.py
- 4. 重启服务:
# 恢复crontab
crontab /data/backups/crontab_backup.txt
六、总结
6.1 技术要点回顾
- • Python运维自动化的核心价值在于减少重复劳动、提高执行效率、降低人为错误率
- • 选择合适的库是成功的关键:paramiko处理SSH、psutil监控系统、schedule实现定时任务
- • 生产环境脚本必须具备完善的日志、异常处理、重试机制,确保故障可追溯、可恢复
- • 安全性不容忽视:密钥管理、权限控制、输入校验缺一不可
6.2 进阶学习方向
- 1. 异步编程优化:学习asyncio和aiohttp,将I/O密集型脚本改造为异步版本,大幅提升性能
- • 实践建议:改造批量HTTP请求脚本,对比同步和异步版本的性能差异
- 2. 容器化部署:将运维脚本打包为Docker镜像,实现跨环境一致性执行
- • 实践建议:使用Alpine Linux作为基础镜像,减小镜像体积
- 3. 可观测性增强:集成OpenTelemetry实现分布式追踪,结合Grafana可视化脚本执行情况
- • 学习资源:OpenTelemetry Python SDK
- • 实践建议:为关键脚本添加Trace和Metrics,建立运维自动化的观测体系
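第1条的异步改造思路可以用下面的结构示意。为了不依赖外部网络,这里用asyncio.sleep模拟网络I/O;实际批量HTTP请求时可将fetch替换为aiohttp的session.get(URL、函数名均为示例):

```python
import asyncio
import time

async def fetch(url):
    """模拟一次网络请求;实际场景中可替换为aiohttp调用"""
    await asyncio.sleep(0.1)  # 模拟100ms的网络延迟
    return f"{url}: ok"

async def fetch_all(urls):
    """用gather并发发起全部请求,整体耗时约等于最慢的一个,而非总和"""
    return await asyncio.gather(*(fetch(u) for u in urls))

urls = [f"https://api.example.com/item/{i}" for i in range(20)]
start = time.time()
results = asyncio.run(fetch_all(urls))
elapsed = time.time() - start
print(f"20个请求耗时 {elapsed:.2f} 秒")
```

串行执行20次0.1秒的请求约需2秒,而并发版本约0.1秒即可完成,这正是I/O密集型脚本异步化的收益来源。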
6.3 参考资料
- • Python官方文档 - 标准库和语言特性权威参考
- • Paramiko官方文档 - SSH自动化必备库
- • Real Python运维教程 - 实战导向的Python运维教程
- • Awesome Python - 精选Python运维相关库和工具
附录
A. 命令速查表
# Python环境管理
python3 -m venv /opt/venv # 创建虚拟环境
source /opt/venv/bin/activate # 激活虚拟环境
pip3 freeze > requirements.txt # 导出依赖
pip3 install -r requirements.txt # 安装依赖
# 常用运维命令
python3 batch_ssh_executor.py "df -h"  # 批量检查磁盘
python3 log_analyzer.py # 分析日志
python3 system_monitor.py # 系统监控
python3 mysql_backup.py # 数据库备份
# 调试和性能分析
python3 -m pdb script.py # 调试脚本
python3 -m cProfile -o profile.stats script.py # 性能分析
python3 -m trace --count script.py # 代码覆盖率
# 代码质量检查
pylint script.py # 代码规范检查
black script.py # 代码格式化
mypy script.py # 类型检查
B. 配置参数详解
paramiko.SSHClient参数:
- • timeout:连接超时时间(秒),默认无超时,建议设置10-30秒
- • banner_timeout:SSH banner读取超时,默认15秒
- • auth_timeout:认证超时,默认15秒
- • allow_agent:是否使用SSH agent,默认True
- • look_for_keys:是否搜索~/.ssh/目录下的密钥,默认True
logging.RotatingFileHandler参数:
- • maxBytes:单个日志文件最大字节数,建议100MB
- • backupCount:保留的日志文件数量,建议10-30个
- • encoding:日志文件编码,建议utf-8
- • delay:延迟创建文件直到第一次写入,默认False
ThreadPoolExecutor参数:
- • max_workers:最大线程数,建议为CPU核心数的2-4倍
- • thread_name_prefix:线程名称前缀,便于调试
- • initializer:线程启动时执行的初始化函数
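上述ThreadPoolExecutor参数的效果可以用一小段代码直接观察(worker函数仅为演示):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def worker(n):
    """返回执行任务的线程名和计算结果,用于观察thread_name_prefix的效果"""
    return threading.current_thread().name, n * n

# max_workers限制并发线程数;thread_name_prefix让日志中的线程名可辨识
with ThreadPoolExecutor(max_workers=4, thread_name_prefix='ops') as pool:
    results = list(pool.map(worker, range(8)))

for name, value in results:
    print(name, value)  # 线程名形如 ops_0、ops_1 …
```

排查卡死或慢任务时,带前缀的线程名配合py-spy、日志中的threadName字段能快速定位是哪个线程池的任务出了问题。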
C. 术语表