Linux 运维常用脚本和工具
一、ETCD 备份脚本
1. 主程序脚本
[root@mysql-131 local]# cat /usr/local/bin/etcd-backup.sh
#!/bin/bash
# etcd snapshot backup: pick the first healthy endpoint, save a snapshot,
# show its status, and prune snapshots older than KEEP_DAYS days.
# --- configuration ---
ETCD_CA="/etc/etcd/ssl/etcd-ca.pem"
ETCD_CERT="/etc/etcd/ssl/etcd-server.pem"
ETCD_KEY="/etc/etcd/ssl/etcd-server-key.pem"
ETCD_ENDPOINTS="https://192.168.233.32:2379,https://192.168.233.33:2379,https://192.168.233.34:2379"
BACKUP_DIR="/var/openebs/local/etcd-backup"
KEEP_DAYS="7"
# Ensure the backup directory exists
mkdir -p "${BACKUP_DIR}"
# Pick the first healthy endpoint. HEALTHY_ENDPOINT is initialised explicitly
# so the -z test below is well-defined even when no endpoint answers (the
# original relied on the variable being unset).
HEALTHY_ENDPOINT=""
IFS=',' read -ra ENDPOINTS <<< "${ETCD_ENDPOINTS}"
for endpoint in "${ENDPOINTS[@]}"; do
    if ETCDCTL_API=3 etcdctl --cacert="${ETCD_CA}" \
        --cert="${ETCD_CERT}" --key="${ETCD_KEY}" \
        --endpoints="${endpoint}" endpoint health &>/dev/null; then
        HEALTHY_ENDPOINT="${endpoint}"
        break
    fi
done
if [ -z "${HEALTHY_ENDPOINT}" ]; then
    echo "$(date '+%Y-%m-%d %H:%M:%S') - No healthy etcd endpoint available, skipping backup" >&2
    exit 1
fi
# Timestamped snapshot file name
BACKUP_FILE="${BACKUP_DIR}/etcd-snapshot-$(date +%Y%m%d-%H%M%S).db"
# Run the backup
echo "$(date '+%Y-%m-%d %H:%M:%S') - Starting etcd backup to ${BACKUP_FILE} using endpoint ${HEALTHY_ENDPOINT}"
if ETCDCTL_API=3 etcdctl --cacert="${ETCD_CA}" \
    --cert="${ETCD_CERT}" --key="${ETCD_KEY}" \
    --endpoints="${HEALTHY_ENDPOINT}" snapshot save "${BACKUP_FILE}"; then
    echo "$(date '+%Y-%m-%d %H:%M:%S') - Backup completed successfully"
    # Show snapshot metadata (size, revision, hash) in the log
    ETCDCTL_API=3 etcdctl --write-out=table snapshot status "${BACKUP_FILE}"
    # Prune old snapshots (quote the expansion for safety)
    find "${BACKUP_DIR}" -name 'etcd-snapshot-*.db' -mtime +"${KEEP_DAYS}" -delete
    echo "$(date '+%Y-%m-%d %H:%M:%S') - Old backups older than ${KEEP_DAYS} days cleaned"
else
    echo "$(date '+%Y-%m-%d %H:%M:%S') - Backup failed" >&2
    exit 1
