Py学习  »  Python

偷懒指南:10个Python脚本搞定80%的重复工作

AirPython • 3 月前 • 1136 次点击  

来源:网络

一、概述

1.1 背景介绍

在运维工作中,大量重复性任务占据了工程师60%-80%的工作时间:日志分析、批量操作、监控告警、资源清理等。这些任务虽然简单,但手工执行效率低且易出错。Python凭借其简洁的语法、丰富的标准库和第三方模块,成为运维自动化的首选语言。本文将分享10个经过生产环境验证的Python脚本,帮助运维工程师从重复劳动中解放出来。

1.2 技术特点

  • • 开发效率高:Python语法简洁,开发周期仅为Shell脚本的1/3,适合快速实现运维需求
  • • 生态系统完善:提供paramiko、requests、psutil等成熟运维库,避免重复造轮轮
  • • 跨平台兼容:同一脚本可在Linux、Windows、macOS上运行,减少维护成本
  • • 易于维护扩展:代码可读性强,便于团队协作和功能迭代

1.3 适用场景

  • • 场景一:需要批量管理100台以上服务器的运维团队,如配置分发、命令执行、文件同步
  • • 场景二:日均处理GB级日志的业务系统,需要自动化分析异常、统计访问量、生成报表
  • • 场景三:多云环境资源管理,包括虚拟机、容器、存储的自动清理和成本优化
  • • 场景四:7x24小时监控场景,需要自动化健康检查、告警处理、故障自愈

1.4 环境要求

组件
版本要求
说明
操作系统
CentOS 7+/Ubuntu 18.04+
建议使用LTS版本
Python
3.8+
推荐3.10+,支持最新语法特性
pip
20.0+
用于安装依赖包
硬件配置
2C4G+
根据实际负载调整

二、详细步骤

2.1 准备工作

◆ 2.1.1 系统检查

# 检查Python版本
python3 --version

# 检查pip版本
pip3 --version

# 检查系统资源
free -h
df -h

◆ 2.1.2 安装依赖

# 升级pip
pip3 install --upgrade pip

# 安装常用运维库
pip3 install paramiko requests psutil schedule pymysql redis elasticsearch prometheus-client

# 验证安装
pip3 list | grep -E "paramiko|requests|psutil"

2.2 核心配置

◆ 2.2.1 配置SSH密钥认证

# 生成SSH密钥对
ssh-keygen -t rsa -b 4096 -f ~/.ssh/ops_rsa -N ""

# 分发公钥到目标服务器(示例)
ssh-copy-id -i ~/.ssh/ops_rsa.pub root@192.168.1.100

说明:使用密钥认证替代密码登录,提高安全性并支持批量操作。建议为运维脚本单独创建密钥对,便于权限管理和审计。

◆ 2.2.2 配置文件示例

# 配置文件:config.yml
servers:
-host:192.168.1.100
port:22
user:root
key_file:~/.ssh/ops_rsa
-host:192.168.1.101
port:22
user:root
key_file:~/.ssh/ops_rsa

mysql:
host:192.168.1.200
port:3306
user:monitor
password:your_password
database:ops

redis:
host:192.168.1.201
port:6379
password:your_redis_password
db:0

log:
level:INFO
file:/var/log/ops/automation.log
max_size:100# MB
backup_count:10

参数说明

  • • servers:目标服务器列表,支持批量操作
  • • mysql/redis:数据库连接信息,用于存储执行结果和状态
  • • log:日志配置,建议使用轮转避免磁盘占满

◆ 2.2.3 日志配置

# logging_config.py
import logging
from logging.handlers import RotatingFileHandler

defsetup_logger(log_file='/var/log/ops/automation.log', level=logging.INFO):
    logger = logging.getLogger('ops_automation')
    logger.setLevel(level)

# 轮转文件处理器
    handler = RotatingFileHandler(
        log_file,
        maxBytes=100*1024*1024,  # 100MB
        backupCount=10
    )

    formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)

return logger

2.3 启动和验证

◆ 2.3.1 基础测试

# 测试SSH连接
python3 -c "import paramiko; print('paramiko OK')"

# 测试配置文件读取
python3 -c "import yaml; print(yaml.safe_load(open('config.yml')))"

◆ 2.3.2 功能验证

# 验证SSH批量执行(示例脚本1)
python3 batch_ssh_executor.py "uptime"

# 预期输出
# [192.168.1.100] SUCCESS: 10:30:23 up 45 days, 2:15, 1 user, load average: 0.15, 0.10, 0.08
# [192.168.1.101] SUCCESS: 10:30:24 up 30 days, 5:20, 1 user, load average: 0.25, 0.20, 0.18

三、示例代码和配置

3.1 完整配置示例

◆ 3.1.1 脚本1:批量SSH命令执行器

#!/usr/bin/env python3
# 文件路径:batch_ssh_executor.py
"""
批量SSH命令执行器
支持并发执行、结果收集、异常处理
"""


import paramiko
import yaml
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from logging_config import setup_logger

logger = setup_logger()

classSSHExecutor:
def__init__(self, config_file='config.yml'):
withopen(config_file) as f:
self.config = yaml.safe_load(f)
self.servers = self .config['servers']

defexecute_on_host(self, server, command, timeout=30):
"""在单个主机上执行命令"""
        host = server['host']
try:
            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# 使用密钥认证
            key = paramiko.RSAKey.from_private_key_file(server['key_file'])
            client.connect(
                hostname=host,
                port=server['port'],
                username=server['user'],
                pkey=key,
                timeout=10
            )

            stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
            exit_code = stdout.channel.recv_exit_status()

            result = {
'host': host,
'success': exit_code == 0,
'stdout': stdout.read().decode('utf-8', errors='ignore').strip(),
'stderr': stderr.read().decode('utf-8', errors='ignore').strip(),
'exit_code': exit_code
            }

            client.close()
            logger.info(f"[{host}] Command executed, exit_code={exit_code}")