fi
2. systemd 管理脚本
[root@mysql-131 local]# cat /etc/systemd/system/etcd-backup.service
[Unit]
Description=ETCD Backup Service
After=network.target
Wants=etcd.service
[Service]
Type=oneshot
User=root
ExecStart=/usr/local/bin/etcd-backup.sh
Environment="ETCDCTL_API=3"
[Install]
WantedBy=multi-user.target
3. systemd 定时任务
当然也可以用 cron 计划任务
[root@mysql-131 local]# cat /etc/systemd/system/etcd-backup.timer
[Unit]
Description=Run etcd backup every 4 hours
[Timer]
OnCalendar=*-*-* 00/4:00:00
AccuracySec=1m
Persistent=true
[Install]
WantedBy=timers.target
解读定时器
[Unit] 部分
Description: 对定时任务的描述,这里是“每4小时运行一次 etcd 备份”。
[Timer] 部分
OnCalendar: 定义任务的触发时间。*-*-* 00/4:00:00 表示从午夜开始,每隔4小时触发一次任务(即00:00、04:00、08:00等)。
AccuracySec: 定义任务触发的时间精度,这里是1分钟。这意味着任务可能会在指定时间的前后1分钟内触发。
Persistent: 如果设置为 true,表示如果系统在预定时间未运行,任务会在系统启动后尽快执行一次。
[Install] 部分
WantedBy: 定义该定时任务在系统启动时是否自动启动。timers.target 是一个特殊的 systemd 目标,用于管理所有定时任务。
二、远程同步脚本
1. 定时远程同步本地文件
定时每天凌晨1点同步本地目录中最新的6个文件到远端机器
0 1 * * * /bin/bash -c "ls /var/openebs/local/etcd-backup/ | sort -r | head -6 | xargs -I {} rsync -avz --partial --progress --checksum /var/openebs/local/etcd-backup/{} root@192.168.233.132:/var/openebs/db-backup/etcd-backup" >> /var/log/etcd-backup.log 2>&1
本地文件命名格式如下:
[root@mysql-131 ~]# ls -lah /var/openebs/local/etcd-backup/
total 157G
drwxr-xr-x 2 root root 4.0K May 20 12:01 .
drwxr-xr-x 6 root root 4.0K May 7 15:44 ..
-rw------- 1 root root 3.3G May 19 12:01 etcd-snapshot-20250519-120000.db
-rw------- 1 root root 3.3G May 19 16:01 etcd-snapshot-20250519-160000.db
-rw------- 1 root root 3.3G May 19 20:04 etcd-snapshot-20250519-200000.db
-rw------- 1 root root 3.3G May 20 00:02 etcd-snapshot-20250520-000000.db
-rw------- 1 root root 3.3G May 20 04:02 etcd-snapshot-20250520-040000.db
-rw------- 1 root root 3.3G May 20 08:01 etcd-snapshot-20250520-080000.db
-rw------- 1 root root 3.3G May 20 12:01 etcd-snapshot-20250520-120000.db
2. 脚本本地目录同步到远程
会检查本地
/var/openebs/local/下的 mysql-backup、pgsql-backup 和 mongodb-backup 目录筛选出最近1天创建的备份目录(根据目录名中的日期部分)
使用rsync将目录同步到远程对应的位置
记录详细日志到 /var/log/backup_sync.log。
使用 --partial 参数,允许中断的传输在恢复时继续;使用 --timeout=60 参数,防止网络问题导致长时间挂起。
实现了 sync_with_retry 函数,最多重试 3 次,每次重试之间有 30 秒的间隔(可配置)。
脚本会跟踪失败的同步操作数量
最终退出状态码反映失败的数量
更详细的日志记录,方便排查问题
$ vim /usr/local/bin/backup_sync.sh
#!/bin/bash
# Configuration parameters
LOCAL_BASE="/var/openebs/local"
REMOTE_HOST="192.168.233.132"
REMOTE_USER="root"
REMOTE_BASE="/var/openebs/db-backup"
LOG_FILE="/var/log/backup_sync.log"
MAX_RETRIES=3
RETRY_DELAY=30
# Backup types mapped to their directory names under $LOCAL_BASE
declare -A BACKUP_TYPES=(
    ["mysql"]="mysql-backup"
    ["pgsql"]="pgsql-backup"
    ["mongodb"]="mongodb-backup"
)
# Logging helper: timestamped line appended to $LOG_FILE (file only, no stdout)
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >> "$LOG_FILE"
}
# rsync one directory to the remote host, retrying on failure.
# Arguments:
#   $1 - local source directory
#   $2 - remote target string (currently unused; kept for call-site compatibility)
#   $3 - backup type key into BACKUP_TYPES
# Returns 0 on success, 1 after MAX_RETRIES failed attempts.
sync_with_retry() {
    local src=$1
    local remote_target=$2   # NOTE(review): unused; the target is rebuilt from globals below
    local backup_type=$3
    local retry_count=0
    local dir_name
    dir_name=$(basename "$src")
    while [ "$retry_count" -lt "$MAX_RETRIES" ]; do
        log "尝试 $((retry_count+1))/$MAX_RETRIES: 同步 $dir_name ($backup_type) 到远程..."
        # --rsync-path pre-creates the remote parent directory before syncing.
        # Test the command directly instead of inspecting $? afterwards.
        if rsync -avz --partial --progress --timeout=60 -e ssh \
            --rsync-path="mkdir -p ${REMOTE_BASE}/${BACKUP_TYPES[$backup_type]} && rsync" \
            "$src/" \
            "${REMOTE_USER}@${REMOTE_HOST}:${REMOTE_BASE}/${BACKUP_TYPES[$backup_type]}/$dir_name/" \
            >> "$LOG_FILE" 2>&1; then
            log "同步 $dir_name ($backup_type) 成功"
            return 0
        fi
        retry_count=$((retry_count+1))
        if [ "$retry_count" -lt "$MAX_RETRIES" ]; then
            log "同步失败,${RETRY_DELAY}秒后重试..."
            sleep "$RETRY_DELAY"
        else
            log "同步 $dir_name ($backup_type) 失败,已达最大重试次数"
        fi
    done
    return 1
}
# Scan each configured backup directory and sync subdirectories whose names
# start with today's or yesterday's date (YYYYMMDD) to the remote host.
# Returns the number of directories that failed to sync.
sync_recent_backups() {
    local sync_errors=0
    # Date window: today and yesterday, as YYYYMMDD strings
    local current_date yesterday_date
    current_date=$(date +%Y%m%d)
    yesterday_date=$(date -d "1 day ago" +%Y%m%d)
    log "当前日期: $current_date, 前一天日期: $yesterday_date"
    # Declare loop variables local: the original leaked them into the global
    # scope of the script.
    local backup_type local_source_dir dir dir_name dir_date
    local -a recent_dirs
    for backup_type in "${!BACKUP_TYPES[@]}"; do
        local_source_dir="${LOCAL_BASE}/${BACKUP_TYPES[$backup_type]}"
        if [ ! -d "$local_source_dir" ]; then
            log "警告: 本地目录不存在: $local_source_dir"
            continue
        fi
        log "开始处理 $backup_type 备份..."
        recent_dirs=()
        # NUL-delimited read is safe for arbitrary directory names
        while IFS= read -r -d $'\0' dir; do
            dir_name=$(basename "$dir")
            dir_date="${dir_name:0:8}"   # first 8 chars: YYYYMMDD
            # Skip directories whose prefix is not a parseable date
            if ! date -d "$dir_date" &>/dev/null; then
                log "跳过无效日期目录: $dir_name"
                continue
            fi
            # Plain string comparison against the two accepted dates
            if [[ "$dir_date" == "$current_date" || "$dir_date" == "$yesterday_date" ]]; then
                log "目录 $dir_name 属于最近1天(日期: $dir_date),将同步"
                recent_dirs+=("$dir")
            else
                log "跳过旧目录: $dir_name(日期: $dir_date)"
            fi
        done < <(find "$local_source_dir" -maxdepth 1 -type d -name "202*" -print0)
        if [ ${#recent_dirs[@]} -eq 0 ]; then
            log "未找到最近1天的 $backup_type 备份目录"
            continue
        fi
        # Sync each selected directory with retry
        for dir in "${recent_dirs[@]}"; do
            if ! sync_with_retry "$dir" "${REMOTE_USER}@${REMOTE_HOST}:${REMOTE_BASE}/${BACKUP_TYPES[$backup_type]}" "$backup_type"; then
                sync_errors=$((sync_errors+1))
            fi
        done
    done
    return "$sync_errors"
}
# --- entry point ---
log "===== 开始备份同步任务 ====="
sync_recent_backups
result=$?
if [ "$result" -ne 0 ]; then
    log "备份同步任务完成,但有 $result 个目录同步失败"
else
    log "备份同步任务成功完成"
fi
log "===== 任务结束 ====="
exit $result
计划任务定期执行脚本
0 1 * * * /bin/bash -c "bash /usr/local/bin/backup_sync.sh"
3. harbor 仓库远程自动同步
该脚本会自动把 A 仓库的镜像同步至 B 仓库中
root@k8s-master1:~# cat /data/script/harbor/replication-images.sh
#!/usr/bin/env bash
#
#****************************************************************************
# Author: Zhen Tianxiang
# QQ:     2099637909
# Date:   2022-09-21
# Site:   http://blog.tianxiang.love
# Purpose: sync images that exist in the remote (source) Harbor registry but
#          are missing from the local Harbor registry.
#*****************************************************************************
set -eo pipefail
# === default configuration ===
DEFAULT_SOURCE_USER="admin"
DEFAULT_SOURCE_PASS="Harbor12345"
DEFAULT_SOURCE_ADDR="harbor.tianxiang.love:30443"
DEFAULT_LOCAL_USER="admin"
DEFAULT_LOCAL_PASS="Harbor12345"
DEFAULT_LOCAL_ADDR="harbor-m6.tianxiang.love"
DEFAULT_THREADS=5
DEFAULT_WATCH_INTERVAL=300 # watch mode: poll every 5 minutes by default
# === runtime state (overridden by CLI flags in parse_args) ===
SOURCE_HARBOR_USER="$DEFAULT_SOURCE_USER"
SOURCE_HARBOR_PASSWD="$DEFAULT_SOURCE_PASS"
SOURCE_HARBOR_ADDRESS="$DEFAULT_SOURCE_ADDR"
LOCAL_HARBOR_USER="$DEFAULT_LOCAL_USER"
LOCAL_HARBOR_PASSWD="$DEFAULT_LOCAL_PASS"
LOCAL_HARBOR_ADDRESS="$DEFAULT_LOCAL_ADDR"
THREADS=$DEFAULT_THREADS
WATCH_MODE=false
WATCH_INTERVAL=$DEFAULT_WATCH_INTERVAL
SYNC_ALL=true
SPECIFIC_PROJECTS=()
EXCLUDE_PROJECTS=()
DRY_RUN=false
LOG_LEVEL="info"
TIMESTAMP=$(date '+%Y-%m-%d-%H-%M')
LOG_FILE="harbor-sync-${TIMESTAMP}.log"   # per-run log, relative to CWD
FAILED_FILE="failed-images.txt"
# === ANSI color codes (console output only) ===
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
BLUE='\033[0;34m'
NC='\033[0m'
# === logging ===
# Leveled, timestamped logger: colored tag on the console, plain text
# appended to $LOG_FILE via tee.
# BUG FIX: the original debug arm was `[[ ... ]] && echo ...`; when debug
# logging was disabled the arm returned 1, so under `set -eo pipefail` a
# bare `log debug ...` call would abort the whole script. Suppressed debug
# messages now return 0.
log() {
    local level=$1
    local message=$2
    local timestamp
    timestamp=$(date '+%Y-%m-%d %H:%M:%S')
    local prefix
    case $level in
        "debug")
            [[ "$LOG_LEVEL" == "debug" ]] || return 0
            prefix="${BLUE}[DEBUG]${NC}"
            ;;
        "info")
            prefix="${GREEN}[INFO]${NC}"
            ;;
        "warn")
            prefix="${YELLOW}[WARN]${NC}"
            ;;
        "error")
            prefix="${RED}[ERROR]${NC}"
            ;;
        *)
            prefix="[${level}]"
            ;;
    esac
    echo -e "${prefix} ${timestamp} - ${message}" | tee -a "$LOG_FILE"
}
# === help text ===
# Print CLI usage (reflecting current defaults) and exit 0.
usage() {
    local text
    text=$(cat <<EOF
使用方法: $0 [选项]

选项:
 --source-user 用户 源Harbor用户名
 --source-pass 密码 源Harbor密码
 --source-addr 地址 源Harbor地址
 --local-user 用户 本地Harbor用户名
 --local-pass 密码 本地Harbor密码
 --local-addr 地址 本地Harbor地址
 --project 项目名 同步指定项目 (可多次使用)
 --exclude 项目名 排除指定项目 (可多次使用)
 --threads 数量 并发线程数 (默认: $DEFAULT_THREADS)
 --dry-run 试运行,不实际同步镜像
 --debug 启用DEBUG日志
 --watch 启用守护模式
 --watch-interval 秒数 守护模式下轮询间隔 (默认: $DEFAULT_WATCH_INTERVAL)
 -h, --help 显示帮助信息
EOF
)
    printf '%s\n' "$text"
    exit 0
}
# === CLI argument parsing ===
# Mutates the SOURCE_*/LOCAL_* globals and mode flags from CLI options.
parse_args() {
    while [[ $# -gt 0 ]]; do
        case "$1" in
            --source-user) SOURCE_HARBOR_USER="$2"; shift 2;;
            --source-pass) SOURCE_HARBOR_PASSWD="$2"; shift 2;;
            --source-addr) SOURCE_HARBOR_ADDRESS="$2"; shift 2;;
            --local-user) LOCAL_HARBOR_USER="$2"; shift 2;;
            --local-pass) LOCAL_HARBOR_PASSWD="$2"; shift 2;;
            --local-addr) LOCAL_HARBOR_ADDRESS="$2"; shift 2;;
            --project) SYNC_ALL=false; SPECIFIC_PROJECTS+=("$2"); shift 2;;
            --exclude) EXCLUDE_PROJECTS+=("$2"); shift 2;;
            --threads) THREADS="$2"; shift 2;;
            --dry-run) DRY_RUN=true; shift;;
            --debug) LOG_LEVEL="debug"; shift;;
            --watch) WATCH_MODE=true; shift;;
            --watch-interval) WATCH_INTERVAL="$2"; shift 2;;
            -h|--help) usage;;
            # BUG FIX: the original called `usage` here, which does `exit 0`,
            # so an unknown option terminated the script with a SUCCESS
            # status and the following `exit 1` was unreachable. Fail loudly.
            *) log error "未知选项: $1"; exit 1;;
        esac
    done
}
# Abort with an error unless every required external command is available.
check_dependencies() {
    local tool
    for tool in curl docker jq; do
        if ! command -v "$tool" >/dev/null; then
            log error "缺少依赖: $tool"
            exit 1
        fi
    done
}
# docker login to registry $1 with user $2 and password $3; abort on failure.
harbor_login() {
    local registry=$1 user=$2 pass=$3
    if ! docker login "$registry" -u "$user" -p "$pass" >/dev/null; then
        log error "登录 $registry 失败"
        exit 1
    fi
}
# List project names from the Harbor at $3, authenticating as $1:$2.
fetch_projects() {
    local user=$1 pass=$2 addr=$3
    curl -s -k -u "${user}:${pass}" -X GET "https://${addr}/api/v2.0/projects?page_size=100" \
        | jq -r '.[].name'
}
# Emit "repo:tag" lines for every repository/tag of project $4 on Harbor $3
# (authenticating as $1:$2).
# NOTE(review): page_size=100 caps results — repositories beyond the first
# 100 are silently missed; confirm whether pagination is needed here.
fetch_image_tags() {
    curl -s -k -u "$1:$2" -X GET "https://$3/api/v2.0/projects/$4/repositories?page_size=100" |
    jq -r '.[].name' | while read -r repo; do
        # Registry v2 API lists the tags of one repository
        curl -s -k -u "$1:$2" -X GET "https://$3/v2/$repo/tags/list" |
        jq -r '.tags[]?' | while read -r tag; do
            echo "$repo:$tag"
        done
    done
}
# Create project $1 on the local Harbor; errors (e.g. already exists) ignored.
create_project() {
    # Build the JSON body with printf so the name lands safely inside the
    # double quotes; the original single-quote splicing ('"'$1'"') broke for
    # names containing spaces or shell metacharacters.
    local payload
    payload=$(printf '{"project_name": "%s", "metadata": {"public": "false"}}' "$1")
    curl -s -k -u "$LOCAL_HARBOR_USER:$LOCAL_HARBOR_PASSWD" -X POST "https://$LOCAL_HARBOR_ADDRESS/api/v2.0/projects" \
        -H "Content-Type: application/json" \
        -d "$payload" >/dev/null 2>&1 || true
}
# Pull image $1 from the source registry, retag, and push to the local one;
# up to 3 attempts. Appends the image to synced-images.txt on success or to
# $FAILED_FILE after all attempts fail.
# Returns 0 on success, 1 on failure (the original implicitly returned 0
# even after all attempts failed, so callers could not detect the failure).
sync_image() {
    local image=$1
    local attempt
    for attempt in {1..3}; do
        if docker pull "$SOURCE_HARBOR_ADDRESS/$image" && \
           docker tag "$SOURCE_HARBOR_ADDRESS/$image" "$LOCAL_HARBOR_ADDRESS/$image" && \
           docker push "$LOCAL_HARBOR_ADDRESS/$image"; then
            # Free local disk space.
            # NOTE(review): with `set -e`, a failing `docker rmi` here aborts
            # the script (same as the original) — confirm this is intended.
            docker rmi "$SOURCE_HARBOR_ADDRESS/$image" "$LOCAL_HARBOR_ADDRESS/$image" >/dev/null 2>&1
            log info "同步成功: $image"
            echo "$image" >> synced-images.txt
            return 0
        fi
        log warn "第 $attempt 次尝试失败: $image"
        sleep 2
    done
    log error "同步失败: $image"
    echo "$image" >> "$FAILED_FILE"
    return 1
}
# Run sync_image over the argument list with at most $THREADS jobs in flight.
sync_images_concurrent() {
    local -a queue=("$@")
    local -i next=0 total=${#queue[@]} slot
    while (( next < total )); do
        # Launch one batch of background jobs, then barrier-wait for it
        for (( slot = 0; slot < THREADS && next < total; slot++, next++ )); do
            sync_image "${queue[$next]}" &
        done
        wait
    done
}
# End-to-end sync: log in to both registries, enumerate source projects,
# diff image lists against the local registry, and sync what is missing.
main() {
    start_time=$(date +%s)
    check_dependencies
    log info "登录 Harbor 仓库..."
    harbor_login "$SOURCE_HARBOR_ADDRESS" "$SOURCE_HARBOR_USER" "$SOURCE_HARBOR_PASSWD"
    harbor_login "$LOCAL_HARBOR_ADDRESS" "$LOCAL_HARBOR_USER" "$LOCAL_HARBOR_PASSWD"
    log info "获取源项目列表..."
    readarray -t ALL_PROJECTS < <(fetch_projects "$SOURCE_HARBOR_USER" "$SOURCE_HARBOR_PASSWD" "$SOURCE_HARBOR_ADDRESS")
    # Apply --project / --exclude filters
    FILTERED_PROJECTS=()
    for proj in "${ALL_PROJECTS[@]}"; do
        [[ "$SYNC_ALL" == false && ! " ${SPECIFIC_PROJECTS[*]} " =~ " $proj " ]] && continue
        [[ " ${EXCLUDE_PROJECTS[*]} " =~ " $proj " ]] && continue
        FILTERED_PROJECTS+=("$proj")
    done
    # Reset per-run result files
    > synced-images.txt
    > "$FAILED_FILE"
    TO_SYNC_IMAGES=()
    for proj in "${FILTERED_PROJECTS[@]}"; do
        log info "处理项目: $proj"
        create_project "$proj"
        readarray -t SRC_IMAGES < <(fetch_image_tags "$SOURCE_HARBOR_USER" "$SOURCE_HARBOR_PASSWD" "$SOURCE_HARBOR_ADDRESS" "$proj")
        readarray -t DST_IMAGES < <(fetch_image_tags "$LOCAL_HARBOR_USER" "$LOCAL_HARBOR_PASSWD" "$LOCAL_HARBOR_ADDRESS" "$proj")
        # Queue every source image that the local registry does not have yet
        for image in "${SRC_IMAGES[@]}"; do
            if ! printf '%s\n' "${DST_IMAGES[@]}" | grep -qx "$image"; then
                TO_SYNC_IMAGES+=("$image")
            fi
        done
    done
    if [[ "$DRY_RUN" == true ]]; then
        log info "[Dry Run] 需要同步的镜像:"
        printf '%s\n' "${TO_SYNC_IMAGES[@]}"
        exit 0
    fi
    log info "共需同步 ${#TO_SYNC_IMAGES[@]} 个镜像,开始并发执行..."
    sync_images_concurrent "${TO_SYNC_IMAGES[@]}"
    end_time=$(date +%s)
    duration=$((end_time - start_time))
    # BUG FIX: the original reported "成功 ${#TO_SYNC_IMAGES[@]}", i.e. every
    # queued image, even when some failed. Count actual successes from the
    # result file instead.
    success_count=$(grep -c . synced-images.txt || true)
    log info "✅ 同步完成: 成功 ${success_count} 个镜像, 耗时 ${duration} 秒"
    if [[ -s "$FAILED_FILE" ]]; then
        log warn "❌ 失败镜像列表如下 (保存在 $FAILED_FILE):"
        cat "$FAILED_FILE"
    fi
    # (replaced a mojibake emoji prefix with plain text)
    log info "同步成功的镜像列表:"
    sort synced-images.txt || true
}
# Daemon mode: re-run main with the original CLI args, forever, sleeping
# WATCH_INTERVAL seconds between rounds.
watch_mode() {
    log info "进入守护模式,每 ${WATCH_INTERVAL} 秒轮询同步..."
    while :; do
        main "${ORIGINAL_ARGS[@]}"
        log info "等待 ${WATCH_INTERVAL} 秒后继续同步..."
        sleep "$WATCH_INTERVAL"
    done
}
# --- startup ---
ORIGINAL_ARGS=("$@")   # preserved so watch mode can re-run main with the same CLI
parse_args "$@"
if [[ "$WATCH_MODE" == true ]]; then
    watch_mode
else
    main "$@"
fi
systemd 管理脚本程序并启动
root@k8s-master1:~# cat /etc/systemd/system/harbor-sync.service
[Unit]
Description=Harbor 镜像同步守护进程
After=network.target docker.service
Requires=docker.service
[Service]
Type=simple
ExecStart=/data/script/harbor/replication-images.sh --watch --watch-interval 300
WorkingDirectory=/data/script/harbor
StandardOutput=append:/var/log/harbor-sync.log
StandardError=append:/var/log/harbor-sync.err
Restart=always
RestartSec=10
User=root
[Install]
WantedBy=multi-user.target
root@k8s-master1:~# systemctl daemon-reload
root@k8s-master1:~# systemctl enable harbor-sync --now
root@k8s-master1:~# systemctl status harbor-sync.service
● harbor-sync.service - Harbor 镜像同步守护进程
Loaded: loaded (/etc/systemd/system/harbor-sync.service; enabled; vendor preset: enabled)
Active: active (running) since Thu 2025-08-21 06:53:35 CST; 8h ago
Main PID: 4081799 (bash)
Tasks: 2 (limit: 4557)
Memory: 1.5M
CPU: 13min 18.985s
CGroup: /system.slice/harbor-sync.service
├─ 546574 sleep 300
└─4081799 bash /data/script/harbor/replication-images.sh --watch --watch-interval 300
Aug 21 06:53:35 k8s-master1 systemd[1]: harbor-sync.service: Consumed 7h 14min 37.650s CPU time.
Aug 21 06:53:35 k8s-master1 systemd[1]: Started Harbor 镜像同步守护进程.
三、shell 常用脚本
1. 解压缩
压缩
$ tar -cf - ansibel-ubuntu-22.04-2025-08-14/ | pigz -9 -p $(nproc) > ansibel-ubuntu-22.04-2025-08-14.tar.gz
pigz -9:压缩级别。-p $(nproc):使用所有可用的 CPU 核心(nproc 返回 CPU 核心数)。
解压缩
$ pigz -d -p $(nproc) < ansibel-ubuntu-22.04-2025-08-14.tar.gz | tar xf -
pigz -d:多线程解压 .gz 文件。-p $(nproc):使用所有可用的 CPU 核心(nproc 返回 CPU 核心数)。tar xf -:从标准输入读取解压后的数据并提取文件。
2. 计划任务
1.1 同步本地最新目录到远端
配合使用 grep 正则表达式以及 sort 排序 和 tail 取最后一个,来找到最新的目录,然后进行同步
$ ls -lh /data/k8s-app/mysql-cluster/mysql-backup/
total 32K
drwxr-xr-x 2 root root 139 Aug 6 21:33 20250806_213341
drwxr-xr-x 2 root root 136 Aug 6 21:42 20250806_214112
drwxr-xr-x 2 root root 139 Aug 7 10:00 20250807_100001
drwxr-xr-x 2 root root 139 Aug 8 10:00 20250808_100001
drwxr-xr-x 2 root root 139 Aug 9 10:00 20250821_100001
[root@k8s-master1 ~]# ls /data/k8s-app/mysql-cluster/mysql-backup/ | grep -E '^[0-9]{8}_[0-9]{6}$' | sort | tail -n 1
20250821_100001
# 上午 10 点同步 mysql-cluster 备份到 192.168.198.51 虚拟机
30 10 * * * rsync -avzP "/data/k8s-app/mysql-cluster/mysql-backup/$(ls /data/k8s-app/mysql-cluster/mysql-backup/ | grep -E '^[0-9]{8}_[0-9]{6}$' | sort | tail -n 1)" tianxiang@192.168.198.51:/data/k8s-app/mysql-backup/
1.2 执行脚本自动导入数据库
#40 10 * * * /data/k8s-app/mysql-backup/restore_mysql.sh >> /data/k8s-app/mysql-backup/mysql_restore.log 2>&1
40 10 * * * /data/k8s-app/mysql-backup/restore_mysql.sh
$ cat /data/k8s-app/mysql-backup/restore_mysql.sh
#!/bin/bash
# Configuration
BACKUP_DIR="/data/k8s-app/mysql-backup"
NAMESPACE="middleware"
MYSQL_USER="root"
MYSQL_PASSWORD="123456"   # NOTE(review): plaintext credential in the script
DATABASE="halodb"
LOG_FILE="/var/log/mysql_restore.log"
# Log helper: timestamped message to both the terminal and $LOG_FILE.
log() {
    local stamp
    stamp=$(date '+%Y-%m-%d %H:%M:%S')
    printf '%s\n' "[$stamp] $1"               # terminal
    printf '%s\n' "[$stamp] $1" >> "$LOG_FILE" # log file
}
# Fatal-error helper: log the reason (prefixed with "错误: ") and exit 1.
error_exit() {
    local reason=$1
    log "错误: $reason"
    exit 1
}
# 1. Find the newest backup directory (names are YYYYMMDD_HHMMSS, so a plain
#    lexical sort puts the newest last)
log "开始查找最新备份目录..."
latest_backup=$(ls "$BACKUP_DIR" | grep -E '^[0-9]{8}_[0-9]{6}$' | sort | tail -n 1)
if [ -z "$latest_backup" ]; then
    error_exit "未找到有效的备份目录"
fi
log "找到最新备份目录: $latest_backup"
# 2. Find and decompress the halodb_*.sql.gz dump inside that directory
backup_path="$BACKUP_DIR/$latest_backup"
log "在目录 $backup_path 中查找备份文件..."
gz_file=$(ls "$backup_path" | grep -E '^halodb_[0-9]{8}_[0-9]{6}\.sql\.gz$' | head -n 1)
if [ -z "$gz_file" ]; then
    error_exit "未找到 halodb 开头的 SQL 压缩文件"
fi
log "找到备份文件: $gz_file"
log "开始解压文件..."
gunzip -c "$backup_path/$gz_file" > "$backup_path/${gz_file%.gz}" || {
    error_exit "解压文件失败"
}
sql_file="${gz_file%.gz}"
log "已解压文件: $sql_file"
# 3. Determine the InnoDB-cluster primary via mysqlsh `cluster status`.
# NOTE(review): this parses the PRIMARY member's "address" field and keeps
# only the first dot-separated label (the pod name); it is fragile against
# mysqlsh output-format changes — re-verify after mysqlsh upgrades.
log "正在查找集群主节点..."
primary_node=$(kubectl -n middleware exec -it mysql-cluster-0 -c mysql -- mysqlsh -uri root:123456@127.0.0.1 -- cluster status 2>/dev/null | grep -C 5 "PRIMARY" | grep "address" | awk -F\" '{print $4}' | awk -F. '{print $1}')
if [ -z "$primary_node" ]; then
    error_exit "无法确定集群主节点"
fi
pod_name="${primary_node%.*}" # strip any domain part to get the pod name
log "找到主节点: $pod_name"
# 4. Copy the SQL file into the primary pod and import it
log "正在复制 SQL 文件到容器..."
kubectl -n "$NAMESPACE" cp "$backup_path/$sql_file" "$pod_name:/var/lib/mysql/$sql_file" -c mysql || {
    error_exit "复制文件失败"
}
log "正在导入数据库..."
start_time=$(date +%s)
kubectl -n "$NAMESPACE" exec -it "$pod_name" -c mysql -- \
    sh -c "mysql -u$MYSQL_USER -p$MYSQL_PASSWORD $DATABASE < /var/lib/mysql/$sql_file" && {
    end_time=$(date +%s)
    duration=$((end_time - start_time))
    log "数据库导入成功,耗时: ${duration}秒"
} || {
    error_exit "数据库导入失败"
}
# Remove the decompressed SQL file (best-effort)
log "清理临时文件..."
rm -f "$backup_path/$sql_file" || {
    log "警告: 清理临时文件失败,但操作已完成"
}
log "操作完成"
3. 脚本循环探活网站
#!/usr/bin/env bash
# url_monitor.sh —— multi-URL health monitor with ~5 s resolution (enhanced)
# Usage:
#   ./url_monitor.sh [--ping|-p] [--interval|-i N] [--timeout|-t N] [--ping-count N] [--ping-timeout N] [--file|-f urls.txt]
# Example:
#   ./url_monitor.sh --ping --interval 5 --timeout 5 --ping-count 2 --ping-timeout 1
# ----------------------------------------------------------
set -o errexit
set -o nounset
set -o pipefail
# ---------------- defaults ----------------
INTERVAL=5        # main loop period (seconds)
CURL_TIMEOUT=5    # per-request curl timeout (seconds)
ENABLE_PING=0     # also ping each URL's host? (0/1)
PING_COUNT=1      # ping packet count
PING_TIMEOUT=1    # per-ping wait timeout (seconds)
LOG_DIR="/var/log/url_monitor"
URLS_FILE=""
# Built-in URL list (use -f/--file to load from a file instead)
URLS=(
    "https://blog.tianxiang.love"
)
# ---------------- helpers ----------------
# Path of today's log file (one file per day inside LOG_DIR).
current_log_file() {
    printf '%s/%s.log\n' "$LOG_DIR" "$(date +%F)"
}
mkdir -p "$LOG_DIR"
# Print the usage text; the defaults shown reflect the current settings.
show_help() {
    local text
    text=$(cat <<EOF
Usage: $0 [options]
Options:
 -p, --ping 启用 ping 探测(会从 URL 提取 host 并 ping)
 -i, --interval N 监控间隔(秒),默认 ${INTERVAL}
 -t, --timeout N curl 超时(秒),默认 ${CURL_TIMEOUT}
 --ping-count N ping 次数,默认 ${PING_COUNT}
 --ping-timeout N ping 每次等待超时(秒),默认 ${PING_TIMEOUT}
 -f, --file FILE 从文件加载 URL(逐行,# 注释会被忽略)
 -h, --help 显示此帮助并退出
Example:
 $0 --ping -i 5 -t 3 --ping-count 2 --ping-timeout 1
EOF
)
    printf '%s\n' "$text"
}
# Logger: colored level tag when stdout is a TTY, plain text otherwise.
# Every message is also appended, uncolored, to today's log file.
log() {
    local level="$1"; shift
    local msg="$*"
    local ts
    ts="$(date '+%F %T')"
    local plain="[$ts] [$level] $msg"
    local color_reset="\033[0m"
    local color=""
    case "$level" in
        INFO)  color="\033[32m" ;; # green
        WARN)  color="\033[33m" ;; # yellow
        ERROR) color="\033[31m" ;; # red
    esac
    if [[ -t 1 ]]; then
        # colored console line
        echo -e "[$ts] [${color}${level}${color_reset}] $msg"
    else
        echo "$plain"
    fi
    # plain copy into the daily file
    echo "$plain" >> "$(current_log_file)"
}
INFO()  { log "INFO"  "$@"; }
WARN()  { log "WARN"  "$@"; }
ERROR() { log "ERROR" "$@"; }
# Extract the bare host from a URL (handles userinfo, ports, and [IPv6]).
extract_host() {
    local rest="${1#*://}"   # drop scheme
    rest="${rest%%/*}"       # drop path
    rest="${rest#*@}"        # drop userinfo
    if [[ "$rest" =~ ^\[(.*)\](:[0-9]+)?$ ]]; then
        # bracketed IPv6 literal, possibly with a port, e.g. [::1]:8080
        echo "${BASH_REMATCH[1]}"
    else
        echo "${rest%%:*}"   # drop port, if any
    fi
}
# ICMP reachability check. Returns 0=reachable, 1=unreachable, 2=no ping binary.
ping_probe() {
    local host="$1"
    command -v ping >/dev/null 2>&1 || {
        WARN "ping 命令不可用,跳过 ping: $host"
        return 2
    }
    if ping -c "$PING_COUNT" -W "$PING_TIMEOUT" "$host" > /dev/null 2>&1; then
        INFO "ping 成功: $host (count=${PING_COUNT} timeout=${PING_TIMEOUT}s)"
        return 0
    fi
    ERROR "ping 失败: $host (count=${PING_COUNT} timeout=${PING_TIMEOUT}s)"
    return 1
}
# HTTP probe via curl: logs HTTP status and latency.
# Returns 0 for 2xx, 1 for transport/4xx/5xx errors, 2 for 3xx/other codes.
probe() {
    local url="$1"
    local tmp_err
    tmp_err="$(mktemp)"
    # curl -w output format: http_code|time_total
    local output exit_code=0
    # BUG FIX: the original ran `output=$(curl ...) || true` and read `$?` on
    # the NEXT line — after `|| true` the status is always 0, so every curl
    # failure (DNS, timeout, TLS, ...) was treated as success and the whole
    # error-mapping case below was dead code. Capture the real status in the
    # same command (which also keeps `set -o errexit` from aborting).
    output=$(curl -sSL -o /dev/null --max-time "$CURL_TIMEOUT" -w "%{http_code}|%{time_total}" "$url" 2>"$tmp_err") || exit_code=$?
    local err_msg
    err_msg="$(<"$tmp_err")"
    rm -f "$tmp_err"
    if [[ $exit_code -ne 0 ]]; then
        # Map common curl exit codes to friendlier messages
        case $exit_code in
            3) ERROR "$url URL 格式错误 (curl error 3). $err_msg" ;;
            6) ERROR "$url DNS 解析失败 (curl error 6). $err_msg" ;;
            7) ERROR "$url 连接失败/不可达 (curl error 7). $err_msg" ;;
            28) ERROR "$url 请求超时 (curl error 28). $err_msg" ;;
            35) ERROR "$url SSL 握手失败 (curl error 35). $err_msg" ;;
            51|60) ERROR "$url SSL 证书验证失败 (curl error $exit_code). $err_msg" ;;
            52) WARN "$url 空响应 (curl error 52). $err_msg" ;;
            56) ERROR "$url 接收失败/连接中断 (curl error 56). $err_msg" ;;
            *) ERROR "$url curl 错误码=$exit_code. ${err_msg:-'(no additional info)'}" ;;
        esac
        return 1
    fi
    # Parse "code|seconds" and convert latency to whole milliseconds
    IFS='|' read -r http_code time_total <<< "$output"
    http_code="${http_code:-000}"
    time_total="${time_total:-0}"
    local time_ms
    time_ms=$(awk "BEGIN{printf \"%.0f\", ($time_total)*1000}")
    # Classify by HTTP status family
    if [[ "$http_code" =~ ^2[0-9][0-9]$ ]]; then
        INFO "$url 正常 code=$http_code time=${time_ms}ms"
        return 0
    elif [[ "$http_code" =~ ^3[0-9][0-9]$ ]]; then
        WARN "$url 重定向 code=$http_code time=${time_ms}ms"
        return 2
    elif [[ "$http_code" =~ ^4[0-9][0-9]$ ]]; then
        ERROR "$url 客户端错误 code=$http_code time=${time_ms}ms"
        return 1
    elif [[ "$http_code" =~ ^5[0-9][0-9]$ ]]; then
        ERROR "$url 服务器错误 code=$http_code time=${time_ms}ms"
        return 1
    else
        WARN "$url 非标准响应 code=$http_code time=${time_ms}ms"
        return 2
    fi
}
# ---------------- argument parsing ----------------
if [[ $# -gt 0 ]]; then
    while [[ $# -gt 0 ]]; do
        case "$1" in
            -p|--ping) ENABLE_PING=1; shift ;;
            -i|--interval) INTERVAL="$2"; shift 2 ;;
            -t|--timeout) CURL_TIMEOUT="$2"; shift 2 ;;
            --ping-count) PING_COUNT="$2"; shift 2 ;;
            --ping-timeout) PING_TIMEOUT="$2"; shift 2 ;;
            -f|--file) URLS_FILE="$2"; shift 2 ;;
            -h|--help) show_help; exit 0 ;;
            *) echo "未知参数: $1"; show_help; exit 1 ;;
        esac
    done
fi
# If a URL file was given, it replaces the built-in URL list
if [[ -n "${URLS_FILE:-}" ]]; then
    if [[ ! -f "$URLS_FILE" ]]; then
        echo "指定的 URL 文件不存在: $URLS_FILE" >&2
        exit 2
    fi
    URLS=()
    # `|| [[ -n "$line" ]]` keeps a final line without a trailing newline
    while IFS= read -r line || [[ -n "$line" ]]; do
        line="${line%%#*}"           # strip trailing comments
        line="${line//[[:space:]]/}" # strip all whitespace
        if [[ -n "$line" ]]; then
            URLS+=("$line")
        fi
    done < "$URLS_FILE"
fi
if [[ ${#URLS[@]} -eq 0 ]]; then
    echo "没有可用的 URL,退出。" >&2
    exit 1
fi
trap 'INFO "收到中断信号,退出"; exit 0' SIGINT SIGTERM
INFO "===== 开始监控(共 ${#URLS[@]} 个 URL) interval=${INTERVAL}s curl_timeout=${CURL_TIMEOUT}s ping=${ENABLE_PING} ====="
# Main loop: probe every URL (and optionally ping its host) concurrently,
# wait for the whole round to finish, then sleep INTERVAL seconds.
while true; do
    # BUG FIX: reset the PID arrays at the top of each round instead of
    # appending to never-initialised arrays and `unset`-ting them afterwards;
    # with `set -o nounset`, `arr+=(x)` on an unset array errors on
    # bash < 4.4.
    pids_probe=()
    pids_ping=()
    for u in "${URLS[@]}"; do
        probe "$u" &
        pids_probe+=("$!")
        # When enabled, ping only the extracted host, not the full URL
        if [[ "$ENABLE_PING" -eq 1 ]]; then
            host="$(extract_host "$u")"
            if [[ -n "$host" ]]; then
                ping_probe "$host" &
                pids_ping+=("$!")
            else
                WARN "无法从 URL 提取 host,跳过 ping: $u"
            fi
        fi
    done
    # Barrier: never start a new round while the previous one is running
    for pid in "${pids_probe[@]}"; do wait "$pid" || true; done
    for pid in "${pids_ping[@]}"; do wait "$pid" || true; done
    sleep "$INTERVAL"
done
systemd 管理并启动
[root@k8s-app-1 script]# cat /etc/systemd/system/url_monitor.service
[Unit]
Description=URL & Ping Health Monitor Service
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
ExecStart=/usr/local/bin/url_monitor.sh --ping --interval 5 --timeout 5 --ping-count 2 --ping-timeout 1
Restart=always
RestartSec=5
# 建议指定运行用户(日志目录要可写)
User=root
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
[root@k8s-app-1 script]# systemctl status url_monitor.service
● url_monitor.service - URL & Ping Health Monitor Service
Loaded: loaded (/etc/systemd/system/url_monitor.service; enabled; vendor preset: disabled)
Active: active (running) since Mon 2025-08-11 16:49:25 CST; 1 weeks 2 days ago
Main PID: 43222 (bash)
Tasks: 2
Memory: 55.9M
CGroup: /system.slice/url_monitor.service
├─43222 bash /usr/local/bin/url_monitor.sh --ping --interval 5 --timeout 5 --ping-count 2 --ping-timeout 1
└─63349 sleep 5
Aug 21 15:18:47 k8s-app-1 url_monitor.sh[43222]: [2025-08-21 15:18:47] [INFO] ping 成功: blog.tianxiang.love (count=2 timeout=1s)
Aug 21 15:18:53 k8s-app-1 url_monitor.sh[43222]: [2025-08-21 15:18:53] [INFO] https://blog.tianxiang.love 正常 code=200 time=22ms
Aug 21 15:18:53 k8s-app-1 url_monitor.sh[43222]: [2025-08-21 15:18:53] [INFO] ping 成功: blog.tianxiang.love (count=2 timeout=1s)
Aug 21 15:18:59 k8s-app-1 url_monitor.sh[43222]: [2025-08-21 15:18:59] [INFO] https://blog.tianxiang.love 正常 code=200 time=22ms
Aug 21 15:18:59 k8s-app-1 url_monitor.sh[43222]: [2025-08-21 15:18:59] [INFO] ping 成功: blog.tianxiang.love (count=2 timeout=1s)
4. 查询汇总服务器的出入口带宽流量
[root@k8s-app-1 check_bandwidth_monitor]# cat bandwidth_monitor.py
import paramiko
import time
import csv
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict
# Gateway address used to discover each host's egress interface
GATEWAY_IP = "192.168.233.1"
IP_LIST_FILE = "edge-ip_list.txt"      # one monitored host IP per line
OUTPUT_FILE = "bandwidth_report.csv"   # per-host averages are written here
INTERVAL = 6    # sampling interval in seconds
SAMPLES = 60    # number of samples (total duration = INTERVAL * SAMPLES)
MAX_WORKERS = 97  # thread-pool size; set from the total host count
def read_ip_list(file_path: str) -> List[str]:
    """Read host IPs from *file_path*, one per line.

    Surrounding whitespace is stripped; blank or whitespace-only lines
    are skipped.
    """
    with open(file_path, "r") as handle:
        ips = []
        for raw_line in handle:
            candidate = raw_line.strip()
            if candidate:
                ips.append(candidate)
        return ips
def ssh_exec(ssh: paramiko.SSHClient, cmd: str) -> str:
    """Run *cmd* over an open SSH connection and return trimmed stdout."""
    _stdin, stdout, _stderr = ssh.exec_command(cmd)
    return stdout.read().decode().strip()
def get_iface_by_route(ssh: paramiko.SSHClient, target_ip: str) -> str:
    """Return the interface the remote host uses to reach *target_ip*.

    Parses ``ip route get`` output for the token following ``dev``.
    Raises Exception with the raw output when no interface can be found
    (the original raised an opaque IndexError when ``dev`` happened to be
    the final token of the output).
    """
    output = ssh_exec(ssh, f"ip route get {target_ip}")
    print(f"[路由输出] {output}")
    tokens = output.split()
    for pos, token in enumerate(tokens):
        if token == "dev" and pos + 1 < len(tokens):
            return tokens[pos + 1]
    raise Exception(f"无法识别网卡 from: {output}")
def get_iface_bytes(ssh: paramiko.SSHClient, iface: str) -> Dict[str, int]:
    """Read cumulative RX/TX byte counters for *iface* from /proc/net/dev."""
    line = ssh_exec(ssh, f"cat /proc/net/dev | grep '{iface}:'")
    fields = line.strip().split()
    # /proc/net/dev columns after the interface name: field 1 = rx_bytes,
    # field 9 = tx_bytes
    return {
        "rx_bytes": int(fields[1]),
        "tx_bytes": int(fields[9]),
    }
def monitor_host(ip: str) -> Dict:
    """Sample one host's egress-NIC throughput over SSH.

    Takes SAMPLES measurements, each spanning INTERVAL seconds, and returns
    {"ip", "avg_rx", "avg_tx"} with averages in MB/s. On any failure an
    "error" key is added and the averages stay 0.
    """
    result = {"ip": ip, "avg_rx": 0, "avg_tx": 0}
    try:
        print(f"开始监控 {ip} ...")
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # NOTE(review): relies on passwordless (key/agent) root SSH — confirm
        ssh.connect(ip, username="root", timeout=5)
        iface = get_iface_by_route(ssh, GATEWAY_IP)
        print(f"[{ip}] 出口网卡: {iface}")
        rx_total = 0
        tx_total = 0
        for i in range(SAMPLES):
            # Two counter reads INTERVAL seconds apart -> bytes per second
            d1 = get_iface_bytes(ssh, iface)
            time.sleep(INTERVAL)
            d2 = get_iface_bytes(ssh, iface)
            rx_diff = (d2["rx_bytes"] - d1["rx_bytes"]) / INTERVAL
            tx_diff = (d2["tx_bytes"] - d1["tx_bytes"]) / INTERVAL
            rx_total += rx_diff
            tx_total += tx_diff
            print(f"[{ip}] RX: {rx_diff/1024:.2f} KB/s, TX: {tx_diff/1024:.2f} KB/s")
        result["avg_rx"] = rx_total / SAMPLES / 1024 / 1024  # MB/s
        result["avg_tx"] = tx_total / SAMPLES / 1024 / 1024  # MB/s
        ssh.close()
    except Exception as e:
        result["error"] = str(e)
    return result
def main():
    """Monitor every listed host concurrently and write a per-host CSV."""
    ip_list = read_ip_list(IP_LIST_FILE)
    all_results = []
    print(f"=== 参数信息 ===\n监控时长: {INTERVAL * SAMPLES} 秒\n采样间隔: {INTERVAL} 秒\n采样次数: {SAMPLES} 次\n并发数: {MAX_WORKERS}\n")
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(monitor_host, ip) for ip in ip_list]
        for future in futures:
            # Collect in submission order; blocks until each host finishes
            all_results.append(future.result())
    print("\n=== 各机器平均带宽统计 (单位:MB/s) ===")
    total_rx = total_tx = 0
    with open(OUTPUT_FILE, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["IP", "RX_MB/s", "TX_MB/s"])
        for res in all_results:
            ip = res['ip']
            # Hosts that errored out contribute 0 to the totals
            rx = res.get("avg_rx", 0)
            tx = res.get("avg_tx", 0)
            print(f"{ip}: RX={rx:.3f}, TX={tx:.3f}")
            total_rx += rx
            total_tx += tx
            writer.writerow([ip, f"{rx:.3f}", f"{tx:.3f}"])
    print(f"\n=== 总体平均带宽 ===")
    print(f"总接收 RX: {total_rx:.3f} MB/s")
    print(f"总发送 TX: {total_tx:.3f} MB/s")
    print(f"结果已保存为 {OUTPUT_FILE}")
if __name__ == "__main__":
    main()
# Prerequisite: the machine running this script needs passwordless (key-based)
# root SSH access to every monitored host.
[root@k8s-app-1 check_bandwidth_monitor]# python3 bandwidth_monitor.py
5. 脚本参数化自定义同步本地目录到远端目录
完全参数化:所有配置都通过命令行参数传递
灵活的主题支持:通过 -t/--topic 参数指定备份任务名称。
可配置的源和目标:可以备份任意目录到任意远程位置。
模块化选项:
--no-delete:不删除目标端多余文件;--no-compress:不压缩传输;--dry-run:预览模式。
更好的错误处理:详细的参数验证和错误提示
安全性:支持从环境变量读取密码
向后兼容:包装脚本保持原有使用习惯
#!/bin/bash
# backup_script.sh - parameterised backup script (cron-friendly)
# Purpose: back up an arbitrary directory to a remote server via rsync+sshpass
# Defaults (all overridable through CLI flags)
DEFAULT_BACKUP_DIR="/var/log/backup-xiaohezi_logs"
DEFAULT_REMOTE_USER="tianxiang"
DEFAULT_REMOTE_HOST="10.100.255.6"
DEFAULT_SSH_PORT="22"
# SSH password (set yours here)
# NOTE(review): hardcoded plaintext credential; prefer SSH keys or reading the
# password from a root-only file, and restrict this script's permissions.
SSH_PASSWORD="TianXiang."
# Print the CLI help text (reflecting current default values).
show_usage() {
    local text
    text=$(cat << EOF
用法: $0 [选项] --source SOURCE_PATH --remote REMOTE_PATH
必需参数:
 -s, --source SOURCE_PATH 备份源目录路径
 -r, --remote REMOTE_PATH 远程目标目录路径
可选参数:
 -t, --topic TOPIC_NAME 备份主题/任务名称(用于日志标识)
 -u, --user REMOTE_USER 远程服务器用户名(默认: $DEFAULT_REMOTE_USER)
 -h, --host REMOTE_HOST 远程服务器地址(默认: $DEFAULT_REMOTE_HOST)
 -p, --port SSH_PORT SSH端口(默认: $DEFAULT_SSH_PORT)
 -d, --backup-dir DIR 本地日志目录(默认: $DEFAULT_BACKUP_DIR)
 -l, --log-file FILE 指定日志文件名(默认自动生成)
 --no-delete 不删除目标端多余文件
 --no-compress 不压缩传输
 --dry-run Dry run模式,只显示将要执行的操作
 --help 显示此帮助信息
示例:
 $0 -t "MySQL备份" -s /data/mysql -r /backup/mysql
 $0 -t "网站数据" -s /var/www -r /backup/website -u myuser -h 192.168.1.100
 $0 -t "应用配置" -s /etc/app -r /backup/config --no-delete --dry-run
注意: SSH密码已在脚本中硬编码,请确保脚本文件权限安全。
EOF
)
    printf '%s\n' "$text"
}
# Log helper: timestamped line to stdout, and to $LOG_FILE when it is set.
# BUG FIX: the original always piped into `tee -a "$LOG_FILE"`, which fails
# for errors raised before parse_arguments has set LOG_FILE (e.g. missing
# required arguments).
log() {
    local line="[$(date '+%Y-%m-%d %H:%M:%S')] $1"
    if [ -n "${LOG_FILE:-}" ]; then
        echo "$line" | tee -a "$LOG_FILE"
    else
        echo "$line"
    fi
}
# Explain, via the log, what each rsync flag currently in effect does.
show_rsync_info() {
    local bar="================================================================"
    log "$bar"
    log " rsync 参数功能说明"
    log "$bar"
    log "-a (archive) - 归档模式,相当于 -rlptgoD"
    log "-v (verbose) - 详细输出"
    if [ "$USE_COMPRESS" = true ]; then
        log "-z (compress) - 压缩传输"
    fi
    log "-P (progress + partial) - 显示进度 + 断点续传"
    if [ "$USE_DELETE" = true ]; then
        log "--delete - 删除目标多余文件"
    fi
    log "$bar"
}
# Log a fatal error and terminate the script with exit status 1.
error_exit() {
    local why="$1"
    log "错误: $why"
    exit 1
}
# Parse CLI flags into globals, validate required arguments, and derive the
# log-file path. Mutates: SOURCE_PATH, REMOTE_PATH, TOPIC, REMOTE_USER,
# REMOTE_HOST, SSH_PORT, BACKUP_DIR, LOG_FILE, USE_DELETE, USE_COMPRESS,
# DRY_RUN.
parse_arguments() {
    SOURCE_PATH=""
    REMOTE_PATH=""
    TOPIC="未命名任务"
    REMOTE_USER="$DEFAULT_REMOTE_USER"
    REMOTE_HOST="$DEFAULT_REMOTE_HOST"
    SSH_PORT="$DEFAULT_SSH_PORT"
    BACKUP_DIR="$DEFAULT_BACKUP_DIR"
    LOG_FILE=""
    USE_DELETE=true
    USE_COMPRESS=true
    DRY_RUN=false
    while [[ $# -gt 0 ]]; do
        case $1 in
            -s|--source)
                SOURCE_PATH="$2"
                shift 2
                ;;
            -r|--remote)
                REMOTE_PATH="$2"
                shift 2
                ;;
            -t|--topic)
                TOPIC="$2"
                shift 2
                ;;
            -u|--user)
                REMOTE_USER="$2"
                shift 2
                ;;
            # NOTE: -h means --host here, not help; help is --help only
            -h|--host)
                REMOTE_HOST="$2"
                shift 2
                ;;
            -p|--port)
                SSH_PORT="$2"
                shift 2
                ;;
            -d|--backup-dir)
                BACKUP_DIR="$2"
                shift 2
                ;;
            -l|--log-file)
                LOG_FILE="$2"
                shift 2
                ;;
            --no-delete)
                USE_DELETE=false
                shift
                ;;
            --no-compress)
                USE_COMPRESS=false
                shift
                ;;
            --dry-run)
                DRY_RUN=true
                shift
                ;;
            --help)
                show_usage
                exit 0
                ;;
            *)
                error_exit "未知参数: $1"
                ;;
        esac
    done
    # Required arguments
    if [ -z "$SOURCE_PATH" ] || [ -z "$REMOTE_PATH" ]; then
        error_exit "必须提供源目录路径(--source)和远程目录路径(--remote)"
    fi
    # The SSH password must be configured in the script
    if [ -z "$SSH_PASSWORD" ]; then
        error_exit "SSH密码未设置,请在脚本中设置SSH_PASSWORD变量"
    fi
    # Derive the log-file path when one was not given explicitly
    if [ -z "$LOG_FILE" ]; then
        mkdir -p "$BACKUP_DIR"
        LOG_FILE="${BACKUP_DIR}/backup_${TOPIC}_$(date +%Y%m%d_%H%M%S).log"
    else
        mkdir -p "$(dirname "$LOG_FILE")"
    fi
}
# Verify the backup source directory exists and the required tools are installed.
check_dependencies() {
    log "检查必要依赖..."
    # source directory must exist
    if [ ! -d "$SOURCE_PATH" ]; then
        error_exit "备份源目录不存在: $SOURCE_PATH"
    fi
    # sshpass is needed for the password-based SSH/rsync calls
    command -v sshpass &> /dev/null || error_exit "sshpass 未安装,请先安装: yum install sshpass 或 apt-get install sshpass"
    # rsync performs the actual transfer
    command -v rsync &> /dev/null || error_exit "rsync 未安装,请先安装: yum install rsync 或 apt-get install rsync"
    log "依赖检查完成"
}
# Compose the full rsync invocation as ONE string (emitted on stdout) so the
# caller can log it verbatim and execute it with eval. Flag order matches the
# rest of the script: -av, optional -z, -P, optional --delete, -e ssh, paths.
build_rsync_command() {
  local cmd="sshpass -p '$SSH_PASSWORD' rsync -av"
  [ "$USE_COMPRESS" = true ] && cmd+=" -z"
  cmd+=" -P"
  [ "$USE_DELETE" = true ] && cmd+=" --delete"
  # Non-interactive SSH transport on the configured port.
  cmd+=" -e 'ssh -p $SSH_PORT -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'"
  # Trailing slash on the source: sync directory CONTENTS, not the dir itself.
  cmd+=" '$SOURCE_PATH/' '$REMOTE_USER@$REMOTE_HOST:$REMOTE_PATH'"
  echo "$cmd"
}
# Run the actual rsync transfer (or only print the command in dry-run mode).
# Reads the globals set by parse_arguments; all progress goes through log().
# NOTE(review): the command string is executed via eval and embeds
# SSH_PASSWORD and the paths verbatim -- a single quote in any of them breaks
# the command, and the password is written to the log file and visible in
# `ps` output while sshpass runs. Consider an ssh key or SSHPASS env instead.
perform_backup() {
  log "开始备份: $TOPIC"
  log "源目录: $SOURCE_PATH"
  log "目标地址: $REMOTE_USER@$REMOTE_HOST:$REMOTE_PATH"
  # Build the rsync command string (echoed by build_rsync_command).
  local rsync_cmd=$(build_rsync_command)
  # Print the flag legend before doing anything.
  show_rsync_info
  # Dry-run mode: show what would run, then stop without side effects.
  if [ "$DRY_RUN" = true ]; then
    log "DRY RUN模式 - 将要执行的命令:"
    log "$rsync_cmd"
    log "Dry run完成,未实际执行备份"
    return 0
  fi
  # Best-effort creation of the remote target directory; failure here is only
  # a warning because rsync may still succeed if the directory exists.
  log "创建远程目录..."
  local mkdir_cmd="sshpass -p '$SSH_PASSWORD' ssh -p $SSH_PORT -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $REMOTE_USER@$REMOTE_HOST 'mkdir -p $REMOTE_PATH'"
  eval "$mkdir_cmd" >> "$LOG_FILE" 2>&1
  if [ $? -ne 0 ]; then
    log "警告: 远程目录创建可能失败,但继续执行备份..."
  fi
  # Run the transfer; all rsync output is appended to the log file.
  log "执行rsync备份..."
  log "命令: $rsync_cmd"
  eval "$rsync_cmd" >> "$LOG_FILE" 2>&1
  if [ $? -eq 0 ]; then
    log "备份完成: $TOPIC"
    return 0
  else
    # error_exit terminates the whole script with status 1.
    error_exit "备份失败: $TOPIC"
  fi
}
# Prune old backup logs from BACKUP_DIR.
# $1 - optional retention in days (default 30, matching the old behavior).
# Generalized: the retention window used to be hard-coded to 30 days.
cleanup_old_logs() {
  local keep_days="${1:-30}"
  log "清理旧日志文件..."
  # -mtime +N matches files strictly older than N whole days.
  find "$BACKUP_DIR" -name "backup_*.log" -mtime +"${keep_days}" -delete >> "$LOG_FILE" 2>&1
  log "日志清理完成"
}
# Entry point: orchestrates one backup run end to end --
# parse flags, prepare logging, verify prerequisites, sync, prune old logs.
main() {
  parse_arguments "$@"
  # Make sure the directory holding the log file exists before the first log.
  mkdir -p "$(dirname "$LOG_FILE")"
  log "=== 备份任务开始: $TOPIC ==="
  log "日志文件: $LOG_FILE"
  check_dependencies
  perform_backup
  cleanup_old_logs
  log "=== 备份任务完成: $TOPIC ==="
  log "详细日志请查看: $LOG_FILE"
}
# 执行主函数
main "$@"
测试执行
[root@k8s-master1 data]# ./backup_script.sh -t "测试备份" -s /data/Dockerfile -r /home/tianxiang/k8s-app/gitlab/gitlab-backup -u tianxiang -h 10.100.255.6
[2025-10-29 11:24:10] === 备份任务开始: 测试备份 ===
[2025-10-29 11:24:10] 日志文件: /var/log/backup-xiaohezi_logs/backup_测试备份_20251029_112410.log
[2025-10-29 11:24:10] 检查必要依赖...
[2025-10-29 11:24:10] 依赖检查完成
[2025-10-29 11:24:10] 开始备份: 测试备份
[2025-10-29 11:24:10] 源目录: /data/Dockerfile
[2025-10-29 11:24:10] 目标地址: tianxiang@10.100.255.6:/home/tianxiang/k8s-app/gitlab/gitlab-backup
[2025-10-29 11:24:10] ================================================================
[2025-10-29 11:24:10] rsync 参数功能说明
[2025-10-29 11:24:10] ================================================================
[2025-10-29 11:24:10] -a (archive) - 归档模式,相当于 -rlptgoD
[2025-10-29 11:24:10] -v (verbose) - 详细输出
[2025-10-29 11:24:10] -z (compress) - 压缩传输
[2025-10-29 11:24:10] -P (progress + partial) - 显示进度 + 断点续传
[2025-10-29 11:24:10] --delete - 删除目标多余文件
[2025-10-29 11:24:10] ================================================================
[2025-10-29 11:24:10] 创建远程目录...
[2025-10-29 11:24:10] 执行rsync备份...
[2025-10-29 11:24:10] 命令: sshpass -p 'TianXiang.' rsync -av -z -P --delete -e 'ssh -p 22 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null' '/data/Dockerfile/' 'tianxiang@10.100.255.6:/home/tianxiang/k8s-app/gitlab/gitlab-backup'
[2025-10-29 11:24:11] 备份完成: 测试备份
[2025-10-29 11:24:11] 清理旧日志文件...
[2025-10-29 11:24:11] 日志清理完成
[2025-10-29 11:24:11] === 备份任务完成: 测试备份 ===
[2025-10-29 11:24:11] 详细日志请查看: /var/log/backup-xiaohezi_logs/backup_测试备份_20251029_112410.log四、MySQL 常用语法
1. 相关查询
1.1 查询数据库存储使用的大小
SELECT
table_schema AS 'Database',
ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'Size (MB)'
FROM
information_schema.tables
GROUP BY
table_schema;
执行输出结果如下:
mysql> SELECT
-> table_schema AS 'Database',
-> ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'Size (MB)'
-> FROM
-> information_schema.tables
-> GROUP BY
-> table_schema;
+--------------------+-----------+
| Database | Size (MB) |
+--------------------+-----------+
| cmdb | 13.81 |
| indc_alarm | 0.69 |
| information_schema | 0.00 |
| iot | 2.02 |
| mysql | 2.63 |
| performance_schema | 0.00 |
| sys | 0.02 |
| topv2 | 31.64 |
| zh-iam | 0.70 |
+--------------------+-----------+
9 rows in set (0.02 sec)1.2 查询单个数据库的存储使用大小
SELECT
table_schema AS 'Database',
ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'Size (MB)'
FROM
information_schema.tables
WHERE
table_schema = 'cmdb';
执行结果如下
mysql> SELECT
-> table_schema AS 'Database',
-> ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'Size (MB)'
-> FROM
-> information_schema.tables
-> WHERE
-> table_schema = 'cmdb';
+----------+-----------+
| Database | Size (MB) |
+----------+-----------+
| cmdb | 13.81 |
+----------+-----------+
1 row in set (0.01 sec)
五、关于 SSH 操作之类的脚本
1. 测试远程登陆
[root@k8s-app-1 script]# cat try_SSH_remote_login.py
#!/usr/bin/env python3
import paramiko
import sys
import argparse
from pathlib import Path
def read_ip_list(file_path):
    """读取IP列表文件,支持多种格式

    Each non-comment line: "IP PORT PASSWORD [USERNAME]" (whitespace-split).
    Blank lines, '#' comments, short lines and lines whose port is not a
    number are skipped with a warning instead of aborting the whole run
    (fix: a non-numeric port used to raise an uncaught ValueError).

    Returns:
        list of dicts with keys 'ip', 'port' (int), 'password', 'username'.
    Exits the process (status 1) if the file is missing or unreadable.
    """
    ip_info = []
    try:
        with open(file_path, 'r') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                parts = line.split()
                if len(parts) >= 3:
                    try:
                        port = int(parts[1])
                    except ValueError:
                        # Malformed port: skip just this line, keep going.
                        print(f"警告: 第 {line_num} 行格式错误,已跳过: {line}")
                        continue
                    ip_info.append({
                        'ip': parts[0],
                        'port': port,
                        'password': parts[2],
                        'username': parts[3] if len(parts) > 3 else 'root'
                    })
                else:
                    print(f"警告: 第 {line_num} 行格式错误,已跳过: {line}")
    except FileNotFoundError:
        print(f"错误: 文件 '{file_path}' 不存在")
        sys.exit(1)
    except Exception as e:
        print(f"错误: 读取文件时发生错误 - {str(e)}")
        sys.exit(1)
    return ip_info