return result

except Exception as e:
            logger.error(f"[{host}] Error: {str(e)}")
return {
'host': host,
'success'False,
'stdout''',
'stderr'str(e),
'exit_code': -1
            }

defexecute_parallel(self, command, max_workers=10):
"""并发执行命令"""
        results = []
with ThreadPoolExecutor(max_workers=max_workers) as  executor:
            futures = {
                executor.submit(self.execute_on_host, server, command): server
for server inself.servers
            }

for future in as_completed(futures):
                results.append(future.result())

return results

defprint_results(self, results):
"""格式化输出结果"""
        success_count = sum(1for r in results if r['success'])
print(f"\n执行完成: 成功 {success_count}/{len(results)}\n")

for result insorted(results, key=lambda x: x['host']):
            status = "SUCCESS"if result['success'else"FAILED"
print(f"[{result['host']}{status}")
if result['stdout']:
print(f"  输出: {result['stdout']}")
if result['stderr']:
print(f"  错误: {result['stderr']}")
print()

if __name__ == '__main__':
iflen(sys.argv) 2:
print("用法: python3 batch_ssh_executor.py ''")
        sys.exit(1)

    command = sys.argv[1]
    executor = SSHExecutor()
    results = executor.execute_parallel(command)
    executor.print_results(results)

◆ 3.1.2 脚本2:日志分析与告警

#!/usr/bin/env python3
# 文件名:log_analyzer.py
"""
日志分析工具
功能:错误统计、异常检测、自动告警
"""


import re
import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta
import requests
from logging_config import setup_logger

logger = setup_logger()

classLogAnalyzer:
def__init__(self, log_file):
self.log_file = log_file
self.error_patterns = {
'http_5xx'r'HTTP/\d\.\d"\s5\d{2}',
'exception'r'(Exception|Error|Fatal)',
'timeout'r'(timeout|timed out)',
'connection_refused'r'Connection refused',
'out_of_memory'r'(OutOfMemory|OOM|Cannot allocate memory)'
        }

defparse_nginx_log(self, line):
"""解析Nginx日志格式"""
        pattern = r'(\S+) - - \[(.*?)\] "(.*?)" (\d{3}) (\d+) "(.*?)" "(.*?)"'
match = re.match(pattern, line)
ifmatch:
return {
'ip'match.group(1),
'time'match.group(2),
'request'match.group(3),
'status'int(match.group(4)),
'size'int( match.group(5)),
'referer'match.group(6),
'user_agent'match.group(7)
            }
returnNone

defanalyze(self, time_window=60):
"""分析最近N分钟的日志"""
        now = datetime.now()
        cutoff_time = now - timedelta(minutes=time_window)

        stats = {
'total_requests'0,
'error_count': defaultdict(int),
'status_codes': Counter(),
'top_ips': Counter(),
'slow_requests': []
        }

withopen(self.log_file, 'r'as f:
for line in f:
                entry = self.parse_nginx_log(line)
ifnot entry:
continue

# 时间过滤
                log_time = datetime.strptime(entry['time'], '%d/%b/%Y:%H:%M:%S %z')
if log_time.replace(tzinfo=None) < cutoff_time:
continue

                stats['total_requests'] += 1
                stats['status_codes'][entry['status']] += 1
                stats['top_ips'][entry['ip']] += 1

# 错误检测
for error_type, pattern inself.error_patterns.items():
if re.search(pattern, line):
                        stats['error_count'][error_type] += 1

# 5xx错误记录
if500 <= entry['status'] 600:
                    stats[ 'slow_requests'].append({
'time': entry['time'],
'request': entry['request'],
'status': entry['status']
                    })

return stats

defcheck_alert_conditions(self, stats):
"""检查告警条件"""
        alerts = []

# 5xx错误率超过5%
if stats['total_requests'] > 0:
            error_5xx = sum(count for code, count in stats['status_codes'].items()
if500 <= code 600)
            error_rate = error_5xx / stats['total_requests']
if error_rate > 0.05:
                alerts.append({
'level''critical',
'message'f'5xx错误率: {error_rate*100:.2f}% ({error_5xx}/{stats["total_requests"]})'
                })

# OOM错误
if stats['error_count']['out_of_memory'] > 0:
            alerts.append({
'level''critical',
'message'f'检测到OOM错误: {stats["error_count"]["out_of_memory"]}次'
            })

# 连接超时
if stats['error_count']['timeout'] > 100:
            alerts.append({
'level''warning',
'message'f'超时错误异常: {stats["error_count"]["timeout"]} 次'
            })

return alerts

defsend_alert(self, alerts, webhook_url):
"""发送告警到企业微信/钉钉"""
ifnot alerts:
return

        message = "【日志告警】\n" + "\n".join(
f"[{a['level'].upper()}{a['message']}"for a in alerts
        )

        payload = {
"msgtype""text",
"text": {"content": message}
        }

try:
            response = requests.post(webhook_url, json=payload, timeout=5)
if response.status_code == 200:
                logger.info("告警发送成功")
else:
                logger.error(f"告警发送失败: {response.status_code}")
except Exception as e:
            logger.error(f"告警发送异常: {str(e)}")

if __name__ == '__main__':
    analyzer = LogAnalyzer('/var/log/nginx/access.log')
    stats = analyzer.analyze(time_window=5)

print(f"总请求数: {stats['total_requests']}")
print(f"状态码分布: {dict(stats['status_codes'])}")
print(f"Top 10 IP: {stats['top_ips'].most_common(10)}")
print(f"错误统计: {dict(stats['error_count'])}")

    alerts = analyzer.check_alert_conditions(stats)
if alerts:
print("\n触发告警:")
for alert in alerts:
print(f"  [{alert['level']}{alert['message']}")

# 发送告警(替换为实际webhook地址)
# analyzer.send_alert(alerts, 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=xxx')

◆ 3.1.3 脚本3:系统资源监控

#!/usr/bin/env python3
# 文件名:system_monitor.py
"""
系统资源监控
监控CPU、内存、磁盘、网络,支持Prometheus集成
"""


import psutil
import time
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from logging_config import setup_logger

logger = setup_logger()

classSystemMonitor:
def__init__(self, pushgateway_url='localhost:9091', job_name='system_monitor'):
self.pushgateway_url = pushgateway_url
self.job_name = job_name
self.registry = CollectorRegistry()

# 定义指标
self.cpu_gauge = Gauge('system_cpu_percent''CPU使用率', registry=self.registry)
self.memory_gauge = Gauge('system_memory_percent''内存使用率', registry=self.registry)
self.disk_gauge = Gauge('system_disk_percent''磁盘使用率',
                               ['mountpoint'], registry=self.registry)
self.network_gauge = Gauge('system_network_bytes''网络流量',
                                  ['interface''direction'], registry=self.registry)

defcollect_metrics(self):
"""采集系统指标"""
        metrics = {}

# CPU
        cpu_percent = psutil.cpu_percent(interval=1)
        metrics['cpu'] = cpu_percent
self.cpu_gauge.set(cpu_percent)

# 内存
        memory = psutil.virtual_memory()
        metrics['memory'] = {
'percent': memory.percent,
'total': memory.total,
'available': memory.available,
'used': memory.used
        }
self.memory_gauge.set(memory.percent)

# 磁盘
        metrics['disk'] = {}
for partition in psutil.disk_partitions():
try:
                usage = psutil.disk_usage(partition.mountpoint)
                metrics['disk'][partition.mountpoint] = {
'percent': usage.percent,
'total': usage.total,
'used': usage.used,
'free': usage.free
                }
self.disk_gauge.labels(mountpoint=partition.mountpoint).set(usage.percent)
except PermissionError:
continue

# 网络
        net_io = psutil.net_io_counters(pernic=True)
        metrics['network'] = {}
for interface, stats in net_io.items():
            metrics['network'][interface] = {
'bytes_sent': stats.bytes_sent,
'bytes_recv': stats.bytes_recv
            }
self.network_gauge.labels(interface=interface, direction='sent').set(stats.bytes_sent)
self.network_gauge.labels(interface=interface, direction='recv').set(stats.bytes_recv)

return metrics

defcheck_thresholds(self, metrics):
"""检查阈值告警"""
        alerts = []

if metrics[ 'cpu'] > 80:
            alerts.append(f"CPU使用率过高: {metrics['cpu']:.1f}%")

if metrics['memory']['percent'] > 85:
            alerts.append(f"内存使用率过高: {metrics['memory']['percent']:.1f}%")

for mount, stats in metrics['disk'].items():
if stats['percent'] > 90:
                alerts.append(f"磁盘空间不足: {mount} ({stats['percent']:.1f}%)")

return alerts

defpush_metrics(self):
"""推送指标到Pushgateway"""
try:
            push_to_gateway(self.pushgateway_url, job=self.job_name, registry=self.registry)
            logger.info("指标推送成功")
except Exception as e:
            logger.error(f"指标推送失败: {str(e)}")

defrun(self, interval=60):
"""持续监控"""
        logger.info(f"开始监控,采集间隔: {interval}秒")
whileTrue:
try:
                metrics = self.collect_metrics()
                alerts = self.check_thresholds(metrics)

if alerts:
                    logger.warning("触发告警: " + "; ".join(alerts))

self.push_metrics()
                time.sleep(interval)

except KeyboardInterrupt:
                logger.info("监控停止")
break
except Exception as e:
                logger.error(f"监控异常: {str(e)}")
                time.sleep(interval)

if __name__ == '__main__':
    monitor = SystemMonitor()

# 单次采集
    metrics = monitor.collect_metrics()
print(f"CPU: {metrics['cpu']:.1f}%")
print(f"内存: {metrics['memory']['percent']:.1f}%")
print("磁盘:")
for mount, stats in metrics['disk'].items():
print(f"  {mount}{stats['percent']:.1f}%")

# 持续监控(取消注释启用)
# monitor.run(interval=60)

◆ 3.1.4 脚本4:MySQL慢查询分析

#!/usr/bin/env python3
# 文件名:mysql_slow_query_analyzer.py
"""
MySQL慢查询分析
解析慢查询日志,生成优化建议
"""


import re
import pymysql
from collections import defaultdict
from logging_config import setup_logger

logger = setup_logger()

classSlowQueryAnalyzer:
def__init__(self, slow_log_file, db_config):
self.slow_log_file = slow_log_file
self.db_config = db_config
self.queries = []

defparse_slow_log (self):
"""解析慢查询日志"""
        current_query = {}

withopen(self.slow_log_file, 'r'as f:
for line in f:
# Time行
if line.startswith('# Time:'):
if current_query:
self.queries.append(current_query)
                    current_query = {'time': line.split(':'1)[1].strip()}

# User@Host行
elif line.startswith('# User@Host:'):
match = re.search(r'(\w+)\[(\w+)\] @ (\S+)', line)
ifmatch:
                        current_query['user'] = match.group(1)
                        current_query['host'] = match.group(3)

# Query_time行
elif line.startswith('# Query_time:'):
match = re.search(
r'Query_time: ([\d.]+)\s+Lock_time: ([\d.]+)\s+Rows_sent: (\d+)\s+Rows_examined: (\d+)',
                        line
                    )
ifmatch:
                        current_query['query_time'] = float(match.group(1))
                        current_query['lock_time'] = float(match.group(2))
                        current_query['rows_sent'] = int(match.group(3))
                        current_query['rows_examined'] = int(match.group(4))

# SQL语句
elifnot line.startswith('#' and line.strip():
                    current_query['sql'] = current_query.get('sql''') + line.strip() + ' '

if current_query:
self.queries.append(current_query)

        logger.info(f"解析完成,共 {len(self.queries)} 条慢查询")

defanalyze(self):
"""分析慢查询"""
        stats = {
'total'len(self.queries),
'avg_query_time'0,
'max_query_time'0,
'top_queries': [],
'table_scan': []
        }

ifnotself.queries:
return stats

# 基础统计
        total_time = sum(q['query_time'for q inself.queries)
        stats['avg_query_time'] = total_time / len(self.queries)
        stats['max_query_time'] = max(q['query_time'for q inself.queries)

# Top 10耗时查询
        sorted_queries = sorted(self.queries, key=lambda x: x['query_time'], reverse=True)
        stats['top_queries'] = sorted_queries[:10]

# 全表扫描检测(rows_examined > 10000)
        stats['table_scan'] = [
            q for q inself.queries
if q.get('rows_examined'0) > 10000
        ]

return stats

defget_explain_plan(self, sql):
"""获取EXPLAIN执行计划"""
try:
            conn = pymysql.connect(**self.db_config)
            cursor = conn.cursor()

            cursor.execute(f"EXPLAIN {sql}")
            result = cursor.fetchall()

            cursor.close()
            conn.close()

return result
except Exception as e:
            logger.error(f"EXPLAIN失败: {str(e)}")
returnNone

defgenerate_report(self, stats):
"""生成分析报告"""
        report = []
        report.append("=" * 80)
        report.append("MySQL慢查询分析报告")
        report.append("=" * 80)
        report.append(f"总慢查询数: {stats['total']}")
        report.append(f"平均查询时间: {stats['avg_query_time']:.2f}秒")
        report.append(f"最大查询时间: {stats['max_query_time']:.2f}秒")
        report.append("")

        report.append("Top 10耗时查询:")
for i, query inenumerate(stats['top_queries'], 1):
            report.append(f"\n{i}. 查询时间: {query['query_time']:.2f}秒")
            report.append(f"   扫描行数: {query.get('rows_examined'0)}")
            report.append(f"   SQL: {query.get( 'sql''')[:200]}")

if stats['table_scan']:
            report.append(f"\n发现 {len(stats['table_scan'])} 个全表扫描查询")
for query in stats['table_scan'][:5]:
                report.append(f"  - {query.get('sql''')[:100]}")

return"\n".join(report)

if __name__ == '__main__':
    db_config = {
'host''localhost',
'user''root',
'password''your_password',
'database''test'
    }

    analyzer = SlowQueryAnalyzer('/var/lib/mysql/slow.log', db_config)
    analyzer.parse_slow_log()
    stats = analyzer.analyze()

print(analyzer.generate_report(stats))

◆ 3.1.5 脚本5:文件同步工具

#!/usr/bin/env python3
# 文件名:file_sync.py
"""
文件同步工具
支持增量同步、断点续传、校验
"""


import os
import hashlib
import paramiko
from pathlib import Path
from logging_config import setup_logger

logger = setup_logger()

classFileSync:
def__init__(self, ssh_config):
self.ssh_config = ssh_config
self.client = None
self.sftp =  None

defconnect(self):
"""建立SSH连接"""
try:
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            key = paramiko.RSAKey.from_private_key_file(self.ssh_config['key_file'])
self.client.connect(
                hostname=self.ssh_config['host'],
                port=self.ssh_config['port'],
                username=self.ssh_config['user'],
                pkey=key
            )

self.sftp = self.client.open_sftp()
            logger.info(f"连接成功: {self.ssh_config['host']}")

except Exception as e:
            logger.error(f"连接失败: {str(e)}")
raise

defdisconnect(self):
"""关闭连接"""
ifself.sftp:
self.sftp.close()
ifself.client:
self.client.close()

defcalculate_md5(self, file_path):
"""计算文件MD5"""
        hash_md5 = hashlib.md5()
withopen(file_path, "rb"as f:
for chunk initer(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
return hash_md5.hexdigest()

defremote_file_exists(self, remote_path):
"""检查远程文件是否存在"""
try:
self.sftp.stat(remote_path)
returnTrue
except FileNotFoundError:
returnFalse

defsync_file(self, local_path, remote_path, check_md5=True):
"""同步单个文件"""
try:
# 确保远程目录存在
            remote_dir = os.path.dirname(remote_path)
try:
self.sftp.stat(remote_dir)
except FileNotFoundError:
self._create_remote_dir(remote_dir)

# MD5校验
            need_upload = True
if check_md5 andself.remote_file_exists(remote_path):
                local_md5 = self.calculate_md5(local_path)
# 远程MD5计算(需要执行命令)
                stdin, stdout, stderr = self.client.exec_command(f"md5sum {remote_path}")
                remote_md5 = stdout.read().decode().split()[0]

if local_md5 == remote_md5:
                    logger.info(f"文件未变化,跳过: {local_path}")
                    need_upload = False

if need_upload:
self.sftp.put(local_path, remote_path)
                logger.info(f"上传成功: {local_path} -> {remote_path}")
returnTrue

returnFalse

except Exception as e:
            logger.error(f"同步失败 {local_path}{str(e)}")
returnFalse

def_create_remote_dir(self, remote_dir):
"""递归创建远程目录"""
        dirs = []
while remote_dir != '/':
            dirs.append(remote_dir)
            remote_dir = os.path.dirname(remote_dir)

for dir_path inreversed(dirs):
try:
self .sftp.stat(dir_path)
except FileNotFoundError:
self.sftp.mkdir(dir_path)
                logger.info(f"创建目录: {dir_path}")

defsync_directory(self, local_dir, remote_dir, exclude_patterns=None):
"""同步整个目录"""
        exclude_patterns = exclude_patterns or []
        synced_count = 0
        skipped_count = 0

for root, dirs, files in os.walk(local_dir):
# 计算相对路径
            rel_path = os.path.relpath(root, local_dir)
            remote_root = os.path.join(remote_dir, rel_path).replace('\\''/')

for file in files:
# 排除规则
ifany(pattern in file for pattern in exclude_patterns):
continue

                local_file = os.path.join(root, file)
                remote_file = os.path.join(remote_root, file).replace('\\''/')

ifself.sync_file(local_file, remote_file):
                    synced_count += 1
else:
                    skipped_count += 1

        logger.info(f"同步完成: 上传{synced_count}个文件,跳过{skipped_count}个")

if __name__ == '__main__':
    ssh_config = {
'host''192.168.1.100',
'port'22,
'user''root',
'key_file''~/.ssh/ops_rsa'
    }

    sync = FileSync(ssh_config)
    sync.connect()

# 同步单个文件
# sync.sync_file('/local/config.yml', '/remote/config.yml')

# 同步目录
    sync.sync_directory(
'/local/app',
'/remote/app',
        exclude_patterns=['.git''.pyc' '__pycache__']
    )

    sync.disconnect()

3.2 实际应用案例

◆ 案例一:自动化证书续期检查

场景描述:管理100+个域名的SSL证书,需要提前30天发现即将过期的证书并告警。

实现代码

#!/usr/bin/env python3
# 文件名:ssl_cert_checker.py

import ssl
import socket
from datetime import datetime, timedelta
import requests

classSSLCertChecker:
def__init__(self, domains, alert_days=30):
self.domains = domains
self.alert_days = alert_days

defcheck_cert_expiry(self, domain, port=443):
"""检查证书过期时间"""
try:
            context = ssl.create_default_context()
with socket.create_connection((domain, port), timeout=10as sock:
with context.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()

# 解析过期时间
            expire_date = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z')
            days_left = (expire_date - datetime.now()).days

return {
'domain': domain,
'expire_date': expire_date,
'days_left': days_left,
'issuer'dict(x[0for x in cert['issuer'])
            }

except Exception as e:
return {
'domain': domain,
'error'str (e)
            }

defcheck_all(self):
"""检查所有域名"""
        results = []
        alerts = []

for domain inself.domains:
            result = self.check_cert_expiry(domain)
            results.append(result)

if'days_left'in result and result['days_left'] self.alert_days:
                alerts.append(f"{domain} 证书将在 {result['days_left']} 天后过期")

return results, alerts

# 使用示例
domains = ['example.com''api.example.com''www.example.com']
checker = SSLCertChecker(domains)
results, alerts = checker.check_all()

for result in results:
if'days_left'in result:
print(f"{result['domain']}: 剩余 {result['days_left']} 天")
else:
print(f"{result['domain']}: 检查失败 - {result['error']}")

if alerts:
print("\n告警:")
for alert in alerts:
print(f"  - {alert}")

运行结果

example.com: 剩余 85 天
api.example.com: 剩余 12 天
www.example.com: 剩余 45 天

告警:
  - api.example.com 证书将在 12 天后过期

◆ 案例二:Docker容器资源清理

场景描述:定期清理停止超过7天的容器、未使用的镜像和volume,释放磁盘空间。

实现代码

#!/usr/bin/env python3
# 文件名:docker_cleanup.py

import subprocess
import json
from datetime import datetime, timedelta

classDockerCleaner:
def__init__(self, dry_run=True):
self.dry_run = dry_run

defget_stopped_containers(self, days=7):
"""获取停止超过N天的容器"""
        cutoff_time = datetime.now() - timedelta(days=days)

        cmd = "docker ps -a --format '{{json .}}'"
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

        stopped_containers = []
for line in result.stdout.strip().split('\n'):
ifnot line:
continue

            container = json.loads(line)
if container['State'] != 'exited':
continue

# 获取容器详细信息
            inspect_cmd = f"docker inspect {container['ID']}"
            inspect_result = subprocess.run(inspect_cmd, shell=True, capture_output=True, text=True)
            detail = json.loads(inspect_result.stdout)[0]

            finished_at = datetime.fromisoformat(detail['State']['FinishedAt'].split('.')[0])
if finished_at < cutoff_time:
                stopped_containers.append({
'id': container['ID'],
'name': container['Names'],
'finished_at': finished_at
                })

return stopped_containers

defremove_containers(self, containers):
"""删除容器"""
for container in containers:
            cmd = f"docker rm {container['id']}"
ifself.dry_run:
print(f"[DRY RUN] {cmd}")
else:
                subprocess.run(cmd, shell=True)
print(f"已删除容器: {container['name']}")

defprune_images(self):
"""清理未使用的镜像"""
        cmd = "docker image prune -a -f"
ifself.dry_run:
print(f"[DRY RUN] {cmd}")
else:
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
print(result.stdout)

defprune_volumes(self):
"""清理未使用的volume"""
        cmd = "docker volume prune -f"
ifself.dry_run:
print(f"[DRY RUN] {cmd}")
else:
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
print(result.stdout)

defcleanup(self, container_days=7):
"""执行清理"""
print(f"开始清理(DRY RUN: {self.dry_run})")

# 清理容器
        containers = self.get_stopped_containers(container_days)
print( f"\n发现 {len(containers)} 个停止超过{container_days}天的容器")
self.remove_containers(containers)

# 清理镜像
print("\n清理未使用的镜像...")
self.prune_images()

# 清理volume
print("\n清理未使用的volume...")
self.prune_volumes()

# 使用示例
cleaner = DockerCleaner(dry_run=False)
cleaner.cleanup(container_days=7)

◆ 案例三:定时数据库备份

场景描述:每天凌晨2点自动备份MySQL数据库,保留最近30天的备份,自动清理过期文件。

实现步骤

  1. 1. 创建备份脚本
#!/usr/bin/env python3
# 文件名:mysql_backup.py

import os
import subprocess
from datetime import datetime, timedelta
import gzip
import shutil

classMySQLBackup:
def__init__(self, config):
self.host = config['host']
self.user = config['user']
self.password = config['password']
self.databases = config['databases']
self.backup_dir = config['backup_dir']
self.retention_days = config.get('retention_days'30)

defbackup_database(self, database):
"""备份单个数据库"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = f"{self.backup_dir}/ {database}_{timestamp}.sql"

# mysqldump命令
        cmd = [
'mysqldump',
f'--host={self.host}',
f'--user={self.user}',
f'--password={self.password}',
'--single-transaction',
'--routines',
'--triggers',
'--events',
            database
        ]

try:
withopen(backup_file, 'w'as f:
                subprocess.run(cmd, stdout=f, check=True)

# 压缩
withopen(backup_file, 'rb'as f_in:
with gzip.open(f"{backup_file}.gz"'wb'as f_out:
                    shutil.copyfileobj(f_in, f_out)

            os.remove(backup_file)
print(f"备份成功: {database} -> {backup_file}.gz")
returnf"{backup_file}.gz"

except subprocess.CalledProcessError as e:
print(f"备份失败: {database} - {str(e)}")
returnNone

defcleanup_old_backups(self):
"""清理过期备份"""
        cutoff_time = datetime.now() - timedelta(days=self.retention_days)

for filename in os.listdir(self.backup_dir):
ifnot filename.endswith('.sql.gz'):
continue

            file_path = os.path.join(self .backup_dir, filename)
            file_time = datetime.fromtimestamp(os.path.getmtime(file_path))

if file_time < cutoff_time:
                os.remove(file_path)
print(f"删除过期备份: {filename}")

defrun(self):
"""执行备份"""
print(f"开始备份,时间: {datetime.now()}")

for database inself.databases:
self.backup_database(database)

self.cleanup_old_backups()
print("备份完成")

# 配置
config = {
'host''localhost',
'user''backup',
'password''your_password',
'databases': ['app_db''user_db'],
'backup_dir''/data/mysql_backups',
'retention_days'30
}

backup = MySQLBackup(config)
backup.run()
  1. 2. 配置crontab定时任务
# 编辑crontab
crontab -e

# 添加定时任务(每天凌晨2点执行)
0 2 * * * /usr/bin/python3 /opt/scripts/mysql_backup.py >> /var/log/mysql_backup.log 2>&1
  1. 3. 验证备份
# 查看备份文件
ls -lh /data/mysql_backups/

# 测试恢复
gunzip < app_db_20250115_020001.sql.gz | mysql -u root -p app_db_test

四、最佳实践和注意事项

4.1 最佳实践

◆ 4.1.1 性能优化

  • • 并发控制:使用ThreadPoolExecutor时合理设置max_workers,避免过多并发导致系统负载过高
    # 根据CPU核心数动态调整
    import os
    max_workers = min(32, (os.cpu_count() or1) * 4)
  • • 连接池复用:对于MySQL、Redis等数据库连接,使用连接池减少建立连接的开销
    from dbutils.pooled_db import PooledDB
    import pymysql

    pool = PooledDB(
        creator=pymysql,
        maxconnections=10,
        host='localhost',
        user='root',
        password='password'
    )
  • • 批量操作:批量执行数据库写入或API调用,减少网络往返次数
    # 批量插入
    cursor.executemany(
    "INSERT INTO logs (message, level) VALUES (%s, %s)",
        [(msg, level) for msg, level in log_entries]
    )

◆ 4.1.2 安全加固

  • • 敏感信息管理:使用环境变量或密钥管理系统(如HashiCorp Vault)存储密码
    import os
    from dotenv import load_dotenv

    load_dotenv()
    DB_PASSWORD = os.getenv('DB_PASSWORD')
  • • SSH密钥权限:确保私钥文件权限为600,防止被其他用户读取
    chmod 600 ~/.ssh/ops_rsa
  • • 输入验证:对用户输入进行严格校验,防止命令注入
    
    
    
        
    import shlex

    # 安全的命令参数处理
    safe_command = shlex.quote(user_input)

◆ 4.1.3 高可用配置

  • • 异常重试机制:网络操作添加重试逻辑,提高脚本鲁棒性
    from tenacity import retry, stop_after_attempt, wait_exponential

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1min=2max=10))
    defapi_call():
        response = requests.get('https://api.example.com')
        response.raise_for_status()
    return response.json()
  • • 健康检查:长时间运行的脚本应定期检查依赖服务的健康状态
  • • 备份策略:关键操作前先备份,如修改配置文件、删除数据等
    import shutil
    shutil.copy2('/etc/nginx/nginx.conf''/etc/nginx/nginx.conf.backup')

4.2 注意事项

◆ 4.2.1 配置注意事项

  • • 生产环境执行脚本前务必在测试环境充分验证,特别是涉及删除、修改操作的脚本
  • • SSH操作时避免使用root用户,应创建专用运维账号并限制sudo权限
  • • 日志文件需配置轮转,防止占满磁盘空间
  • • 定时任务的脚本应使用绝对路径,避免因环境变量差异导致执行失败

◆ 4.2.2 常见错误

错误现象
原因分析
解决方案
paramiko认证失败
SSH密钥权限错误或路径不对
检查私钥文件权限为600,路径使用绝对路径
UnicodeDecodeError
日志文件包含非UTF-8字符
读取时使用errors='ignore’参数
数据库连接超时
防火墙阻止或连接数达到上限
检查防火墙规则,增加max_connections
crontab定时任务未执行
环境变量缺失
脚本中显式设置PATH或使用绝对路径
磁盘空间不足导致脚本异常终止
未监控磁盘使用率
添加磁盘空间检查,低于阈值发送告警

◆ 4.2.3 兼容性问题

  • • 版本兼容:Python 3.6+引入了f-string,3.8+支持海象运算符,编写脚本时需考虑目标环境的Python版本
  • • 平台兼容:路径处理使用os.path或pathlib,避免硬编码Windows或Linux路径分隔符
  • • 组件依赖:paramiko 2.7+要求cryptography 3.0+,升级时需注意依赖版本兼容性

五、故障排查和监控

5.1 故障排查

◆ 5.1.1 日志查看

# 查看脚本执行日志
tail -f /var/log/ops/automation.log

# 查看crontab执行记录
grep CRON /var/log/syslog | tail -20

# 查看Python异常堆栈
grep -A 20 "Traceback" /var/log/ops/automation.log

◆ 5.1.2 常见问题排查

问题一:SSH连接超时

# 测试SSH连接
ssh -vvv -i ~/.ssh/ops_rsa root@192.168.1.100

# 检查防火墙
sudo iptables -L -n | grep 22

解决方案

  1. 1. 检查目标服务器SSH服务是否运行:systemctl status sshd
  2. 2. 验证防火墙规则是否允许22端口:firewall-cmd --list-all
  3. 3. 确认网络连通性:ping 192.168.1.100
  4. 4. 检查/etc/hosts.allow和/etc/hosts.deny配置

问题二:内存占用持续增长

# 监控Python进程内存
ps aux | grep python | sort -k4 -nr | head -5

# 使用memory_profiler分析
pip3 install memory-profiler
python3 -m memory_profiler script.py

解决方案

  1. 1. 检查是否存在循环引用导致对象无法释放
  2. 2. 大文件读取改用生成器或分块处理
  3. 3. 及时关闭数据库连接和文件句柄
  4. 4. 使用del显式删除大对象

问题三:脚本执行缓慢

  • • 症状:批量操作耗时远超预期,CPU使用率低
  • • 排查:使用cProfile分析性能瓶颈
    import cProfile
    cProfile.run('main()', sort='cumulative')
  • • 解决:优化数据库查询(添加索引)、使用并发加速I/O密集型任务、减少不必要的日志输出

◆ 5.1.3 调试模式

# 开启详细日志
import logging
logging.basicConfig(level=logging.DEBUG)

# paramiko开启调试
paramiko.util.log_to_file('/tmp/paramiko.log', level=logging.DEBUG)

# requests显示HTTP请求详情
import http.client
http.client.HTTPConnection.debuglevel = 1

5.2 性能监控

◆ 5.2.1 关键指标监控

# 脚本执行时间监控
import time
from functools import wraps

deftiming_decorator(func):
    @wraps(func)
defwrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        duration = time.time() - start
        logger.info(f"{func.__name__} 执行耗时: {duration:.2f}秒")
return result
return wrapper

@timing_decorator
defbatch_operation():
# 批量操作逻辑
pass
# 监控脚本资源占用
top -p $(pgrep -f "python.*automation")

# 监控网络流量
iftop -i eth0

# 监控磁盘IO
iostat -x 1

◆ 5.2.2 监控指标说明

指标名称
正常范围
告警阈值
说明
脚本执行时间
<5分钟
>10分钟
超时可能是网络问题或死锁
内存使用率
<70%
>85%
持续增长可能存在内存泄露
错误率
<1%
>5%
SSH失败、API调用失败等
并发数
10-50
>100
过高并发可能导致目标服务器过载
日志大小
<100MB/天
>1GB/天
异常日志输出需检查代码逻辑

◆ 5.2.3 监控告警配置

# Prometheus告警规则示例
groups:
-name:python_automation_alerts
interval:30s
rules:
-alert:ScriptExecutionTimeout
expr:script_duration_seconds>600
for:5m
labels:
severity:warning
annotations:
summary:"脚本执行超时: {{ $labels.script_name }}"
description:"执行时间 {{ $value }}秒"

-alert:HighErrorRate
expr:rate(script_errors_total[5m])>0.05
for:5m
labels:
severity:critical
annotations:
summary:"脚本错误率过高"
description:"错误率 {{ $value | humanizePercentage }}"

5.3 备份与恢复

◆ 5.3.1 备份策略




    
#!/bin/bash
# 脚本文件备份脚本

BACKUP_DIR="/data/backups/scripts"
SOURCE_DIR="/opt/ops_scripts"
DATE=$(date +%Y%m%d)

# 创建备份目录
mkdir -p $BACKUP_DIR

# 打包备份
tar -czf $BACKUP_DIR/scripts_$DATE.tar.gz \
    -C $SOURCE_DIR \
    --exclude='*.pyc' \
    --exclude='__pycache__' \
    --exclude='.git' \
    .

# 保留最近30天备份
find $BACKUP_DIR -name "scripts_*.tar.gz" -mtime +30 -delete

echo"备份完成: $BACKUP_DIR/scripts_$DATE.tar.gz"

◆ 5.3.2 恢复流程

  1. 1. 停止相关服务
    # 停止crontab任务
    crontab -r
  2. 2. 恢复数据
    # 解压备份文件
    tar -xzf /data/backups/scripts/scripts_20250115.tar.gz -C /opt/ops_scripts_restore
  3. 3. 验证完整性
    # 检查文件数量
    diff -r /opt/ops_scripts /opt/ops_scripts_restore

    # 测试脚本语法
    python3 -m py_compile /opt/ops_scripts_restore/*.py
  4. 4. 重启服务
    # 恢复crontab
    crontab /data/backups/crontab_backup.txt

六、总结

6.1 技术要点回顾

  • • Python运维自动化的核心价值在于减少重复劳动、提高执行效率、降低人为错误率
  • • 选择合适的库是成功的关键:paramiko处理SSH、psutil监控系统、schedule实现定时任务
  • • 生产环境脚本必须具备完善的日志、异常处理、重试机制,确保故障可追溯、可恢复
  • • 安全性不容忽视:密钥管理、权限控制、输入校验缺一不可

6.2 进阶学习方向

  1. 1. 异步编程优化:学习asyncio和aiohttp,将I/O密集型脚本改造为异步版本,大幅提升性能
  • • 学习资源:Python官方asyncio文档
  • • 实践建议:改造批量HTTP请求脚本,对比同步和异步版本的性能差异
  • 2. 容器化部署:将运维脚本打包为Docker镜像,实现跨环境一致性执行
    • • 学习资源:Docker最佳实践
    • • 实践建议:使用Alpine Linux作为基础镜像,减小镜像体积
  • 3. 可观测性增强:集成OpenTelemetry实现分布式追踪,结合Grafana可视化脚本执行情况
    • • 学习资源:OpenTelemetry Python SDK
    • • 实践建议:为关键脚本添加Trace和Metrics,建立运维自动化的观测体系

    6.3 参考资料

    • • Python官方文档 - 标准库和语言特性权威参考
    • • Paramiko官方文档 - SSH自动化必备库
    • • psutil文档 - 系统监控和进程管理
    • • Real Python运维教程 - 实战导向的Python运维教程
    • • Awesome Python - 精选Python运维相关库和工具

    附录

    A. 命令速查表

    # Python环境管理
    python3 -m venv /opt/venv  # 创建虚拟环境
    source /opt/venv/bin/activate  # 激活虚拟环境
    pip3 freeze > requirements.txt  # 导出依赖
    pip3 install -r requirements.txt  # 安装依赖

    # 常用运维命令
    python3 batch_ssh_executor.py "df -h"# 批量检查磁盘
    python3 log_analyzer.py  # 分析日志
    python3 system_monitor.py  # 系统监控
    python3 mysql_backup.py  # 数据库备份

    # 调试和性能分析
    python3 -m pdb script.py  # 调试脚本
    python3 -m cProfile -o profile.stats script.py  # 性能分析
    python3 -m trace --count script.py  # 代码覆盖率

    # 代码质量检查
    pylint script.py  # 代码规范检查
    black script.py  # 代码格式化
    mypy script.py  # 类型检查

    B. 配置参数详解

    paramiko.SSHClient参数

    • • timeout:连接超时时间(秒),默认无超时,建议设置10-30秒
    • • banner_timeout:SSH banner读取超时,默认15秒
    • • auth_timeout:认证超时,默认15秒
    • • allow_agent:是否使用SSH agent,默认True
    • • look_for_keys:是否搜索~/.ssh/目录下的密钥,默认True

    logging.RotatingFileHandler参数

    • • maxBytes:单个日志文件最大字节数,建议100MB
    • • backupCount:保留的日志文件数量,建议10-30个
    • • encoding:日志文件编码,建议utf-8
    • • delay:延迟创建文件直到第一次写入,默认False

    ThreadPoolExecutor参数

    • • max_workers:最大线程数,建议为CPU核心数的2-4倍
    • • thread_name_prefix:线程名称前缀,便于调试
    • • initializer:线程启动时执行的初始化函数

    C. 术语表

    术语
    英文
    解释
    幂等性
    Idempotence
    多次执行产生相同结果的特性,运维脚本必备
    连接池
    Connection Pool
    预先建立并复用连接,减少建立连接的开销
    异步编程
    Asynchronous Programming
    使用协程提高I/O密集型任务的并发性能
    分布式追踪
    Distributed Tracing
    跟踪请求在分布式系统中的完整调用链路
    熔断器
    Circuit Breaker
    当服务异常时自动断开调用,防止雪崩
    SSH密钥认证
    SSH Key Authentication
    使用公私钥对进行身份验证,比密码更安全
    慢查询
    Slow Query
    执行时间超过阈值的数据库查询
    全表扫描
    Full Table Scan
    数据库查询未使用索引,扫描整张表的所有行
    日志轮转
    Log Rotation
    定期归档和删除旧日志,防止占满磁盘
    优雅退出
    Graceful Shutdown
    程序收到终止信号后完成当前任务再退出

    Python社区是高质量的Python/Django开发社区
    本文地址:http://www.python88.com/topic/190362