def test_ssh_login(ip, port, password, username, timeout=10):
    """测试SSH登录,返回是否成功

    Attempts a password SSH login and prints a tagged result line.

    Fix: import socket locally so the `except socket.timeout` clause does not
    depend on the `import socket` that the original script only performs
    under the __main__ guard -- importing this file as a module would have
    raised NameError inside the handler.
    """
    import socket

    ssh = paramiko.SSHClient()
    # Auto-accept unknown host keys: convenient for sweeps, but it disables
    # MITM protection -- acceptable only on trusted networks.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(ip, port=port, username=username, password=password, timeout=timeout)
        print(f"[SUCCESS] ✅ {ip}:{port} 登录成功")
        return True
    except paramiko.AuthenticationException:
        print(f"[FAILED] ❌ {ip}:{port} 认证失败(密码错误或用户名错误)")
    except paramiko.SSHException as e:
        print(f"[FAILED] ❌ {ip}:{port} SSH连接错误 - {str(e)}")
    except socket.timeout:
        print(f"[FAILED] ❌ {ip}:{port} 连接超时")
    except Exception as e:
        print(f"[FAILED] ❌ {ip}:{port} 连接失败 - {str(e)}")
    finally:
        ssh.close()
    # All failure paths fall through to here after the finally-close.
    return False
def main():
    """Command-line entry point.

    Modes:
      --single IP PORT PASSWORD : test one host; process exit code is 0/1.
      otherwise                 : read hosts from --file, test each, print a
                                  summary (optionally also to --output).
    """
    parser = argparse.ArgumentParser(
        description="SSH远程登录测试工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
使用示例:
%(prog)s -f ip-list.txt
%(prog)s -f ip-list.txt -t 5
%(prog)s -f ip-list.txt -o result.txt
%(prog)s --single 192.168.1.1 22 password123
文件格式说明:
IP地址 端口 密码 [用户名]
192.168.1.1 22 password123 root
10.0.0.1 2222 mypassword
# 注释行以#开头
"""
    )
    parser.add_argument('-f', '--file', default='ip-list.txt',
                        help='IP列表文件路径 (默认: ip-list.txt)')
    parser.add_argument('-t', '--timeout', type=int, default=10,
                        help='连接超时时间(秒) (默认: 10)')
    parser.add_argument('-o', '--output',
                        help='将结果输出到文件')
    parser.add_argument('--single', nargs=3, metavar=('IP', 'PORT', 'PASSWORD'),
                        help='测试单个主机: IP 端口 密码')
    parser.add_argument('--username', default='root',
                        help='单个主机测试时的用户名 (默认: root)')
    args = parser.parse_args()
    # Single-host mode: exit code mirrors the login result.
    if args.single:
        ip, port, password = args.single
        print(f"🚀 测试单个主机 {ip}:{port}...\n")
        success = test_ssh_login(ip, int(port), password, args.username, args.timeout)
        print("\n📊 ========== 测试结果 ==========")
        print(f"结果: {'✅ 成功' if success else '❌ 失败'}")
        sys.exit(0 if success else 1)
    # Batch mode: the inventory file must exist.
    if not Path(args.file).exists():
        print(f"错误: 文件 '{args.file}' 不存在")
        print("请使用 -f 参数指定正确的文件路径,或使用 --single 测试单个主机")
        sys.exit(1)
    ip_info_list = read_ip_list(args.file)
    if not ip_info_list:
        print("错误: IP列表为空或格式错误!")
        print("文件格式应为: IP地址 端口 密码 [用户名]")
        sys.exit(1)
    success_list = []
    failed_list = []
    print(f"\n🚀 开始测试SSH登录 (超时: {args.timeout}秒)...\n")
    # Test hosts sequentially, bucketing results for the summary.
    for info in ip_info_list:
        ip, port, password, username = info['ip'], info['port'], info['password'], info['username']
        print(f"测试 {username}@{ip}:{port}...")
        if test_ssh_login(ip, port, password, username, args.timeout):
            success_list.append(f"{username}@{ip}:{port}")
        else:
            failed_list.append(f"{username}@{ip}:{port}")
    # Build the summary once so it can go to both stdout and --output.
    result_output = []
    result_output.append("\n📊 ========== 测试结果汇总 ==========")
    result_output.append(f"✅ 成功登录的主机 ({len(success_list)}台):")
    for host in success_list:
        result_output.append(f" - {host}")
    result_output.append(f"\n❌ 登录失败的主机 ({len(failed_list)}台):")
    for host in failed_list:
        result_output.append(f" - {host}")
    # Division is safe: the emptiness check above guarantees ip_info_list != [].
    result_output.append(f"\n成功率: {len(success_list)}/{len(ip_info_list)} "
                         f"({len(success_list)/len(ip_info_list)*100:.1f}%)")
    result_output.append("测试完成!")
    # Emit to terminal and, best-effort, to the output file.
    result_text = "\n".join(result_output)
    print(result_text)
    if args.output:
        try:
            with open(args.output, 'w') as f:
                f.write(result_text + "\n")
            print(f"\n结果已保存到: {args.output}")
        except Exception as e:
            print(f"警告: 无法保存结果到文件 - {str(e)}")
if __name__ == "__main__":
    # NOTE(review): socket is imported here (not at module top) and is needed
    # by test_ssh_login's `except socket.timeout`; it only resolves because
    # this import runs before main(). Importing this file as a module skips
    # it entirely -- a top-level `import socket` would be safer.
    import socket  # socket import for the timeout exception handler
    main()
# 查看帮助
python3 try_SSH_remote_login.py --help
# 使用默认文件测试
python3 try_SSH_remote_login.py
# 指定文件测试
python3 try_SSH_remote_login.py -f my_ips.txt
# 测试单个主机
python3 try_SSH_remote_login.py --single 192.168.1.1 22 password123
# 带超时设置和输出文件
python3 try_SSH_remote_login.py -f ip-list.txt -t 5 -o result.txt演示如下:
[root@k8s-app-1 python]# python3 try_SSH_remote_login.py --username root --single 192.168.233.131 22 123456
🚀 测试单个主机 192.168.233.131:22...
[SUCCESS] ✅ 192.168.233.131:22 登录成功
📊 ========== 测试结果 ==========
结果: ✅ 成功
[root@k8s-app-1 python]# 多个主机测试
[root@k8s-app-1 python]# cat ip-list.txt
192.168.233.103 22 123456
192.168.233.104 22 123456
192.168.233.105 22 123456
[root@k8s-app-1 python]# python3 try_SSH_remote_login.py --file ip-list.txt
🚀 开始测试SSH登录 (超时: 10秒)...
测试 root@192.168.233.103:22...
[SUCCESS] ✅ 192.168.233.103:22 登录成功
测试 root@192.168.233.104:22...
[SUCCESS] ✅ 192.168.233.104:22 登录成功
测试 root@192.168.233.105:22...
[SUCCESS] ✅ 192.168.233.105:22 登录成功
📊 ========== 测试结果汇总 ==========
✅ 成功登录的主机 (3台):
- root@192.168.233.103:22
- root@192.168.233.104:22
- root@192.168.233.105:22
❌ 登录失败的主机 (0台):
成功率: 3/3 (100.0%)
测试完成!2. 维护用户名密码
批量管理远程主机:修改密码、创建用户、设置sudo、修复SSH端口
[root@k8s-app-1 python]# cat batch_user_manage_and_ssh_fix.py
import paramiko
import sys
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
def read_ip_list(file):
    """Parse an inventory file of "IP PORT PASSWORD" lines.

    Lines with any other field count are silently ignored. Returns a list of
    {'ip', 'port', 'password'} dicts with the port coerced to int.
    """
    entries = []
    with open(file, 'r') as handle:
        for raw in handle:
            fields = raw.strip().split()
            if len(fields) != 3:
                continue
            host, port_str, secret = fields
            entries.append({'ip': host, 'port': int(port_str), 'password': secret})
    return entries
def get_os_type(ssh):
    """Classify the remote OS by reading /etc/os-release over SSH.

    Returns 'ubuntu', 'centos', or 'unknown' (checked in that order,
    matching the original precedence).
    """
    _, remote_stdout, _ = ssh.exec_command('cat /etc/os-release')
    release_text = remote_stdout.read().decode().lower()
    for flavour in ('ubuntu', 'centos'):
        if flavour in release_text:
            return flavour
    return 'unknown'
def execute_remote_commands(info, args):
    """Worker for one host: log in, then (optionally) change the login user's
    password, create a sudo-enabled user, force the SSH port back to 22 and
    restart the SSH service.

    Args:
        info: dict with 'ip', 'port', 'password' (from read_ip_list).
        args: parsed argparse namespace (user, new_passwd, create_user,
              create_passwd).
    Returns:
        {'ip': ..., 'success': bool} consumed by the summary in main().
    """
    ip = info['ip']
    port = info['port']
    login_password = info['password']
    login_user = args.user
    new_password = args.new_passwd
    create_user = args.create_user
    create_user_password = args.create_passwd
    ssh = paramiko.SSHClient()
    # Auto-accept unknown host keys -- acceptable only on trusted networks.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    result = {"ip": ip, "success": False}
    try:
        ssh.connect(ip, port=port, username=login_user, password=login_password, timeout=10)
        print(f"[SUCCESS] ✅ {ip}:{port} 登录成功")
        # Detect the distro so the right password-setting command is chosen.
        os_type = get_os_type(ssh)
        print(f"[INFO] 🖥️ {ip}: 操作系统类型 - {os_type}")
        # --- Change the login user's password (if requested) ---
        if new_password:
            # NOTE(review): the non-ubuntu branch runs `passwd --stdin`
            # WITHOUT sudo, and passwords ride inside the command line --
            # this assumes a root login; confirm for non-root --user values.
            if os_type == 'ubuntu':
                cmd = f'echo "{login_user}:{new_password}" | sudo chpasswd'
            else:
                cmd = f'echo "{new_password}" | passwd --stdin {login_user}'
            stdin, stdout, stderr = ssh.exec_command(cmd)
            if stdout.channel.recv_exit_status() == 0:
                print(f"[UPDATE] 🔑 {ip}: 用户 {login_user} 密码已修改")
            else:
                print(f"[ERROR] ❌ {ip}: 登录用户密码修改失败 - {stderr.read().decode().strip()}")
        else:
            print(f"[SKIP] 🔐 {ip}: 未指定新密码,跳过登录用户密码修改")
        # --- Create a new user with passwordless sudo (if requested) ---
        if create_user:
            print(f"[DEBUG] 创建用户参数: {create_user}, 密码: {create_user_password}")  # debug trace
            # `id -u` exits non-zero when the user does not exist yet.
            stdin, stdout, stderr = ssh.exec_command(f'id -u {create_user}')
            exit_status = stdout.channel.recv_exit_status()
            if exit_status != 0:
                print(f"[CREATE] 👤 {ip}: 用户 {create_user} 不存在,开始创建")
                # Create the account with a home directory.
                stdin, stdout, stderr = ssh.exec_command(f'sudo useradd -m {create_user}')
                if stdout.channel.recv_exit_status() != 0:
                    print(f"[ERROR] ❌ {ip}: 用户创建失败 - {stderr.read().decode().strip()}")
                    raise Exception("用户创建失败")
                # Set its password (distro-specific command, same caveat as above).
                if create_user_password:
                    if os_type == 'ubuntu':
                        cmd = f'echo "{create_user}:{create_user_password}" | sudo chpasswd'
                    else:
                        cmd = f'echo "{create_user_password}" | passwd --stdin {create_user}'
                    stdin, stdout, stderr = ssh.exec_command(cmd)
                    if stdout.channel.recv_exit_status() != 0:
                        print(f"[ERROR] ❌ {ip}: 密码设置失败 - {stderr.read().decode().strip()}")
                        raise Exception("密码设置失败")
                    print(f"[PASSWD] 🔑 {ip}: 用户 {create_user} 密码已设置")
                # Grant NOPASSWD sudo via a dedicated sudoers.d drop-in.
                sudo_cmd = f'echo "{create_user} ALL=(ALL) NOPASSWD:ALL" | sudo tee /etc/sudoers.d/{create_user}'
                stdin, stdout, stderr = ssh.exec_command(sudo_cmd)
                if stdout.channel.recv_exit_status() != 0:
                    print(f"[ERROR] ❌ {ip}: sudo权限设置失败 - {stderr.read().decode().strip()}")
                    raise Exception("sudo权限设置失败")
                # sudoers.d files must be mode 440; exit status is not checked here.
                chmod_cmd = f'sudo chmod 440 /etc/sudoers.d/{create_user}'
                ssh.exec_command(chmod_cmd)
                print(f"[SUDO] 🛡️ {ip}: 用户 {create_user} 已配置 sudo 权限")
            else:
                print(f"[SKIP] 👤 {ip}: 用户 {create_user} 已存在,跳过创建")
        # --- Force the SSH port back to 22 if a Port directive differs ---
        stdin, stdout, _ = ssh.exec_command('grep "^Port" /etc/ssh/sshd_config || echo "Port 22"')
        current_port = stdout.read().decode().strip()
        if "Port 22" not in current_port:
            # NOTE(review): this sed runs without sudo -- it will silently
            # fail unless the login user can write /etc/ssh/sshd_config.
            ssh.exec_command('sed -i "s/^Port.*/Port 22/" /etc/ssh/sshd_config')
            print(f"[UPDATE] 🔧 {ip}: SSH 端口已设置为 22")
        else:
            print(f"[SKIP] 🔧 {ip}: SSH 端口已经是 22")
        # --- Restart the SSH service so the port change takes effect ---
        if os_type == 'ubuntu':
            restart_cmd = 'sudo systemctl restart ssh'
        else:
            restart_cmd = 'sudo systemctl restart sshd || sudo service sshd restart'
        stdin, stdout, stderr = ssh.exec_command(restart_cmd)
        if stdout.channel.recv_exit_status() == 0:
            print(f"[RESTART] ♻️ {ip}: SSH 服务已重启")
        else:
            print(f"[ERROR] ❌ {ip}: SSH 重启失败 - {stderr.read().decode().strip()}")
        result['success'] = True
    except paramiko.AuthenticationException:
        print(f"[FAILED] ❌ {ip}: 认证失败(密码错误)")
    except paramiko.SSHException as e:
        print(f"[FAILED] ❌ {ip}: SSH连接错误 - {str(e)}")
    except Exception as e:
        print(f"[FAILED] ❌ {ip}: 操作失败 - {str(e)}")
    finally:
        ssh.close()
    return result
def main():
    """CLI entry point: parse args, fan the per-host worker out over a
    thread pool, then print a success/failure summary."""
    parser = argparse.ArgumentParser(description='批量管理远程主机:修改密码、创建用户、设置sudo、修复SSH端口')
    parser.add_argument('--file', default='ip-list.txt', help='IP列表文件(格式:IP 端口 密码)')
    parser.add_argument('--user', default='root', help='登录用户名')
    parser.add_argument('--new-passwd', help='新密码:用于修改登录用户密码')
    parser.add_argument('--create-user', help='要创建的新用户名')
    parser.add_argument('--create-passwd', help='新用户的密码')
    parser.add_argument('--workers', type=int, default=10, help='并发线程数(默认10)')
    args = parser.parse_args()
    # Echo the parsed options.
    # NOTE(review): this prints --new-passwd / --create-passwd in clear text;
    # consider masking before using on shared terminals or in CI logs.
    print(f"\n[DEBUG] 命令行参数解析结果:")
    print(f"文件: {args.file}")
    print(f"登录用户: {args.user}")
    print(f"新密码: {args.new_passwd}")
    print(f"创建用户: {args.create_user}")
    print(f"创建用户密码: {args.create_passwd}")
    print(f"并发数: {args.workers}\n")
    ip_info_list = read_ip_list(args.file)
    if not ip_info_list:
        print("❌ IP列表为空或格式错误")
        sys.exit(1)
    print(f"\n🚀 开始处理 {len(ip_info_list)} 台主机,使用 {args.workers} 并发线程...\n")
    success, failed = [], []
    # Fan out: one execute_remote_commands call per host, bounded by --workers.
    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        futures = [executor.submit(execute_remote_commands, info, args) for info in ip_info_list]
        for future in as_completed(futures):
            result = future.result()
            # Bucket the ip by the worker's verdict.
            (success if result['success'] else failed).append(result['ip'])
    print("\n📊 ========== 执行结果汇总 ==========")
    print(f"✅ 成功主机 ({len(success)} 台):")
    for ip in success:
        print(f" - {ip}")
    print(f"\n❌ 失败主机 ({len(failed)} 台):")
    for ip in failed:
        print(f" - {ip}")
    print("\n✅ 所有任务已完成。")
if __name__ == "__main__":
    main()
[root@k8s-app-1 python]# python3 batch_user_manage_and_ssh_fix.py --help
usage: batch_user_manage_and_ssh_fix.py [-h] [--file FILE] [--user USER] [--new-passwd NEW_PASSWD] [--create-user CREATE_USER] [--create-passwd CREATE_PASSWD] [--workers WORKERS]
批量管理远程主机:修改密码、创建用户、设置sudo、修复SSH端口
optional arguments:
-h, --help show this help message and exit
--file FILE IP列表文件(格式:IP 端口 密码)
--user USER 登录用户名
--new-passwd NEW_PASSWD
新密码:用于修改登录用户密码
--create-user CREATE_USER
要创建的新用户名
--create-passwd CREATE_PASSWD
新用户的密码
--workers WORKERS 并发线程数(默认10)3. 配置免密登陆
[root@k8s-app-1 script]# cat batch_root_free_login.py
import paramiko
import sys
import os
from pathlib import Path
# File-path configuration: the host inventory (one "IP PORT PASSWORD" line
# per host) and the local public key that will be appended to each remote
# root account's authorized_keys.
ip_list_file = 'ip-list.txt'
ssh_key_file = str(Path.home() / '.ssh/id_rsa.pub')
# Read the host inventory file.
def read_ip_list(file):
    """Parse lines of "IP PORT PASSWORD"; any other field count is skipped.

    Returns a list of {'ip', 'port', 'password'} dicts (port as int).
    """
    hosts = []
    with open(file, 'r') as fh:
        for raw in fh:
            fields = raw.strip().split()
            if len(fields) != 3:
                continue
            addr, port_text, secret = fields
            hosts.append({'ip': addr, 'port': int(port_text), 'password': secret})
    return hosts
# Test the SSH connection and configure passwordless login.
def test_and_setup_ssh(ip, port, password):
    """Log in as root with a password and append the local public key to the
    remote authorized_keys. Returns True on success, False otherwise.
    """
    ssh = paramiko.SSHClient()
    # Auto-accept unknown host keys (no MITM protection -- trusted LAN only).
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        # Connection test.
        ssh.connect(ip, port=port, username='root', password=password, timeout=10)
        print(f"[SUCCESS] 成功连接到 {ip}:{port}")
        # Without a local public key there is nothing to push; count as success.
        if not os.path.exists(ssh_key_file):
            print(f"[WARNING] {ip}: 本地SSH公钥文件不存在,跳过配置免密登录")
            return True
        # Read the local public key.
        with open(ssh_key_file, 'r') as key_file:
            public_key = key_file.read().strip()
        # Install the key remotely.
        # NOTE(review): exit codes of these two commands are not checked, and
        # the key is appended unconditionally -- re-runs leave duplicate lines
        # in authorized_keys. Verify whether that matters here.
        ssh.exec_command("mkdir -p ~/.ssh && chmod 700 ~/.ssh")
        ssh.exec_command(f"echo '{public_key}' >> ~/.ssh/authorized_keys && chmod 600 ~/.ssh/authorized_keys")
        print(f"[SUCCESS] {ip}: 已成功配置免密登录")
        return True
    except paramiko.AuthenticationException:
        print(f"[FAILED] {ip}: 认证失败,请检查密码")
    except paramiko.SSHException as e:
        print(f"[FAILED] {ip}: SSH连接错误 - {str(e)}")
    except Exception as e:
        print(f"[FAILED] {ip}: 连接失败 - {str(e)}")
    finally:
        ssh.close()
    # All failure paths fall through to here after the finally-close.
    return False
# Entry point.
def main():
    """Read the host inventory, configure passwordless login host by host,
    then print a success/failure summary."""
    # Load the inventory.
    ip_info_list = read_ip_list(ip_list_file)
    if not ip_info_list:
        print("错误: IP列表为空或格式错误")
        sys.exit(1)
    # Result buckets.
    success_list = []
    failed_list = []
    print("\n开始测试SSH连接...\n")
    # Process each host sequentially.
    for info in ip_info_list:
        ip = info['ip']
        port = info['port']
        password = info['password']
        print(f"正在处理 {ip}:{port}...")
        if test_and_setup_ssh(ip, port, password):
            success_list.append(ip)
        else:
            failed_list.append(ip)
    # Print the summary.
    print("\n========== 测试结果汇总 ==========")
    print(f"成功连接的主机 ({len(success_list)}台):")
    for ip in success_list:
        print(f" √ {ip}")
    print(f"\n连接失败的主机 ({len(failed_list)}台):")
    for ip in failed_list:
        print(f" × {ip}")
    print("\n测试完成")
if __name__ == "__main__":
    main()
帮助命令:
#!/usr/bin/env python3
import paramiko
import sys
import os
import argparse
from pathlib import Path
def read_ip_list(file_path):
    """读取IP列表文件,支持多种格式

    Each non-comment line: "IP PORT PASSWORD [USERNAME]" (whitespace-split).
    Blank lines, '#' comments, short lines and lines whose port is not a
    number are skipped with a warning instead of aborting the whole run
    (fix: a non-numeric port used to raise an uncaught ValueError).

    Returns:
        list of dicts with keys 'ip', 'port' (int), 'password', 'username'.
    Exits the process (status 1) if the file is missing or unreadable.
    """
    ip_info = []
    try:
        with open(file_path, 'r') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                parts = line.split()
                if len(parts) >= 3:
                    try:
                        port = int(parts[1])
                    except ValueError:
                        # Malformed port: skip just this line, keep going.
                        print(f"警告: 第 {line_num} 行格式错误,已跳过: {line}")
                        continue
                    ip_info.append({
                        'ip': parts[0],
                        'port': port,
                        'password': parts[2],
                        'username': parts[3] if len(parts) > 3 else 'root'
                    })
                else:
                    print(f"警告: 第 {line_num} 行格式错误,已跳过: {line}")
    except FileNotFoundError:
        print(f"错误: 文件 '{file_path}' 不存在")
        sys.exit(1)
    except Exception as e:
        print(f"错误: 读取文件时发生错误 - {str(e)}")
        sys.exit(1)
    return ip_info
def test_and_setup_ssh(ip, port, password, username, ssh_key_file, timeout=10, dry_run=False):
    """测试SSH连接并配置免密登录

    Connect with a password and append the given public key to the remote
    user's authorized_keys. In dry-run mode only prints what would happen.
    Returns True on success (or dry run), False on any failure.
    """
    ssh = paramiko.SSHClient()
    # Auto-accept unknown host keys -- trusted networks only.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        # Dry run: no connection is attempted at all.
        if dry_run:
            print(f"[DRY RUN] 将测试连接到 {username}@{ip}:{port}")
            return True
        ssh.connect(ip, port=port, username=username, password=password, timeout=timeout)
        print(f"[SUCCESS] 成功连接到 {username}@{ip}:{port}")
        # No local public key -> nothing to install; still counts as success.
        if not os.path.exists(ssh_key_file):
            print(f"[WARNING] {ip}: 本地SSH公钥文件不存在,跳过配置免密登录")
            return True
        # Read the local public key.
        with open(ssh_key_file, 'r') as key_file:
            public_key = key_file.read().strip()
        # Install the key step by step, checking each command's exit status.
        # NOTE(review): the key is appended unconditionally, so repeated runs
        # accumulate duplicate lines in authorized_keys.
        commands = [
            "mkdir -p ~/.ssh",
            "chmod 700 ~/.ssh",
            f"echo '{public_key}' >> ~/.ssh/authorized_keys",
            "chmod 600 ~/.ssh/authorized_keys"
        ]
        for cmd in commands:
            stdin, stdout, stderr = ssh.exec_command(cmd)
            exit_status = stdout.channel.recv_exit_status()
            if exit_status != 0:
                # A failed step only warns; the overall result stays success.
                error_output = stderr.read().decode().strip()
                print(f"[WARNING] {ip}: 命令执行失败 '{cmd}': {error_output}")
        print(f"[SUCCESS] {ip}: 已成功配置免密登录")
        return True
    except paramiko.AuthenticationException:
        print(f"[FAILED] {ip}: 认证失败,请检查密码")
    except paramiko.SSHException as e:
        print(f"[FAILED] {ip}: SSH连接错误 - {str(e)}")
    except Exception as e:
        print(f"[FAILED] {ip}: 连接失败 - {str(e)}")
    finally:
        ssh.close()
    # All failure paths fall through to here after the finally-close.
    return False
def main():
    """CLI entry point: configure passwordless SSH for one host (--single)
    or for every host in the inventory file.

    Bug fix: the success-rate f-string used '%%'. Unlike %-formatting,
    f-strings do NOT collapse '%%' to '%', so summaries printed '100.0%%'.
    """
    parser = argparse.ArgumentParser(
        description="批量SSH免密登录配置工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
使用示例:
%(prog)s -f ip-list.txt
%(prog)s -f ip-list.txt -k ~/.ssh/my_key.pub
%(prog)s -f ip-list.txt -t 5 --dry-run
%(prog)s --single 192.168.1.1 22 password123
%(prog)s --single 192.168.1.1 22 password123 -u admin
文件格式说明:
IP地址 端口 密码 [用户名]
192.168.1.1 22 password123 root
10.0.0.1 2222 mypassword admin
# 注释行以#开头
"""
    )
    parser.add_argument('-f', '--file', default='ip-list.txt',
                        help='IP列表文件路径 (默认: ip-list.txt)')
    parser.add_argument('-k', '--key-file', default=str(Path.home() / '.ssh/id_rsa.pub'),
                        help='SSH公钥文件路径 (默认: ~/.ssh/id_rsa.pub)')
    parser.add_argument('-t', '--timeout', type=int, default=10,
                        help='连接超时时间(秒) (默认: 10)')
    parser.add_argument('-u', '--username', default='root',
                        help='用户名 (默认: root)')
    parser.add_argument('--dry-run', action='store_true',
                        help='试运行模式,只显示将要执行的操作而不实际执行')
    parser.add_argument('--single', nargs=3, metavar=('IP', 'PORT', 'PASSWORD'),
                        help='配置单个主机: IP 端口 密码')
    args = parser.parse_args()
    # The public key must exist unless this is a dry run.
    if not args.dry_run and not os.path.exists(args.key_file):
        print(f"错误: SSH公钥文件 '{args.key_file}' 不存在")
        print("请使用 -k 参数指定正确的公钥文件路径,或使用 ssh-keygen 生成密钥对")
        sys.exit(1)
    # Single-host mode: process exit code mirrors the result.
    if args.single:
        ip, port, password = args.single
        print(f"🚀 配置单个主机 {args.username}@{ip}:{port}...\n")
        success = test_and_setup_ssh(
            ip, int(port), password, args.username,
            args.key_file, args.timeout, args.dry_run
        )
        print("\n📊 ========== 配置结果 ==========")
        print(f"结果: {'✅ 成功' if success else '❌ 失败'}")
        sys.exit(0 if success else 1)
    # Batch mode: the inventory file must exist.
    if not os.path.exists(args.file):
        print(f"错误: 文件 '{args.file}' 不存在")
        print("请使用 -f 参数指定正确的文件路径,或使用 --single 配置单个主机")
        sys.exit(1)
    ip_info_list = read_ip_list(args.file)
    if not ip_info_list:
        print("错误: IP列表为空或格式错误!")
        print("文件格式应为: IP地址 端口 密码 [用户名]")
        sys.exit(1)
    success_list = []
    failed_list = []
    mode = "试运行模式" if args.dry_run else "执行模式"
    print(f"\n🚀 开始批量配置SSH免密登录 ({mode})...\n")
    # Configure hosts sequentially, bucketing results for the summary.
    for info in ip_info_list:
        ip, port, password, username = info['ip'], info['port'], info['password'], info['username']
        print(f"处理 {username}@{ip}:{port}...")
        if test_and_setup_ssh(ip, port, password, username, args.key_file, args.timeout, args.dry_run):
            success_list.append(f"{username}@{ip}:{port}")
        else:
            failed_list.append(f"{username}@{ip}:{port}")
    # Summary.
    print("\n📊 ========== 配置结果汇总 ==========")
    print(f"✅ 成功配置的主机 ({len(success_list)}台):")
    for host in success_list:
        print(f" - {host}")
    if failed_list:
        print(f"\n❌ 配置失败的主机 ({len(failed_list)}台):")
        for host in failed_list:
            print(f" - {host}")
    # FIX: single '%' -- f-strings print '%%' literally (was "100.0%%").
    print(f"\n成功率: {len(success_list)}/{len(ip_info_list)} "
          f"({len(success_list)/len(ip_info_list)*100:.1f}%)")
    if args.dry_run:
        print("\n💡 提示: 本次为试运行模式,未实际执行任何配置操作")
        print("请移除 --dry-run 参数来实际执行配置")
    else:
        print("\n✅ 配置完成!")
    # Non-zero exit when any real (non-dry-run) configuration failed.
    sys.exit(1 if failed_list and not args.dry_run else 0)
if __name__ == "__main__":
    main()
帮助命令:
# 查看帮助
python3 batch_root_free_login.py --help
# 使用默认配置
python3 batch_root_free_login.py
# 指定IP列表文件和公钥文件
python3 batch_root_free_login.py -f my_ips.txt -k ~/.ssh/my_key.pub
# 试运行模式(只显示将要执行的操作)
python3 batch_root_free_login.py --dry-run
# 配置单个主机
python3 batch_root_free_login.py --single 192.168.1.1 22 password123
# 配置单个主机并指定用户名
python3 batch_root_free_login.py --single 192.168.1.1 22 password123 -u admin演示如下:
# 配置单个主机免密登陆,同时指定 root 和公钥
[root@k8s-app-1 python]# python3 batch_root_free_login.py --single 192.168.233.131 22 123456 -u root -k ~/.ssh/id_rsa.pub
🚀 配置单个主机 root@192.168.233.131:22...
[SUCCESS] 成功连接到 root@192.168.233.131:22
[SUCCESS] 192.168.233.131: 已成功配置免密登录
📊 ========== 配置结果 ==========
结果: ✅ 成功
# 配置单个主机免密登陆,默认使用 root
[root@k8s-app-1 python]# python3 batch_root_free_login.py --single 192.168.233.131 22 123456
🚀 配置单个主机 root@192.168.233.131:22...
[SUCCESS] 成功连接到 root@192.168.233.131:22
[SUCCESS] 192.168.233.131: 已成功配置免密登录
📊 ========== 配置结果 ==========
结果: ✅ 成功配置多个主机免密登陆
# 使用默认 root
[root@k8s-app-1 python]# python3 batch_root_free_login.py --file ip-list.txt
🚀 开始批量配置SSH免密登录 (执行模式)...
处理 root@192.168.233.103:22...
[SUCCESS] 成功连接到 root@192.168.233.103:22
[SUCCESS] 192.168.233.103: 已成功配置免密登录
处理 root@192.168.233.104:22...
[SUCCESS] 成功连接到 root@192.168.233.104:22
[SUCCESS] 192.168.233.104: 已成功配置免密登录
处理 root@192.168.233.105:22...
[SUCCESS] 成功连接到 root@192.168.233.105:22
[SUCCESS] 192.168.233.105: 已成功配置免密登录
📊 ========== 配置结果汇总 ==========
✅ 成功配置的主机 (3台):
- root@192.168.233.103:22
- root@192.168.233.104:22
- root@192.168.233.105:22
成功率: 3/3 (100.0%%)
✅ 配置完成!4. 自动化远程机器执行shell脚本
[root@k8s-app-1 script]# cat batch_push_and_exec_script.py
#!/usr/bin/env python3
import paramiko
import argparse
import sys
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
# Substrings that indicate destructive shell commands; matched literally.
DANGEROUS_PATTERNS = [
    'rm -rf /', 'reboot', 'poweroff', 'shutdown', 'mkfs',
    ':(){ :|: & };:', 'dd if=', '>/dev/sd', '>:()'
]
def check_dangerous_script(local_script):
    """Scan a local script for known-destructive substrings and, if any are
    found, require the operator to type "yes" to continue; otherwise abort
    the process with exit code 1. Returns silently when the script is clean.
    """
    with open(local_script, 'r') as fh:
        body = fh.read()
    hits = [pattern for pattern in DANGEROUS_PATTERNS if pattern in body]
    if not hits:
        return
    print("\n⚠️⚠️⚠️ 警告:你的脚本中包含以下 **危险命令**,请务必确认:")
    for cmd in hits:
        print(f" ⚠️ {cmd}")
    print("继续执行可能会造成数据丢失或系统不可用!")
    confirm = input("❗ 是否继续执行?(yes/NO): ")
    if confirm.strip().lower() != 'yes':
        print("❌ 已取消执行。")
        sys.exit(1)
def read_ip_list(file_path):
    """读取IP列表文件,支持注释行

    Each non-comment line: "IP PORT PASSWORD [USERNAME]" (whitespace-split).
    Blank lines, '#' comments, short lines and lines whose port is not a
    number are skipped with a warning instead of aborting the whole run
    (fix: a non-numeric port used to raise an uncaught ValueError).

    Returns:
        list of dicts with keys 'ip', 'port' (int), 'password', 'username'.
    Exits the process (status 1) if the file is missing or unreadable.
    """
    ip_info = []
    try:
        with open(file_path, 'r') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                parts = line.split()
                if len(parts) >= 3:
                    try:
                        port = int(parts[1])
                    except ValueError:
                        # Malformed port: skip just this line, keep going.
                        print(f"警告: 第 {line_num} 行格式错误,已跳过: {line}")
                        continue
                    ip_info.append({
                        'ip': parts[0],
                        'port': port,
                        'password': parts[2],
                        'username': parts[3] if len(parts) > 3 else 'root'
                    })
                else:
                    print(f"警告: 第 {line_num} 行格式错误,已跳过: {line}")
    except FileNotFoundError:
        print(f"错误: 文件 '{file_path}' 不存在")
        sys.exit(1)
    except Exception as e:
        print(f"错误: 读取文件时发生错误 - {str(e)}")
        sys.exit(1)
    return ip_info
def ssh_push_and_exec(info, args):
    """Worker for one host: upload args.script to /tmp via SFTP, run it with
    args.script_args, relay its output, then delete the remote copy.

    Args:
        info: dict with 'ip', 'port', 'password' and optional 'username'.
        args: parsed argparse namespace (user, script, script_args, timeout).
    Returns:
        {'ip': ..., 'success': bool} for the batch summary.
    """
    ip = info['ip']
    port = info['port']
    password = info['password']
    # Per-host username falls back to the global --user flag.
    username = info.get('username', args.user)
    local_script = args.script
    # Remote copy lands in /tmp under the same basename.
    remote_script = f"/tmp/{Path(local_script).name}"
    script_args = args.script_args or ""
    ssh = paramiko.SSHClient()
    # Auto-accept unknown host keys -- trusted networks only.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    result = {"ip": ip, "success": False}
    try:
        ssh.connect(ip, port=port, username=username, password=password, timeout=args.timeout)
        print(f"[{ip}] ✅ 登录成功")
        # Upload the script and make it executable.
        sftp = ssh.open_sftp()
        sftp.put(local_script, remote_script)
        sftp.chmod(remote_script, 0o755)
        sftp.close()
        print(f"[{ip}] 📤 脚本已上传到 {remote_script}")
        # Run it; recv_exit_status() blocks until the remote script finishes.
        # NOTE(review): script_args is interpolated into the command line
        # unquoted -- spaces/metacharacters in it are interpreted remotely.
        cmd = f"bash {remote_script} {script_args}"
        stdin, stdout, stderr = ssh.exec_command(cmd)
        exit_code = stdout.channel.recv_exit_status()
        stdout_result = stdout.read().decode()
        stderr_result = stderr.read().decode()
        # Best-effort cleanup of the uploaded copy (exit status not checked).
        ssh.exec_command(f"rm -f {remote_script}")
        if exit_code == 0:
            print(f"[{ip}] ✅ 脚本执行成功")
            result["success"] = True
        else:
            print(f"[{ip}] ❌ 脚本执行失败 (exit {exit_code})")
        # Relay the remote script's captured output.
        if stdout_result.strip():
            print(f"[{ip}] --- STDOUT ---\n{stdout_result}")
        if stderr_result.strip():
            print(f"[{ip}] --- STDERR ---\n{stderr_result}")
    except Exception as e:
        print(f"[{ip}] ❌ 执行出错: {str(e)}")
    finally:
        ssh.close()
    return result
def main():
    """CLI entry point.

    Parses arguments, validates the local script, then either runs it on a
    single host (--single) or on every host in the IP list file using a
    thread pool. Exits 0 when all hosts succeed, 1 otherwise.
    """
    parser = argparse.ArgumentParser(
        description='批量上传并执行本地脚本',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
使用示例:
  %(prog)s --script deploy.sh
  %(prog)s --script deploy.sh --file my_ips.txt
  %(prog)s --script deploy.sh --single 192.168.1.1 22 password123
  %(prog)s --script deploy.sh --script-args "--verbose --force"
  %(prog)s --script deploy.sh --workers 5 --timeout 30

文件格式说明:
  IP地址 端口 密码 [用户名]
  192.168.1.1 22 password123 root
  10.0.0.1 2222 mypassword admin
  # 注释行以#开头
"""
    )
    parser.add_argument('--file', default='ip-list.txt', help='IP列表文件 (默认: ip-list.txt)')
    parser.add_argument('--user', default='root', help='SSH 登录用户名 (默认: root)')
    parser.add_argument('--script', required=True, help='本地要上传并执行的脚本路径 (.sh)')
    parser.add_argument('--script-args', help='传递给远程脚本的参数(可选)')
    parser.add_argument('--workers', type=int, default=10, help='并发线程数 (默认: 10)')
    parser.add_argument('--timeout', type=int, default=10, help='连接超时时间(秒) (默认: 10)')
    parser.add_argument('--single', nargs=3, metavar=('IP', 'PORT', 'PASSWORD'),
                        help='执行单个主机: IP 端口 密码')
    args = parser.parse_args()

    # Validate the script file and warn about dangerous commands.
    script_path = Path(args.script)
    if not script_path.exists():
        print(f"❌ 脚本文件不存在: {args.script}")
        sys.exit(1)
    if script_path.suffix != '.sh':
        print(f"❌ 脚本必须是 .sh 文件: {args.script}")
        sys.exit(1)
    check_dangerous_script(args.script)

    # Single-host mode.
    if args.single:
        ip, port, password = args.single
        single_info = {
            'ip': ip,
            'port': int(port),
            'password': password,
            'username': args.user
        }
        print(f"\n🚀 开始执行单个主机 {args.user}@{ip}:{port}...\n")
        result = ssh_push_and_exec(single_info, args)
        print("\n📊 执行完成")
        if result["success"]:
            print(f"✅ 主机 {ip} 执行成功")
            sys.exit(0)
        else:
            print(f"❌ 主机 {ip} 执行失败")
            sys.exit(1)

    # Batch mode: read the host list from the file.
    if not os.path.exists(args.file):
        print(f"错误: 文件 '{args.file}' 不存在")
        print("请使用 --file 参数指定正确的文件路径,或使用 --single 执行单个主机")
        sys.exit(1)
    ip_info_list = read_ip_list(args.file)
    if not ip_info_list:
        print("❌ IP 列表为空或格式错误")
        print("文件格式应为: IP地址 端口 密码 [用户名]")
        sys.exit(1)

    print(f"\n🚀 开始批量执行脚本,共 {len(ip_info_list)} 台主机,最大并发 {args.workers} ...\n")
    success, failed = [], []
    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        futures = [executor.submit(ssh_push_and_exec, info, args) for info in ip_info_list]
        for future in as_completed(futures):
            result = future.result()
            (success if result["success"] else failed).append(result["ip"])

    print("\n📊 执行完成")
    print(f"✅ 成功主机 ({len(success)} 台):")
    for ip in success:
        print(f"  - {ip}")
    if failed:
        print(f"\n❌ 失败主机 ({len(failed)} 台):")
        for ip in failed:
            print(f"  - {ip}")
    # BUG FIX: '%%' inside an f-string is printed literally as two percent
    # signs (f-strings do no printf-style formatting) — use a single '%'.
    print(f"\n成功率: {len(success)}/{len(ip_info_list)} "
          f"({len(success)/len(ip_info_list)*100:.1f}%)")

    # Non-zero exit code when any host failed, for use in CI/cron.
    sys.exit(1 if failed else 0)
if __name__ == '__main__':
    main()

帮助命令
[root@k8s-app-1 python]# python3 batch_push_and_exec_script.py --help
usage: batch_push_and_exec_script.py [-h] [--file FILE] [--user USER] --script SCRIPT [--script-args SCRIPT_ARGS] [--workers WORKERS] [--timeout TIMEOUT] [--single IP PORT PASSWORD]
批量上传并执行本地脚本
optional arguments:
-h, --help show this help message and exit
--file FILE IP列表文件 (默认: ip-list.txt)
--user USER SSH 登录用户名 (默认: root)
--script SCRIPT 本地要上传并执行的脚本路径 (.sh)
--script-args SCRIPT_ARGS
传递给远程脚本的参数(可选)
--workers WORKERS 并发线程数 (默认: 10)
--timeout TIMEOUT 连接超时时间(秒) (默认: 10)
--single IP PORT PASSWORD
执行单个主机: IP 端口 密码
使用示例:
batch_push_and_exec_script.py --script deploy.sh
batch_push_and_exec_script.py --script deploy.sh --file my_ips.txt
batch_push_and_exec_script.py --script deploy.sh --single 192.168.1.1 22 password123
batch_push_and_exec_script.py --script deploy.sh --script-args "--verbose --force"
batch_push_and_exec_script.py --script deploy.sh --workers 5 --timeout 30
文件格式说明:
IP地址 端口 密码 [用户名]
192.168.1.1 22 password123 root
10.0.0.1 2222 mypassword admin
  # 注释行以#开头

# 查看帮助
python3 batch_push_and_exec_script.py --help
# 批量执行(使用默认ip-list.txt)
python3 batch_push_and_exec_script.py --script deploy.sh
# 批量执行(指定IP列表文件)
python3 batch_push_and_exec_script.py --script deploy.sh --file my_ips.txt
# 单个主机执行
python3 batch_push_and_exec_script.py --script deploy.sh --single 192.168.1.1 22 password123
# 带参数执行
python3 batch_push_and_exec_script.py --script deploy.sh --script-args "--verbose --force"
# 控制并发和超时
python3 batch_push_and_exec_script.py --script deploy.sh --workers 5 --timeout 30

六、K8S 和 容器相关脚本
1. 根据域名查询都有哪些 ingress
[root@k8s-app-1 meta42-tls]# cat check_ingress_secrets.sh
#!/bin/bash
# List every Ingress (across all namespaces) whose rule hosts match
# TARGET_DOMAIN, together with the TLS secret(s) it references.
# Requires: kubectl (with cluster access) and jq.
# 目标域名
TARGET_DOMAIN="blog.tianxiang.love"
# Collect all namespace names.
namespaces=$(kubectl get ns -o jsonpath='{.items[*].metadata.name}')
# Table header.
printf "%-30s %-50s %-50s %-50s\n" "Namespace" "Ingress Name" "TLS Secret Name" "Matching Host"
printf "===============================================================================================================================\n"
# Walk every namespace / ingress pair. Namespace and ingress names cannot
# contain whitespace, so word-splitting the jsonpath output is safe, but we
# still quote expansions passed to kubectl (SC2086 fix).
for ns in $namespaces; do
  ingresses=$(kubectl get ingress -n "$ns" -o jsonpath='{.items[*].metadata.name}')
  for ingress in $ingresses; do
    # Full ingress object as JSON for jq inspection.
    ingress_json=$(kubectl get ingress -n "$ns" "$ingress" -o json)
    # Hosts in .spec.rules containing the target domain (substring match).
    matching_hosts=$(echo "$ingress_json" | jq -r --arg domain "$TARGET_DOMAIN" '.spec.rules[]?.host | select(. != null and contains($domain))')
    if [ -n "$matching_hosts" ]; then
      # TLS secret names referenced by this ingress, if any.
      secrets=$(echo "$ingress_json" | jq -r '.spec.tls[]?.secretName | select(. != null)')
      if [ -z "$secrets" ]; then
        printf "%-30s %-50s %-50s %-50s\n" "$ns" "$ingress" "N/A (No TLS configured)" "$matching_hosts"
      else
        # One ingress may carry several TLS entries — print each.
        for secret in $secrets; do
          printf "%-30s %-50s %-50s %-50s\n" "$ns" "$ingress" "$secret" "$matching_hosts"
        done
      fi
    fi
  done
  # (outer namespace loop is closed by the final 'done' below)
done

2. 更新 ingress 使用的 secret
[root@k8s-app-1 meta42-tls]# cat update_ingress_certs.sh
#!/bin/bash
# Find every TLS secret referenced by an Ingress whose hosts match
# TARGET_DOMAIN, then (after confirmation) re-create each secret from the
# new certificate/key pair via "kubectl create ... --dry-run | kubectl apply".
# Requires: bash 4+ (associative arrays), kubectl and jq.
# 目标域名
TARGET_DOMAIN="blog.tianxiang.love"
# 新证书文件路径
NEW_CERT_FILE="/data/tianxiang/script/meta42-tls/nginx/blog.tianxiang.love.pem"
NEW_KEY_FILE="/data/tianxiang/script/meta42-tls/nginx/blog.tianxiang.love.key"
# Abort early when either certificate file is missing.
if [ ! -f "$NEW_CERT_FILE" ] || [ ! -f "$NEW_KEY_FILE" ]; then
  echo "错误:证书文件不存在,请检查路径"
  echo "NEW_CERT_FILE: $NEW_CERT_FILE"
  echo "NEW_KEY_FILE: $NEW_KEY_FILE"
  exit 1
fi
# Collect all namespace names.
namespaces=$(kubectl get ns -o jsonpath='{.items[*].metadata.name}')
# Table header.
printf "%-30s %-50s %-50s %-50s\n" "Namespace" "Ingress Name" "TLS Secret Name" "Matching Host"
printf "===============================================================================================================================\n"
# Secrets to update, keyed "namespace/secret" (associative array dedupes).
declare -A secrets_to_update
# Walk every namespace / ingress pair; quote expansions passed to kubectl
# (SC2086 fix). K8s names cannot contain whitespace, so the word-split of
# the jsonpath output itself is safe.
for ns in $namespaces; do
  ingresses=$(kubectl get ingress -n "$ns" -o jsonpath='{.items[*].metadata.name}')
  for ingress in $ingresses; do
    # Full ingress object as JSON for jq inspection.
    ingress_json=$(kubectl get ingress -n "$ns" "$ingress" -o json)
    # Hosts in .spec.rules containing the target domain (substring match).
    matching_hosts=$(echo "$ingress_json" | jq -r --arg domain "$TARGET_DOMAIN" '.spec.rules[]?.host | select(. != null and contains($domain))')
    if [ -n "$matching_hosts" ]; then
      # TLS secret names referenced by this ingress, if any.
      secrets=$(echo "$ingress_json" | jq -r '.spec.tls[]?.secretName | select(. != null)')
      # Only ingresses that actually have TLS configured are candidates.
      if [ -n "$secrets" ]; then
        for secret in $secrets; do
          printf "%-30s %-50s %-50s %-50s\n" "$ns" "$ingress" "$secret" "$matching_hosts"
          # Remember this secret for the update phase.
          secrets_to_update["$ns/$secret"]=1
        done
      fi
    fi
  done
done
# Interactive confirmation before touching anything.
if [ ${#secrets_to_update[@]} -gt 0 ]; then
  echo -e "\n发现 ${#secrets_to_update[@]} 个需要更新的TLS secret:"
  for secret in "${!secrets_to_update[@]}"; do
    echo "  - $secret"
  done
  read -p "是否要更新以上所有secret的证书?(y/n) " -n 1 -r
  echo
  if [[ $REPLY =~ ^[Yy]$ ]]; then
    for secret in "${!secrets_to_update[@]}"; do
      # Split "namespace/secret" back into its parts.
      ns="${secret%%/*}"
      secret_name="${secret#*/}"
      echo -e "\n正在更新 $ns/$secret_name ..."
      # Re-create the secret idempotently: render with --dry-run, apply it.
      # Test the pipeline directly instead of the brittle "[ $? -eq 0 ]".
      if kubectl create secret tls "$secret_name" \
          --cert="$NEW_CERT_FILE" \
          --key="$NEW_KEY_FILE" \
          -n "$ns" \
          --dry-run=client -o yaml | kubectl apply -f -; then
        echo "成功更新 $ns/$secret_name"
      else
        echo "更新 $ns/$secret_name 失败"
      fi
    done
    echo -e "\n所有证书更新完成"
  else
    echo "已取消证书更新"
  fi
else
  echo -e "\n没有找到需要更新的TLS secret"
fi