avatar

甄天祥-Linux-个人小站

A text-focused Halo theme

  • 首页
  • 分类
  • 标签
  • 关于
Home Linux 运维常用脚本和工具
文章

Linux 运维常用脚本和工具

Posted 2025-05-19 Updated 2025-10- 29
By Administrator
205~263 min read

一、ETCD 备份脚本

1. 主程序脚本

[root@mysql-131 local]# cat /usr/local/bin/etcd-backup.sh
#!/bin/bash

# 配置变量
ETCD_CA="/etc/etcd/ssl/etcd-ca.pem"
ETCD_CERT="/etc/etcd/ssl/etcd-server.pem"
ETCD_KEY="/etc/etcd/ssl/etcd-server-key.pem"
ETCD_ENDPOINTS="https://192.168.233.32:2379,https://192.168.233.33:2379,https://192.168.233.34:2379"
BACKUP_DIR="/var/openebs/local/etcd-backup"
KEEP_DAYS="7"

# 创建备份目录
mkdir -p "${BACKUP_DIR}"

# 选择第一个健康的节点进行备份
IFS=',' read -ra ENDPOINTS <<< "${ETCD_ENDPOINTS}"
for endpoint in "${ENDPOINTS[@]}"; do
    if ETCDCTL_API=3 etcdctl --cacert="${ETCD_CA}" \
       --cert="${ETCD_CERT}" --key="${ETCD_KEY}" \
       --endpoints="${endpoint}" endpoint health &>/dev/null; then
        HEALTHY_ENDPOINT="${endpoint}"
        break
    fi
done

if [ -z "${HEALTHY_ENDPOINT}" ]; then
    echo "$(date '+%Y-%m-%d %H:%M:%S') - No healthy etcd endpoint available, skipping backup" >&2
    exit 1
fi

# 创建备份文件名
BACKUP_FILE="${BACKUP_DIR}/etcd-snapshot-$(date +%Y%m%d-%H%M%S).db"

# 执行备份
echo "$(date '+%Y-%m-%d %H:%M:%S') - Starting etcd backup to ${BACKUP_FILE} using endpoint ${HEALTHY_ENDPOINT}"
if ETCDCTL_API=3 etcdctl --cacert="${ETCD_CA}" \
   --cert="${ETCD_CERT}" --key="${ETCD_KEY}" \
   --endpoints="${HEALTHY_ENDPOINT}" snapshot save "${BACKUP_FILE}"; then
    echo "$(date '+%Y-%m-%d %H:%M:%S') - Backup completed successfully"
    
    # 检查备份文件状态
    ETCDCTL_API=3 etcdctl --write-out=table snapshot status "${BACKUP_FILE}"
    
    # 清理旧备份
    find "${BACKUP_DIR}" -name 'etcd-snapshot-*.db' -mtime +${KEEP_DAYS} -delete
    echo "$(date '+%Y-%m-%d %H:%M:%S') - Old backups older than ${KEEP_DAYS} days cleaned"
else
    echo "$(date '+%Y-%m-%d %H:%M:%S') - Backup failed" >&2
    exit 1
fi

2. systemd 管理脚本

[root@mysql-131 local]# cat /etc/systemd/system/etcd-backup.service
[Unit]
Description=ETCD Backup Service
After=network.target
Wants=etcd.service

[Service]
Type=oneshot
User=root
ExecStart=/usr/local/bin/etcd-backup.sh
Environment="ETCDCTL_API=3"

[Install]
WantedBy=multi-user.target

3. systemd 定时任务

当然也可以用 cron 计划任务

[root@mysql-131 local]# cat /etc/systemd/system/etcd-backup.timer 
[Unit]
Description=Run etcd backup every 4 hours

[Timer]
OnCalendar=*-*-* 00/4:00:00
AccuracySec=1m
Persistent=true

[Install]
WantedBy=timers.target

解读定时器

[Unit] 部分
Description: 对定时任务的描述,这里是“每4小时运行一次 etcd 备份”。

[Timer] 部分
OnCalendar: 定义任务的触发时间。*-*-* 00/4:00:00 表示从午夜开始,每隔4小时触发一次任务(即00:00、04:00、08:00等)。
AccuracySec: 定义任务触发的时间精度,这里是1分钟。这意味着任务可能会在指定时间的前后1分钟内触发。
Persistent: 如果设置为 true,表示如果系统在预定时间未运行,任务会在系统启动后尽快执行一次。

[Install] 部分
WantedBy: 定义该定时任务在系统启动时是否自动启动。timers.target 是一个特殊的 systemd 目标,用于管理所有定时任务。

二、远程同步脚本

1. 定时远程同步本地文件

定时每天凌晨1点同步本地目录中最新的6个文件到远端机器

0 1 * * * /bin/bash -c "ls /var/openebs/local/etcd-backup/ | sort -r | head -6 | xargs -I {} rsync -avz --partial --progress --checksum /var/openebs/local/etcd-backup/{} root@192.168.233.132:/var/openebs/db-backup/etcd-backup" >> /var/log/etcd-backup.log 2>&1

本地文件命名格式如下:

[root@mysql-131 ~]# ls -lah /var/openebs/local/etcd-backup/
total 157G
drwxr-xr-x 2 root root 4.0K May 20 12:01 .
drwxr-xr-x 6 root root 4.0K May  7 15:44 ..
-rw------- 1 root root 3.3G May 19 12:01 etcd-snapshot-20250519-120000.db
-rw------- 1 root root 3.3G May 19 16:01 etcd-snapshot-20250519-160000.db
-rw------- 1 root root 3.3G May 19 20:04 etcd-snapshot-20250519-200000.db
-rw------- 1 root root 3.3G May 20 00:02 etcd-snapshot-20250520-000000.db
-rw------- 1 root root 3.3G May 20 04:02 etcd-snapshot-20250520-040000.db
-rw------- 1 root root 3.3G May 20 08:01 etcd-snapshot-20250520-080000.db
-rw------- 1 root root 3.3G May 20 12:01 etcd-snapshot-20250520-120000.db

2. 脚本本地目录同步到远程

  1. 会检查本地 /var/openebs/local/ 下的 mysql-backup、pgsql-backup 和 mongodb-backup 目录

  2. 筛选出最近1天创建的备份目录(根据目录名中的日期部分)

  3. 使用rsync将目录同步到远程对应的位置

  4. 记录详细日志到 /var/log/backup_sync.log

  5. --partial 参数,允许中断的传输在恢复时继续

  6. --timeout=60 参数,防止网络问题导致长时间挂起

  7. 实现了 sync_with_retry 函数,最多重试3次

  8. 每次重试之间有30秒的间隔(可配置)

  9. 脚本会跟踪失败的同步操作数量

  10. 最终退出状态码反映失败的数量

  11. 更详细的日志记录,方便排查问题

$ vim /usr/local/bin/backup_sync.sh

#!/bin/bash

# 配置参数
LOCAL_BASE="/var/openebs/local"
REMOTE_HOST="192.168.233.132"
REMOTE_USER="root"
REMOTE_BASE="/var/openebs/db-backup"
LOG_FILE="/var/log/backup_sync.log"
MAX_RETRIES=3
RETRY_DELAY=30

# 备份类型和对应的$LOCAL_BASE里面的目录
declare -A BACKUP_TYPES=(
    ["mysql"]="mysql-backup"
    ["pgsql"]="pgsql-backup"
    ["mongodb"]="mongodb-backup"
)

# 创建日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >> "$LOG_FILE"
}

# 带重试机制的rsync同步函数
sync_with_retry() {
    local src=$1
    local remote_target=$2
    local backup_type=$3
    local retry_count=0
    local sync_success=false
    
    # 提取目录名用于日志
    local dir_name=$(basename "$src")
    
    while [ $retry_count -lt $MAX_RETRIES ] && [ "$sync_success" = false ]; do
        log "尝试 $((retry_count+1))/$MAX_RETRIES: 同步 $dir_name ($backup_type) 到远程..."
        
        # 使用SSH方式的rsync进行同步
        rsync -avz --partial --progress --timeout=60 -e ssh \
              --rsync-path="mkdir -p ${REMOTE_BASE}/${BACKUP_TYPES[$backup_type]} && rsync" \
              "$src/" \
              "${REMOTE_USER}@${REMOTE_HOST}:${REMOTE_BASE}/${BACKUP_TYPES[$backup_type]}/$dir_name/" \
              >> "$LOG_FILE" 2>&1
        
        if [ $? -eq 0 ]; then
            sync_success=true
            log "同步 $dir_name ($backup_type) 成功"
        else
            retry_count=$((retry_count+1))
            if [ $retry_count -lt $MAX_RETRIES ]; then
                log "同步失败,${RETRY_DELAY}秒后重试..."
                sleep $RETRY_DELAY
            else
                log "同步 $dir_name ($backup_type) 失败,已达最大重试次数"
            fi
        fi
    done
    
    if [ "$sync_success" = false ]; then
        return 1
    else
        return 0
    fi
}

# 主同步函数
sync_recent_backups() {
    local sync_errors=0
    
    # 获取当前日期和前一天日期(字符串格式)
    local current_date=$(date +%Y%m%d)
    local yesterday_date=$(date -d "1 day ago" +%Y%m%d)
    
    log "当前日期: $current_date, 前一天日期: $yesterday_date"

    for backup_type in "${!BACKUP_TYPES[@]}"; do
        local_source_dir="${LOCAL_BASE}/${BACKUP_TYPES[$backup_type]}"
        
        if [ ! -d "$local_source_dir" ]; then
            log "警告: 本地目录不存在: $local_source_dir"
            continue
        fi

        log "开始处理 $backup_type 备份..."
        
        # 查找最近1天的目录(当天+前一天)
        recent_dirs=()
        while IFS= read -r -d $'\0' dir; do
            dir_name=$(basename "$dir")
            
            # 提取8位日期字符串(前8个字符)
            dir_date="${dir_name:0:8}"
            
            # 验证是否为有效日期
            if ! date -d "$dir_date" &>/dev/null; then
                log "跳过无效日期目录: $dir_name"
                continue
            fi

            # 直接比较日期字符串
            if [[ "$dir_date" == "$current_date" || "$dir_date" == "$yesterday_date" ]]; then
                log "目录 $dir_name 属于最近1天(日期: $dir_date),将同步"
                recent_dirs+=("$dir")
            else
                log "跳过旧目录: $dir_name(日期: $dir_date)"
            fi
        done < <(find "$local_source_dir" -maxdepth 1 -type d -name "202*" -print0)
        
        if [ ${#recent_dirs[@]} -eq 0 ]; then
            log "未找到最近1天的 $backup_type 备份目录"
            continue
        fi

        # 同步每个找到的目录
        for dir in "${recent_dirs[@]}"; do
            # 调用带重试机制的同步函数
            sync_with_retry "$dir" "${REMOTE_USER}@${REMOTE_HOST}:${REMOTE_BASE}/${BACKUP_TYPES[$backup_type]}" "$backup_type"
            
            if [ $? -ne 0 ]; then
                sync_errors=$((sync_errors+1))
            fi
        done
    done
    
    return $sync_errors
}

# 主执行流程
log "===== 开始备份同步任务 ====="
sync_recent_backups
result=$?

if [ $result -eq 0 ]; then
    log "备份同步任务成功完成"
else
    log "备份同步任务完成,但有 $result 个目录同步失败"
fi

log "===== 任务结束 ====="

exit $result

计划任务定期执行脚本

0 1 * * * /bin/bash -c "bash /usr/local/bin/backup_sync.sh"

3. harbor 仓库远程自动同步

该脚本会自动把 A 仓库的镜像同步至 B 仓库中

root@k8s-master1:~# cat /data/script/harbor/replication-images.sh
#!/usr/bin/env bash
#
#****************************************************************************
# 作者:               甄天祥
# QQ:                 2099637909
# 日期:               2022-09-21
# 网址:               http://blog.tianxiang.love
# 描述:               同步本地Harbor仓库中没有的远端Harbor仓库中的镜像
#*****************************************************************************

set -eo pipefail

# === 默认配置 ===
DEFAULT_SOURCE_USER="admin"
DEFAULT_SOURCE_PASS="Harbor12345"
DEFAULT_SOURCE_ADDR="harbor.tianxiang.love:30443"
DEFAULT_LOCAL_USER="admin"
DEFAULT_LOCAL_PASS="Harbor12345"
DEFAULT_LOCAL_ADDR="harbor-m6.tianxiang.love"
DEFAULT_THREADS=5
DEFAULT_WATCH_INTERVAL=300  # 默认每 5 分钟轮询

# === 初始化变量 ===
SOURCE_HARBOR_USER="$DEFAULT_SOURCE_USER"
SOURCE_HARBOR_PASSWD="$DEFAULT_SOURCE_PASS"
SOURCE_HARBOR_ADDRESS="$DEFAULT_SOURCE_ADDR"
LOCAL_HARBOR_USER="$DEFAULT_LOCAL_USER"
LOCAL_HARBOR_PASSWD="$DEFAULT_LOCAL_PASS"
LOCAL_HARBOR_ADDRESS="$DEFAULT_LOCAL_ADDR"
THREADS=$DEFAULT_THREADS
WATCH_MODE=false
WATCH_INTERVAL=$DEFAULT_WATCH_INTERVAL
SYNC_ALL=true
SPECIFIC_PROJECTS=()
EXCLUDE_PROJECTS=()
DRY_RUN=false
LOG_LEVEL="info"
TIMESTAMP=$(date '+%Y-%m-%d-%H-%M')
LOG_FILE="harbor-sync-${TIMESTAMP}.log"
FAILED_FILE="failed-images.txt"

# === 颜色定义 ===
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
BLUE='\033[0;34m'
NC='\033[0m'

# === 日志函数 ===
log() {
    local level=$1
    local message=$2
    local timestamp=$(date '+%Y-%m-%d %H:%M:%S')

    case $level in
        "debug")
            [[ "$LOG_LEVEL" == "debug" ]] && echo -e "${BLUE}[DEBUG]${NC} ${timestamp} - ${message}" | tee -a "$LOG_FILE"
            ;;
        "info")
            echo -e "${GREEN}[INFO]${NC} ${timestamp} - ${message}" | tee -a "$LOG_FILE"
            ;;
        "warn")
            echo -e "${YELLOW}[WARN]${NC} ${timestamp} - ${message}" | tee -a "$LOG_FILE"
            ;;
        "error")
            echo -e "${RED}[ERROR]${NC} ${timestamp} - ${message}" | tee -a "$LOG_FILE"
            ;;
        *)
            echo -e "[${level}] ${timestamp} - ${message}" | tee -a "$LOG_FILE"
            ;;
    esac
}

# === 帮助信息 ===
usage() {
    echo "使用方法: $0 [选项]"
    echo
    echo "选项:"
    echo "  --source-user 用户       源Harbor用户名"
    echo "  --source-pass 密码       源Harbor密码"
    echo "  --source-addr 地址       源Harbor地址"
    echo "  --local-user 用户        本地Harbor用户名"
    echo "  --local-pass 密码        本地Harbor密码"
    echo "  --local-addr 地址        本地Harbor地址"
    echo "  --project 项目名         同步指定项目 (可多次使用)"
    echo "  --exclude 项目名         排除指定项目 (可多次使用)"
    echo "  --threads 数量           并发线程数 (默认: $DEFAULT_THREADS)"
    echo "  --dry-run                试运行,不实际同步镜像"
    echo "  --debug                  启用DEBUG日志"
    echo "  --watch                  启用守护模式"
    echo "  --watch-interval 秒数     守护模式下轮询间隔 (默认: $DEFAULT_WATCH_INTERVAL)"
    echo "  -h, --help               显示帮助信息"
    exit 0
}

# === 参数解析 ===
parse_args() {
    while [[ $# -gt 0 ]]; do
        case "$1" in
            --source-user) SOURCE_HARBOR_USER="$2"; shift 2;;
            --source-pass) SOURCE_HARBOR_PASSWD="$2"; shift 2;;
            --source-addr) SOURCE_HARBOR_ADDRESS="$2"; shift 2;;
            --local-user) LOCAL_HARBOR_USER="$2"; shift 2;;
            --local-pass) LOCAL_HARBOR_PASSWD="$2"; shift 2;;
            --local-addr) LOCAL_HARBOR_ADDRESS="$2"; shift 2;;
            --project) SYNC_ALL=false; SPECIFIC_PROJECTS+=("$2"); shift 2;;
            --exclude) EXCLUDE_PROJECTS+=("$2"); shift 2;;
            --threads) THREADS="$2"; shift 2;;
            --dry-run) DRY_RUN=true; shift;;
            --debug) LOG_LEVEL="debug"; shift;;
            --watch) WATCH_MODE=true; shift;;
            --watch-interval) WATCH_INTERVAL="$2"; shift 2;;
            -h|--help) usage;;
            *) log error "未知选项: $1"; usage; exit 1;;
        esac
    done
}

check_dependencies() {
    for cmd in curl docker jq; do
        command -v "$cmd" >/dev/null || { log error "缺少依赖: $cmd"; exit 1; }
    done
}

harbor_login() {
    docker login "$1" -u "$2" -p "$3" >/dev/null || { log error "登录 $1 失败"; exit 1; }
}

fetch_projects() {
    curl -s -k -u "$1:$2" -X GET "https://$3/api/v2.0/projects?page_size=100" | jq -r '.[].name'
}

fetch_image_tags() {
    curl -s -k -u "$1:$2" -X GET "https://$3/api/v2.0/projects/$4/repositories?page_size=100" |
        jq -r '.[].name' | while read -r repo; do
        curl -s -k -u "$1:$2" -X GET "https://$3/v2/$repo/tags/list" |
            jq -r '.tags[]?' | while read -r tag; do
                echo "$repo:$tag"
            done
    done
}

create_project() {
    curl -s -k -u "$LOCAL_HARBOR_USER:$LOCAL_HARBOR_PASSWD" -X POST "https://$LOCAL_HARBOR_ADDRESS/api/v2.0/projects" \
        -H "Content-Type: application/json" \
        -d '{"project_name": "'$1'", "metadata": {"public": "false"}}' >/dev/null 2>&1 || true
}

sync_image() {
    local image=$1
    for attempt in {1..3}; do
        if docker pull "$SOURCE_HARBOR_ADDRESS/$image" && \
           docker tag "$SOURCE_HARBOR_ADDRESS/$image" "$LOCAL_HARBOR_ADDRESS/$image" && \
           docker push "$LOCAL_HARBOR_ADDRESS/$image"; then
            docker rmi "$SOURCE_HARBOR_ADDRESS/$image" "$LOCAL_HARBOR_ADDRESS/$image" >/dev/null 2>&1
            log info "同步成功: $image"
            echo "$image" >> synced-images.txt
            return 0
        else
            log warn "第 $attempt 次尝试失败: $image"
            sleep 2
        fi
    done
    log error "同步失败: $image"
    echo "$image" >> "$FAILED_FILE"
}

sync_images_concurrent() {
    local image_list=("$@")
    local -i index=0 total=${#image_list[@]}

    while [[ $index -lt $total ]]; do
        for ((i=0; i<THREADS && index<total; i++, index++)); do
            sync_image "${image_list[$index]}" &
        done
        wait
    done
}

main() {
    start_time=$(date +%s)
    check_dependencies

    log info "登录 Harbor 仓库..."
    harbor_login "$SOURCE_HARBOR_ADDRESS" "$SOURCE_HARBOR_USER" "$SOURCE_HARBOR_PASSWD"
    harbor_login "$LOCAL_HARBOR_ADDRESS" "$LOCAL_HARBOR_USER" "$LOCAL_HARBOR_PASSWD"

    log info "获取源项目列表..."
    readarray -t ALL_PROJECTS < <(fetch_projects "$SOURCE_HARBOR_USER" "$SOURCE_HARBOR_PASSWD" "$SOURCE_HARBOR_ADDRESS")

    FILTERED_PROJECTS=()
    for proj in "${ALL_PROJECTS[@]}"; do
        [[ "$SYNC_ALL" == false && ! " ${SPECIFIC_PROJECTS[*]} " =~ " $proj " ]] && continue
        [[ " ${EXCLUDE_PROJECTS[*]} " =~ " $proj " ]] && continue
        FILTERED_PROJECTS+=("$proj")
    done

    > synced-images.txt
    > "$FAILED_FILE"

    TO_SYNC_IMAGES=()
    for proj in "${FILTERED_PROJECTS[@]}"; do
        log info "处理项目: $proj"
        create_project "$proj"

        readarray -t SRC_IMAGES < <(fetch_image_tags "$SOURCE_HARBOR_USER" "$SOURCE_HARBOR_PASSWD" "$SOURCE_HARBOR_ADDRESS" "$proj")
        readarray -t DST_IMAGES < <(fetch_image_tags "$LOCAL_HARBOR_USER" "$LOCAL_HARBOR_PASSWD" "$LOCAL_HARBOR_ADDRESS" "$proj")

        for image in "${SRC_IMAGES[@]}"; do
            if ! printf '%s\n' "${DST_IMAGES[@]}" | grep -qx "$image"; then
                TO_SYNC_IMAGES+=("$image")
            fi
        done
    done

    if [[ "$DRY_RUN" == true ]]; then
        log info "[Dry Run] 需要同步的镜像:"
        printf '%s\n' "${TO_SYNC_IMAGES[@]}"
        exit 0
    fi

    log info "共需同步 ${#TO_SYNC_IMAGES[@]} 个镜像,开始并发执行..."
    sync_images_concurrent "${TO_SYNC_IMAGES[@]}"

    end_time=$(date +%s)
    duration=$((end_time - start_time))

    log info "✅ 同步完成: 成功 ${#TO_SYNC_IMAGES[@]} 个镜像, 耗时 ${duration} 秒"

    if [[ -s "$FAILED_FILE" ]]; then
        log warn "❌ 失败镜像列表如下 (保存在 $FAILED_FILE):"
        cat "$FAILED_FILE"
    fi

    log info "������ 同步成功的镜像列表:"
    sort synced-images.txt || true
}

watch_mode() {
    log info "进入守护模式,每 ${WATCH_INTERVAL} 秒轮询同步..."
    while true; do
        main "${ORIGINAL_ARGS[@]}"
        log info "等待 ${WATCH_INTERVAL} 秒后继续同步..."
        sleep "$WATCH_INTERVAL"
    done
}

# 启动
ORIGINAL_ARGS=("$@")
parse_args "$@"

if [[ "$WATCH_MODE" == true ]]; then
    watch_mode
else
    main "$@"
fi

systemd 管理脚本程序并启动

root@k8s-master1:~# cat /etc/systemd/system/harbor-sync.service
[Unit]
Description=Harbor 镜像同步守护进程
After=network.target docker.service
Requires=docker.service

[Service]
Type=simple
ExecStart=/data/script/harbor/replication-images.sh --watch --watch-interval 300
WorkingDirectory=/data/script/harbor
StandardOutput=append:/var/log/harbor-sync.log
StandardError=append:/var/log/harbor-sync.err
Restart=always
RestartSec=10
User=root

[Install]
WantedBy=multi-user.target
root@k8s-master1:~# systemctl daemon-reload 
root@k8s-master1:~# systemctl enable harbor-sync --now
root@k8s-master1:~# systemctl status harbor-sync.service 
● harbor-sync.service - Harbor 镜像同步守护进程
     Loaded: loaded (/etc/systemd/system/harbor-sync.service; enabled; vendor preset: enabled)
     Active: active (running) since Thu 2025-08-21 06:53:35 CST; 8h ago
   Main PID: 4081799 (bash)
      Tasks: 2 (limit: 4557)
     Memory: 1.5M
        CPU: 13min 18.985s
     CGroup: /system.slice/harbor-sync.service
             ├─ 546574 sleep 300
             └─4081799 bash /data/script/harbor/replication-images.sh --watch --watch-interval 300

Aug 21 06:53:35 k8s-master1 systemd[1]: harbor-sync.service: Consumed 7h 14min 37.650s CPU time.
Aug 21 06:53:35 k8s-master1 systemd[1]: Started Harbor 镜像同步守护进程.

三、shell 常用脚本

1. 解压缩

压缩

$ tar -cf - ansibel-ubuntu-22.04-2025-08-14/ | pigz -9 -p $(nproc) > ansibel-ubuntu-22.04-2025-08-14.tar.gz
  • pigz -9:压缩级别。

  • -p $(nproc):使用所有可用的 CPU 核心(nproc 返回 CPU 核心数)。

解压缩

$ pigz -d -p $(nproc) < ansibel-ubuntu-22.04-2025-08-14.tar.gz | tar xf -

  • pigz -d:多线程解压 .gz 文件。

  • -p $(nproc):使用所有可用的 CPU 核心(nproc 返回 CPU 核心数)。

  • tar xf -:从标准输入读取解压后的数据并提取文件。

2. 计划任务

1.1 同步本地最新目录到远端

配合使用 grep 正则表达式以及 sort 排序 和 tail 取最后一个,来找到最新的目录,然后进行同步

$ ls -lh /data/k8s-app/mysql-cluster/mysql-backup/
total 32K
drwxr-xr-x 2 root root  139 Aug  6 21:33 20250806_213341
drwxr-xr-x 2 root root  136 Aug  6 21:42 20250806_214112
drwxr-xr-x 2 root root  139 Aug  7 10:00 20250807_100001
drwxr-xr-x 2 root root  139 Aug  8 10:00 20250808_100001
drwxr-xr-x 2 root root  139 Aug  9 10:00 20250821_100001
[root@k8s-master1 ~]# ls /data/k8s-app/mysql-cluster/mysql-backup/ | grep -E '^[0-9]{8}_[0-9]{6}$' | sort | tail -n 1
20250821_100001
# 上午 10 点同步 mysql-cluster 备份到 192.168.198.51 虚拟机
30 10 * * * rsync -avzP "/data/k8s-app/mysql-cluster/mysql-backup/$(ls /data/k8s-app/mysql-cluster/mysql-backup/ | grep -E '^[0-9]{8}_[0-9]{6}$' | sort | tail -n 1)" tianxiang@192.168.198.51:/data/k8s-app/mysql-backup/

1.2 执行脚本自动导入数据库

#40 10 * * * /data/k8s-app/mysql-backup/restore_mysql.sh >> /data/k8s-app/mysql-backup/mysql_restore.log 2>&1
40 10 * * * /data/k8s-app/mysql-backup/restore_mysql.sh
$ cat /data/k8s-app/mysql-backup/restore_mysql.sh

#!/bin/bash

# 配置部分
BACKUP_DIR="/data/k8s-app/mysql-backup"
NAMESPACE="middleware"
MYSQL_USER="root"
MYSQL_PASSWORD="123456"
DATABASE="halodb"
LOG_FILE="/var/log/mysql_restore.log"

# 日志函数 - 同时输出到终端和日志文件
log() {
    local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
    local message="[$timestamp] $1"
    echo "$message"                         # 输出到终端
    echo "$message" >> "$LOG_FILE"          # 追加到日志文件
}

# 错误处理函数
error_exit() {
    log "错误: $1"
    exit 1
}

# 1. 查找最新的备份目录
log "开始查找最新备份目录..."
latest_backup=$(ls "$BACKUP_DIR" | grep -E '^[0-9]{8}_[0-9]{6}$' | sort | tail -n 1)

if [ -z "$latest_backup" ]; then
    error_exit "未找到有效的备份目录"
fi

log "找到最新备份目录: $latest_backup"

# 2. 查找并解压 halodb 开头的 SQL 文件
backup_path="$BACKUP_DIR/$latest_backup"
log "在目录 $backup_path 中查找备份文件..."
gz_file=$(ls "$backup_path" | grep -E '^halodb_[0-9]{8}_[0-9]{6}\.sql\.gz$' | head -n 1)

if [ -z "$gz_file" ]; then
    error_exit "未找到 halodb 开头的 SQL 压缩文件"
fi

log "找到备份文件: $gz_file"
log "开始解压文件..."
gunzip -c "$backup_path/$gz_file" > "$backup_path/${gz_file%.gz}" || {
    error_exit "解压文件失败"
}

sql_file="${gz_file%.gz}"
log "已解压文件: $sql_file"

# 3. 获取集群主节点
log "正在查找集群主节点..."
primary_node=$(kubectl -n middleware exec -it mysql-cluster-0 -c mysql -- mysqlsh -uri root:123456@127.0.0.1 -- cluster status 2>/dev/null | grep -C 5 "PRIMARY" | grep "address" | awk -F\" '{print $4}' | awk -F. '{print $1}')

if [ -z "$primary_node" ]; then
    error_exit "无法确定集群主节点"
fi

pod_name="${primary_node%.*}"  # 去除域名部分,获取 pod 名称
log "找到主节点: $pod_name"

# 4. 复制并导入 SQL 文件
log "正在复制 SQL 文件到容器..."
kubectl -n "$NAMESPACE" cp "$backup_path/$sql_file" "$pod_name:/var/lib/mysql/$sql_file" -c mysql || {
    error_exit "复制文件失败"
}

log "正在导入数据库..."
start_time=$(date +%s)
kubectl -n "$NAMESPACE" exec -it "$pod_name" -c mysql -- \
  sh -c "mysql -u$MYSQL_USER -p$MYSQL_PASSWORD $DATABASE < /var/lib/mysql/$sql_file" && {
    end_time=$(date +%s)
    duration=$((end_time - start_time))
    log "数据库导入成功,耗时: ${duration}秒"
} || {
    error_exit "数据库导入失败"
}

# 清理临时文件
log "清理临时文件..."
rm -f "$backup_path/$sql_file" || {
    log "警告: 清理临时文件失败,但操作已完成"
}

log "操作完成"

3. 脚本循环探活网站

#!/usr/bin/env bash
# url_monitor.sh  —— 多 URL 5 秒级健康监控脚本(增强版)
# Usage:
#   ./url_monitor.sh [--ping|-p] [--interval|-i N] [--timeout|-t N] [--ping-count N] [--ping-timeout N] [--file|-f urls.txt]
# Example:
#   ./url_monitor.sh --ping --interval 5 --timeout 5 --ping-count 2 --ping-timeout 1
# ----------------------------------------------------------

set -o errexit
set -o nounset
set -o pipefail

# ---------------- defaults ----------------
INTERVAL=5            # 主循环间隔(秒)
CURL_TIMEOUT=5        # curl 单次超时(秒)
ENABLE_PING=0         # 是否启用 ping 探测(0/1)
PING_COUNT=1          # ping 次数
PING_TIMEOUT=1        # ping 每次等待超时(秒)
LOG_DIR="/var/log/url_monitor"
URLS_FILE=""
# 内置 URL 列表(如果要用文件加载,可用 -f/--file)
URLS=(
    "https://blog.tianxiang.love"
)

# ---------------- helpers ----------------
current_log_file() {
    echo "${LOG_DIR}/$(date +%F).log"
}

mkdir -p "$LOG_DIR"

show_help() {
    cat <<EOF
Usage: $0 [options]
Options:
  -p, --ping              启用 ping 探测(会从 URL 提取 host 并 ping)
  -i, --interval N        监控间隔(秒),默认 ${INTERVAL}
  -t, --timeout N         curl 超时(秒),默认 ${CURL_TIMEOUT}
      --ping-count N      ping 次数,默认 ${PING_COUNT}
      --ping-timeout N    ping 每次等待超时(秒),默认 ${PING_TIMEOUT}
  -f, --file FILE         从文件加载 URL(逐行,# 注释会被忽略)
  -h, --help              显示此帮助并退出
Example:
  $0 --ping -i 5 -t 3 --ping-count 2 --ping-timeout 1
EOF
}

# 颜色仅用于 console 输出,写入文件为纯文本
log() {
    local level="$1"; shift
    local msg="$*"
    local ts
    ts="$(date '+%F %T')"
    local plain="[$ts] [$level] $msg"
    # color
    local color_reset="\033[0m"
    local color=""
    case "$level" in
        INFO) color="\033[32m" ;;   # green
        WARN) color="\033[33m" ;;   # yellow
        ERROR) color="\033[31m" ;;  # red
        *) color="" ;;
    esac
    # console (colored)
    if [[ -t 1 ]]; then
        echo -e "[$ts] [${color}${level}${color_reset}] $msg"
    else
        echo "[$ts] [$level] $msg"
    fi
    # append plain to daily log file
    echo "$plain" >> "$(current_log_file)"
}

INFO()  { log "INFO"  "$@"; }
WARN()  { log "WARN"  "$@"; }
ERROR() { log "ERROR" "$@"; }

# 从 URL 提取 host(支持带端口、带用户名、IPv6 [::1])
extract_host() {
    local url="$1"
    # remove scheme
    local hp="${url#*://}"
    # remove path
    hp="${hp%%/*}"
    # remove userinfo if present
    hp="${hp#*@}"
    # if IPv6 like [::1]:8080
    if [[ "$hp" =~ ^\[(.*)\](:[0-9]+)?$ ]]; then
        echo "${BASH_REMATCH[1]}"
        return
    fi
    # strip port if any
    echo "${hp%%:*}"
}

# ping 探测
ping_probe() {
    local host="$1"
    if ! command -v ping >/dev/null 2>&1; then
        WARN "ping 命令不可用,跳过 ping: $host"
        return 2
    fi
    # 尝试使用一次更短的超时
    if ping -c "$PING_COUNT" -W "$PING_TIMEOUT" "$host" > /dev/null 2>&1; then
        INFO "ping 成功: $host (count=${PING_COUNT} timeout=${PING_TIMEOUT}s)"
        return 0
    else
        ERROR "ping 失败: $host (count=${PING_COUNT} timeout=${PING_TIMEOUT}s)"
        return 1
    fi
}

# curl 探测(返回码 + 耗时)
probe() {
    local url="$1"
    local tmp_err
    tmp_err="$(mktemp)"

    # curl -w 输出: http_code|time_total
    local output
    output=$(curl -sSL -o /dev/null --max-time "$CURL_TIMEOUT" -w "%{http_code}|%{time_total}" "$url" 2>"$tmp_err") || true
    local exit_code=$?
    local err_msg
    err_msg="$(<"$tmp_err")"
    rm -f "$tmp_err"

    if [[ $exit_code -ne 0 ]]; then
        # 更详细的错误映射
        case $exit_code in
            3)  ERROR "$url  URL 格式错误 (curl error 3). $err_msg" ;;
            6)  ERROR "$url  DNS 解析失败 (curl error 6). $err_msg" ;;
            7)  ERROR "$url  连接失败/不可达 (curl error 7). $err_msg" ;;
            28) ERROR "$url  请求超时 (curl error 28). $err_msg" ;;
            35) ERROR "$url  SSL 握手失败 (curl error 35). $err_msg" ;;
            51|60) ERROR "$url  SSL 证书验证失败 (curl error $exit_code). $err_msg" ;;
            52) WARN  "$url  空响应 (curl error 52). $err_msg" ;;
            56) ERROR "$url  接收失败/连接中断 (curl error 56). $err_msg" ;;
            *)  ERROR "$url  curl 错误码=$exit_code. ${err_msg:-'(no additional info)'}" ;;
        esac
        return 1
    fi

    # 正常返回,解析 http_code 和 time_total
    IFS='|' read -r http_code time_total <<< "$output"
    http_code="${http_code:-000}"
    time_total="${time_total:-0}"

    # 将秒转毫秒(整数)
    local time_ms
    time_ms=$(awk "BEGIN{printf \"%.0f\", ($time_total)*1000}")

    # http 状态分类
    if [[ "$http_code" =~ ^2[0-9][0-9]$ ]]; then
        INFO "$url  正常  code=$http_code  time=${time_ms}ms"
        return 0
    elif [[ "$http_code" =~ ^3[0-9][0-9]$ ]]; then
        WARN "$url  重定向  code=$http_code  time=${time_ms}ms"
        return 2
    elif [[ "$http_code" =~ ^4[0-9][0-9]$ ]]; then
        ERROR "$url  客户端错误  code=$http_code  time=${time_ms}ms"
        return 1
    elif [[ "$http_code" =~ ^5[0-9][0-9]$ ]]; then
        ERROR "$url  服务器错误  code=$http_code  time=${time_ms}ms"
        return 1
    else
        WARN "$url  非标准响应  code=$http_code  time=${time_ms}ms"
        return 2
    fi
}

# ---------------- 参数解析 ----------------
if [[ $# -gt 0 ]]; then
    while [[ $# -gt 0 ]]; do
        case "$1" in
            -p|--ping) ENABLE_PING=1; shift ;;
            -i|--interval) INTERVAL="$2"; shift 2 ;;
            -t|--timeout) CURL_TIMEOUT="$2"; shift 2 ;;
            --ping-count) PING_COUNT="$2"; shift 2 ;;
            --ping-timeout) PING_TIMEOUT="$2"; shift 2 ;;
            -f|--file) URLS_FILE="$2"; shift 2 ;;
            -h|--help) show_help; exit 0 ;;
            *) echo "未知参数: $1"; show_help; exit 1 ;;
        esac
    done
fi

# 若指定了文件,则覆盖内置 URL 列表
if [[ -n "${URLS_FILE:-}" ]]; then
    if [[ ! -f "$URLS_FILE" ]]; then
        echo "指定的 URL 文件不存在: $URLS_FILE" >&2
        exit 2
    fi
    URLS=()
    while IFS= read -r line || [[ -n "$line" ]]; do
        line="${line%%#*}"        # 去掉注释部分
        line="${line//[[:space:]]/}" # 去空白
        if [[ -n "$line" ]]; then
            URLS+=("$line")
        fi
    done < "$URLS_FILE"
fi

if [[ ${#URLS[@]} -eq 0 ]]; then
    echo "没有可用的 URL,退出。" >&2
    exit 1
fi

trap 'INFO "收到中断信号,退出"; exit 0' SIGINT SIGTERM

INFO "===== 开始监控(共 ${#URLS[@]} 个 URL) interval=${INTERVAL}s curl_timeout=${CURL_TIMEOUT}s ping=${ENABLE_PING} ====="

# 主循环
while true; do
    for u in "${URLS[@]}"; do
        probe "$u" &
        pids_probe+=($!)
        # 如果启用了 ping,则异步 ping(并且只对 host 进行 ping,不对 URL 本身)
        if [[ "$ENABLE_PING" -eq 1 ]]; then
            host="$(extract_host "$u")"
            if [[ -n "$host" ]]; then
                ping_probe "$host" &
                pids_ping+=($!)
            else
                WARN "无法从 URL 提取 host,跳过 ping: $u"
            fi
        fi
    done

    # 等待本轮所有后台任务完成(防止下一轮和上一轮冲突)
    for pid in "${pids_probe[@]:-}"; do wait "$pid" || true; done
    unset pids_probe
    for pid in "${pids_ping[@]:-}"; do wait "$pid" || true; done
    unset pids_ping

    sleep "$INTERVAL"
done

systemd 管理并启动

[root@k8s-app-1 script]# cat /etc/systemd/system/url_monitor.service
[Unit]
Description=URL & Ping Health Monitor Service
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
ExecStart=/usr/local/bin/url_monitor.sh --ping --interval 5 --timeout 5 --ping-count 2 --ping-timeout 1
Restart=always
RestartSec=5
# 建议指定运行用户(日志目录要可写)
User=root
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
[root@k8s-app-1 script]# systemctl status url_monitor.service 
● url_monitor.service - URL & Ping Health Monitor Service
   Loaded: loaded (/etc/systemd/system/url_monitor.service; enabled; vendor preset: disabled)
   Active: active (running) since Mon 2025-08-11 16:49:25 CST; 1 weeks 2 days ago
 Main PID: 43222 (bash)
    Tasks: 2
   Memory: 55.9M
   CGroup: /system.slice/url_monitor.service
           ├─43222 bash /usr/local/bin/url_monitor.sh --ping --interval 5 --timeout 5 --ping-count 2 --ping-timeout 1
           └─63349 sleep 5

Aug 21 15:18:47 k8s-app-1 url_monitor.sh[43222]: [2025-08-21 15:18:47] [INFO] ping 成功: blog.tianxiang.love (count=2 timeout=1s)
Aug 21 15:18:53 k8s-app-1 url_monitor.sh[43222]: [2025-08-21 15:18:53] [INFO] https://blog.tianxiang.love  正常  code=200  time=22ms
Aug 21 15:18:53 k8s-app-1 url_monitor.sh[43222]: [2025-08-21 15:18:53] [INFO] ping 成功: blog.tianxiang.love (count=2 timeout=1s)
Aug 21 15:18:59 k8s-app-1 url_monitor.sh[43222]: [2025-08-21 15:18:59] [INFO] https://blog.tianxiang.love  正常  code=200  time=22ms
Aug 21 15:18:59 k8s-app-1 url_monitor.sh[43222]: [2025-08-21 15:18:59] [INFO] ping 成功: blog.tianxiang.love (count=2 timeout=1s)

4. 查询汇总服务器的出入口带宽流量

[root@k8s-app-1 check_bandwidth_monitor]# cat bandwidth_monitor.py
import paramiko
import time
import csv
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict

GATEWAY_IP = "192.168.233.1"  # 通过网关地址获取到被监控网卡
IP_LIST_FILE = "edge-ip_list.txt"
OUTPUT_FILE = "bandwidth_report.csv"
INTERVAL = 6 # 采样间隔
SAMPLES = 60  # 采样次数(监控时长 = INTERVAL * SAMPLES)
MAX_WORKERS = 97 # 并发数,可根据总主机数量来定义


def read_ip_list(file_path: str) -> List[str]:
    with open(file_path, "r") as f:
        return [line.strip() for line in f if line.strip()]


def ssh_exec(ssh: paramiko.SSHClient, cmd: str) -> str:
    stdin, stdout, stderr = ssh.exec_command(cmd)
    return stdout.read().decode().strip()


def get_iface_by_route(ssh: paramiko.SSHClient, target_ip: str) -> str:
    output = ssh_exec(ssh, f"ip route get {target_ip}")
    print(f"[路由输出] {output}")
    parts = output.split()
    if "dev" in parts:
        idx = parts.index("dev")
        return parts[idx + 1]
    raise Exception(f"无法识别网卡 from: {output}")


def get_iface_bytes(ssh: paramiko.SSHClient, iface: str) -> Dict[str, int]:
    line = ssh_exec(ssh, f"cat /proc/net/dev | grep '{iface}:'")
    parts = line.strip().split()
    return {
        "rx_bytes": int(parts[1]),
        "tx_bytes": int(parts[9])
    }


def monitor_host(ip: str) -> Dict:
    result = {"ip": ip, "avg_rx": 0, "avg_tx": 0}
    try:
        print(f"开始监控 {ip} ...")
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(ip, username="root", timeout=5)

        iface = get_iface_by_route(ssh, GATEWAY_IP)
        print(f"[{ip}] 出口网卡: {iface}")

        rx_total = 0
        tx_total = 0

        for i in range(SAMPLES):
            d1 = get_iface_bytes(ssh, iface)
            time.sleep(INTERVAL)
            d2 = get_iface_bytes(ssh, iface)
            rx_diff = (d2["rx_bytes"] - d1["rx_bytes"]) / INTERVAL
            tx_diff = (d2["tx_bytes"] - d1["tx_bytes"]) / INTERVAL
            rx_total += rx_diff
            tx_total += tx_diff
            print(f"[{ip}] RX: {rx_diff/1024:.2f} KB/s, TX: {tx_diff/1024:.2f} KB/s")

        result["avg_rx"] = rx_total / SAMPLES / 1024 / 1024  # MB/s
        result["avg_tx"] = tx_total / SAMPLES / 1024 / 1024  # MB/s

        ssh.close()
    except Exception as e:
        result["error"] = str(e)
    return result


def main():
    ip_list = read_ip_list(IP_LIST_FILE)
    all_results = []

    print(f"=== 参数信息 ===\n监控时长: {INTERVAL * SAMPLES} 秒\n采样间隔: {INTERVAL} 秒\n采样次数: {SAMPLES} 次\n并发数: {MAX_WORKERS}\n")

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = [executor.submit(monitor_host, ip) for ip in ip_list]
        for future in futures:
            all_results.append(future.result())

    print("\n=== 各机器平均带宽统计 (单位:MB/s) ===")
    total_rx = total_tx = 0
    with open(OUTPUT_FILE, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["IP", "RX_MB/s", "TX_MB/s"])
        for res in all_results:
            ip = res['ip']
            rx = res.get("avg_rx", 0)
            tx = res.get("avg_tx", 0)
            print(f"{ip}: RX={rx:.3f}, TX={tx:.3f}")
            total_rx += rx
            total_tx += tx
            writer.writerow([ip, f"{rx:.3f}", f"{tx:.3f}"])

    print(f"\n=== 总体平均带宽 ===")
    print(f"总接收 RX: {total_rx:.3f} MB/s")
    print(f"总发送 TX: {total_tx:.3f} MB/s")
    print(f"结果已保存为 {OUTPUT_FILE}")


if __name__ == "__main__":
    main()
# 使用此脚本的前提是,执行脚本的机器需要可以免密登陆客户端机器
[root@k8s-app-1 check_bandwidth_monitor]# python3 bandwidth_monitor.py

5. 脚本参数化自定义同步本地目录到远端目录

  1. 完全参数化:所有配置都通过命令行参数传递

  2. 灵活的主题支持:通过 -t/--topic 参数指定备份任务名称

  3. 可配置的源和目标:可以备份任意目录到任意远程位置

  4. 模块化选项:

    • --no-delete:不删除目标端多余文件

    • --no-compress:不压缩传输

    • --dry-run:预览模式

  5. 更好的错误处理:详细的参数验证和错误提示

  6. 安全性:支持从环境变量读取密码

  7. 向后兼容:包装脚本保持原有使用习惯

#!/bin/bash
# backup_script.sh - 参数化备份脚本(适合计划任务)
# 功能:通过参数灵活备份任意目录到远程服务器

# 默认配置(可通过参数覆盖)
DEFAULT_BACKUP_DIR="/var/log/backup-xiaohezi_logs"
DEFAULT_REMOTE_USER="tianxiang"
DEFAULT_REMOTE_HOST="10.100.255.6"
DEFAULT_SSH_PORT="22"

# SSH密码(在此处设置您的密码)
SSH_PASSWORD="TianXiang."

# 显示使用说明
show_usage() {
    cat << EOF
用法: $0 [选项] --source SOURCE_PATH --remote REMOTE_PATH

必需参数:
  -s, --source SOURCE_PATH     备份源目录路径
  -r, --remote REMOTE_PATH     远程目标目录路径

可选参数:
  -t, --topic TOPIC_NAME       备份主题/任务名称(用于日志标识)
  -u, --user REMOTE_USER       远程服务器用户名(默认: $DEFAULT_REMOTE_USER)
  -h, --host REMOTE_HOST       远程服务器地址(默认: $DEFAULT_REMOTE_HOST)
  -p, --port SSH_PORT          SSH端口(默认: $DEFAULT_SSH_PORT)
  -d, --backup-dir DIR         本地日志目录(默认: $DEFAULT_BACKUP_DIR)
  -l, --log-file FILE          指定日志文件名(默认自动生成)
  --no-delete                  不删除目标端多余文件
  --no-compress                不压缩传输
  --dry-run                    Dry run模式,只显示将要执行的操作
  --help                       显示此帮助信息

示例:
  $0 -t "MySQL备份" -s /data/mysql -r /backup/mysql
  $0 -t "网站数据" -s /var/www -r /backup/website -u myuser -h 192.168.1.100
  $0 -t "应用配置" -s /etc/app -r /backup/config --no-delete --dry-run

注意: SSH密码已在脚本中硬编码,请确保脚本文件权限安全。

EOF
}

# 日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}

# 显示rsync功能说明
show_rsync_info() {
    log "================================================================"
    log "                    rsync 参数功能说明"
    log "================================================================"
    log "-a (archive) - 归档模式,相当于 -rlptgoD"
    log "-v (verbose) - 详细输出"
    if [ "$USE_COMPRESS" = true ]; then
        log "-z (compress) - 压缩传输"
    fi
    log "-P (progress + partial) - 显示进度 + 断点续传"
    if [ "$USE_DELETE" = true ]; then
        log "--delete - 删除目标多余文件"
    fi
    log "================================================================"
}

# 错误处理函数
error_exit() {
    log "错误: $1"
    exit 1
}

# 参数解析
parse_arguments() {
    SOURCE_PATH=""
    REMOTE_PATH=""
    TOPIC="未命名任务"
    REMOTE_USER="$DEFAULT_REMOTE_USER"
    REMOTE_HOST="$DEFAULT_REMOTE_HOST"
    SSH_PORT="$DEFAULT_SSH_PORT"
    BACKUP_DIR="$DEFAULT_BACKUP_DIR"
    LOG_FILE=""
    USE_DELETE=true
    USE_COMPRESS=true
    DRY_RUN=false
    
    while [[ $# -gt 0 ]]; do
        case $1 in
            -s|--source)
                SOURCE_PATH="$2"
                shift 2
                ;;
            -r|--remote)
                REMOTE_PATH="$2"
                shift 2
                ;;
            -t|--topic)
                TOPIC="$2"
                shift 2
                ;;
            -u|--user)
                REMOTE_USER="$2"
                shift 2
                ;;
            -h|--host)
                REMOTE_HOST="$2"
                shift 2
                ;;
            -p|--port)
                SSH_PORT="$2"
                shift 2
                ;;
            -d|--backup-dir)
                BACKUP_DIR="$2"
                shift 2
                ;;
            -l|--log-file)
                LOG_FILE="$2"
                shift 2
                ;;
            --no-delete)
                USE_DELETE=false
                shift
                ;;
            --no-compress)
                USE_COMPRESS=false
                shift
                ;;
            --dry-run)
                DRY_RUN=true
                shift
                ;;
            --help)
                show_usage
                exit 0
                ;;
            *)
                error_exit "未知参数: $1"
                ;;
        esac
    done
    
    # 验证必需参数
    if [ -z "$SOURCE_PATH" ] || [ -z "$REMOTE_PATH" ]; then
        error_exit "必须提供源目录路径(--source)和远程目录路径(--remote)"
    fi
    
    # 验证密码是否设置
    if [ -z "$SSH_PASSWORD" ]; then
        error_exit "SSH密码未设置,请在脚本中设置SSH_PASSWORD变量"
    fi
    
    # 设置日志文件
    if [ -z "$LOG_FILE" ]; then
        mkdir -p "$BACKUP_DIR"
        LOG_FILE="${BACKUP_DIR}/backup_${TOPIC}_$(date +%Y%m%d_%H%M%S).log"
    else
        mkdir -p "$(dirname "$LOG_FILE")"
    fi
}

# 检查依赖
check_dependencies() {
    log "检查必要依赖..."
    
    # 检查源目录
    if [ ! -d "$SOURCE_PATH" ]; then
        error_exit "备份源目录不存在: $SOURCE_PATH"
    fi
    
    # 检查sshpass是否安装
    if ! command -v sshpass &> /dev/null; then
        error_exit "sshpass 未安装,请先安装: yum install sshpass 或 apt-get install sshpass"
    fi
    
    # 检查rsync是否安装
    if ! command -v rsync &> /dev/null; then
        error_exit "rsync 未安装,请先安装: yum install rsync 或 apt-get install rsync"
    fi
    
    log "依赖检查完成"
}

# 构建rsync命令
build_rsync_command() {
    local rsync_cmd="sshpass -p '$SSH_PASSWORD' rsync -av"
    
    # 添加压缩选项
    if [ "$USE_COMPRESS" = true ]; then
        rsync_cmd="$rsync_cmd -z"
    fi
    
    # 添加进度选项
    rsync_cmd="$rsync_cmd -P"
    
    # 添加删除选项
    if [ "$USE_DELETE" = true ]; then
        rsync_cmd="$rsync_cmd --delete"
    fi
    
    # 添加SSH选项
    rsync_cmd="$rsync_cmd -e 'ssh -p $SSH_PORT -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null'"
    
    # 添加源和目标路径
    rsync_cmd="$rsync_cmd '$SOURCE_PATH/' '$REMOTE_USER@$REMOTE_HOST:$REMOTE_PATH'"
    
    echo "$rsync_cmd"
}

# 执行备份
perform_backup() {
    log "开始备份: $TOPIC"
    log "源目录: $SOURCE_PATH"
    log "目标地址: $REMOTE_USER@$REMOTE_HOST:$REMOTE_PATH"
    
    # 构建rsync命令
    local rsync_cmd=$(build_rsync_command)
    
    # 显示rsync信息
    show_rsync_info
    
    # Dry run模式
    if [ "$DRY_RUN" = true ]; then
        log "DRY RUN模式 - 将要执行的命令:"
        log "$rsync_cmd"
        log "Dry run完成,未实际执行备份"
        return 0
    fi
    
    # 创建远程目录
    log "创建远程目录..."
    local mkdir_cmd="sshpass -p '$SSH_PASSWORD' ssh -p $SSH_PORT -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $REMOTE_USER@$REMOTE_HOST 'mkdir -p $REMOTE_PATH'"
    eval "$mkdir_cmd" >> "$LOG_FILE" 2>&1
    
    if [ $? -ne 0 ]; then
        log "警告: 远程目录创建可能失败,但继续执行备份..."
    fi
    
    # 执行备份
    log "执行rsync备份..."
    log "命令: $rsync_cmd"
    
    eval "$rsync_cmd" >> "$LOG_FILE" 2>&1
    
    if [ $? -eq 0 ]; then
        log "备份完成: $TOPIC"
        return 0
    else
        error_exit "备份失败: $TOPIC"
    fi
}

# 清理旧日志(保留最近30天)
cleanup_old_logs() {
    log "清理旧日志文件..."
    find "$BACKUP_DIR" -name "backup_*.log" -mtime +30 -delete >> "$LOG_FILE" 2>&1
    log "日志清理完成"
}

# 主函数
main() {
    # 解析参数
    parse_arguments "$@"
    
    # 创建日志目录
    mkdir -p "$(dirname "$LOG_FILE")"
    
    log "=== 备份任务开始: $TOPIC ==="
    log "日志文件: $LOG_FILE"
    
    # 检查依赖
    check_dependencies
    
    # 执行备份
    perform_backup
    
    # 清理旧日志
    cleanup_old_logs
    
    log "=== 备份任务完成: $TOPIC ==="
    log "详细日志请查看: $LOG_FILE"
}

# 执行主函数
main "$@"

测试执行

[root@k8s-master1 data]# ./backup_script.sh -t "测试备份" -s /data/Dockerfile -r /home/tianxiang/k8s-app/gitlab/gitlab-backup -u tianxiang -h 10.100.255.6
[2025-10-29 11:24:10] === 备份任务开始: 测试备份 ===
[2025-10-29 11:24:10] 日志文件: /var/log/backup-xiaohezi_logs/backup_测试备份_20251029_112410.log
[2025-10-29 11:24:10] 检查必要依赖...
[2025-10-29 11:24:10] 依赖检查完成
[2025-10-29 11:24:10] 开始备份: 测试备份
[2025-10-29 11:24:10] 源目录: /data/Dockerfile
[2025-10-29 11:24:10] 目标地址: tianxiang@10.100.255.6:/home/tianxiang/k8s-app/gitlab/gitlab-backup
[2025-10-29 11:24:10] ================================================================
[2025-10-29 11:24:10]                     rsync 参数功能说明
[2025-10-29 11:24:10] ================================================================
[2025-10-29 11:24:10] -a (archive) - 归档模式,相当于 -rlptgoD
[2025-10-29 11:24:10] -v (verbose) - 详细输出
[2025-10-29 11:24:10] -z (compress) - 压缩传输
[2025-10-29 11:24:10] -P (progress + partial) - 显示进度 + 断点续传
[2025-10-29 11:24:10] --delete - 删除目标多余文件
[2025-10-29 11:24:10] ================================================================
[2025-10-29 11:24:10] 创建远程目录...
[2025-10-29 11:24:10] 执行rsync备份...
[2025-10-29 11:24:10] 命令: sshpass -p 'TianXiang.' rsync -av -z -P --delete -e 'ssh -p 22 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null' '/data/Dockerfile/' 'tianxiang@10.100.255.6:/home/tianxiang/k8s-app/gitlab/gitlab-backup'
[2025-10-29 11:24:11] 备份完成: 测试备份
[2025-10-29 11:24:11] 清理旧日志文件...
[2025-10-29 11:24:11] 日志清理完成
[2025-10-29 11:24:11] === 备份任务完成: 测试备份 ===
[2025-10-29 11:24:11] 详细日志请查看: /var/log/backup-xiaohezi_logs/backup_测试备份_20251029_112410.log

四、MySQL 常用语法

1. 相关查询

1.1 查询数据库存储使用的大小

SELECT 
    table_schema AS 'Database',
    ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'Size (MB)'
FROM 
    information_schema.tables
GROUP BY 
    table_schema;

执行输出结果如下:


mysql> SELECT 
    ->     table_schema AS 'Database',
    ->     ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'Size (MB)'
    -> FROM 
    ->     information_schema.tables
    -> GROUP BY 
    ->     table_schema;
+--------------------+-----------+
| Database           | Size (MB) |
+--------------------+-----------+
| cmdb               |     13.81 |
| indc_alarm         |      0.69 |
| information_schema |      0.00 |
| iot                |      2.02 |
| mysql              |      2.63 |
| performance_schema |      0.00 |
| sys                |      0.02 |
| topv2              |     31.64 |
| zh-iam             |      0.70 |
+--------------------+-----------+
9 rows in set (0.02 sec)

1.2 查询单个数据库的存储使用大小

SELECT 
    table_schema AS 'Database',
    ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'Size (MB)'
FROM 
    information_schema.tables
WHERE 
    table_schema = 'cmdb';

执行结果如下

mysql> SELECT 
    ->     table_schema AS 'Database',
    ->     ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS 'Size (MB)'
    -> FROM 
    ->     information_schema.tables
    -> WHERE 
    ->     table_schema = 'cmdb';
+----------+-----------+
| Database | Size (MB) |
+----------+-----------+
| cmdb     |     13.81 |
+----------+-----------+
1 row in set (0.01 sec)

五、关于 SSH 操作之类的脚本

1. 测试远程登陆

[root@k8s-app-1 script]# cat try_SSH_remote_login.py 
#!/usr/bin/env python3
import paramiko
import sys
import argparse
from pathlib import Path

def read_ip_list(file_path):
    """读取IP列表文件,支持多种格式"""
    ip_info = []
    try:
        with open(file_path, 'r') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                
                parts = line.split()
                if len(parts) >= 3:
                    ip_info.append({
                        'ip': parts[0],
                        'port': int(parts[1]),
                        'password': parts[2],
                        'username': parts[3] if len(parts) > 3 else 'root'
                    })
                else:
                    print(f"警告: 第 {line_num} 行格式错误,已跳过: {line}")
    except FileNotFoundError:
        print(f"错误: 文件 '{file_path}' 不存在")
        sys.exit(1)
    except Exception as e:
        print(f"错误: 读取文件时发生错误 - {str(e)}")
        sys.exit(1)
    
    return ip_info

def test_ssh_login(ip, port, password, username, timeout=10):
    """测试SSH登录,返回是否成功"""
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    try:
        ssh.connect(ip, port=port, username=username, password=password, timeout=timeout)
        print(f"[SUCCESS] ✅ {ip}:{port} 登录成功")
        return True
    except paramiko.AuthenticationException:
        print(f"[FAILED] ❌ {ip}:{port} 认证失败(密码错误或用户名错误)")
    except paramiko.SSHException as e:
        print(f"[FAILED] ❌ {ip}:{port} SSH连接错误 - {str(e)}")
    except socket.timeout:
        print(f"[FAILED] ❌ {ip}:{port} 连接超时")
    except Exception as e:
        print(f"[FAILED] ❌ {ip}:{port} 连接失败 - {str(e)}")
    finally:
        ssh.close()
    return False

def main():
    parser = argparse.ArgumentParser(
        description="SSH远程登录测试工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
使用示例:
  %(prog)s -f ip-list.txt
  %(prog)s -f ip-list.txt -t 5
  %(prog)s -f ip-list.txt -o result.txt
  %(prog)s --single 192.168.1.1 22 password123

文件格式说明:
  IP地址 端口 密码 [用户名]
  192.168.1.1 22 password123 root
  10.0.0.1 2222 mypassword
  # 注释行以#开头
        """
    )
    
    parser.add_argument('-f', '--file', default='ip-list.txt', 
                       help='IP列表文件路径 (默认: ip-list.txt)')
    parser.add_argument('-t', '--timeout', type=int, default=10,
                       help='连接超时时间(秒) (默认: 10)')
    parser.add_argument('-o', '--output', 
                       help='将结果输出到文件')
    parser.add_argument('--single', nargs=3, metavar=('IP', 'PORT', 'PASSWORD'),
                       help='测试单个主机: IP 端口 密码')
    parser.add_argument('--username', default='root',
                       help='单个主机测试时的用户名 (默认: root)')
    
    args = parser.parse_args()
    
    # 测试单个主机
    if args.single:
        ip, port, password = args.single
        print(f"🚀 测试单个主机 {ip}:{port}...\n")
        success = test_ssh_login(ip, int(port), password, args.username, args.timeout)
        
        print("\n📊 ========== 测试结果 ==========")
        print(f"结果: {'✅ 成功' if success else '❌ 失败'}")
        sys.exit(0 if success else 1)
    
    # 从文件测试多个主机
    if not Path(args.file).exists():
        print(f"错误: 文件 '{args.file}' 不存在")
        print("请使用 -f 参数指定正确的文件路径,或使用 --single 测试单个主机")
        sys.exit(1)
    
    ip_info_list = read_ip_list(args.file)
    if not ip_info_list:
        print("错误: IP列表为空或格式错误!")
        print("文件格式应为: IP地址 端口 密码 [用户名]")
        sys.exit(1)
    
    success_list = []
    failed_list = []
    
    print(f"\n🚀 开始测试SSH登录 (超时: {args.timeout}秒)...\n")
    
    for info in ip_info_list:
        ip, port, password, username = info['ip'], info['port'], info['password'], info['username']
        print(f"测试 {username}@{ip}:{port}...")
        if test_ssh_login(ip, port, password, username, args.timeout):
            success_list.append(f"{username}@{ip}:{port}")
        else:
            failed_list.append(f"{username}@{ip}:{port}")
    
    # 输出结果
    result_output = []
    result_output.append("\n📊 ========== 测试结果汇总 ==========")
    result_output.append(f"✅ 成功登录的主机 ({len(success_list)}台):")
    for host in success_list:
        result_output.append(f"  - {host}")
    
    result_output.append(f"\n❌ 登录失败的主机 ({len(failed_list)}台):")
    for host in failed_list:
        result_output.append(f"  - {host}")
    
    result_output.append(f"\n成功率: {len(success_list)}/{len(ip_info_list)} "
                       f"({len(success_list)/len(ip_info_list)*100:.1f}%)")
    result_output.append("测试完成!")
    
    # 输出到终端和文件
    result_text = "\n".join(result_output)
    print(result_text)
    
    if args.output:
        try:
            with open(args.output, 'w') as f:
                f.write(result_text + "\n")
            print(f"\n结果已保存到: {args.output}")
        except Exception as e:
            print(f"警告: 无法保存结果到文件 - {str(e)}")

if __name__ == "__main__":
    import socket  # 添加socket导入用于超时异常处理
    main()
# 查看帮助
python3 try_SSH_remote_login.py --help

# 使用默认文件测试
python3 try_SSH_remote_login.py

# 指定文件测试
python3 try_SSH_remote_login.py -f my_ips.txt

# 测试单个主机
python3 try_SSH_remote_login.py --single 192.168.1.1 22 password123

# 带超时设置和输出文件
python3 try_SSH_remote_login.py -f ip-list.txt -t 5 -o result.txt

演示如下:

[root@k8s-app-1 python]# python3 try_SSH_remote_login.py --username root --single 192.168.233.131 22 123456
🚀 测试单个主机 192.168.233.131:22...

[SUCCESS] ✅ 192.168.233.131:22 登录成功

📊 ========== 测试结果 ==========
结果: ✅ 成功
[root@k8s-app-1 python]# 

多个主机测试

[root@k8s-app-1 python]# cat ip-list.txt 
192.168.233.103 22 123456
192.168.233.104 22 123456
192.168.233.105 22 123456
[root@k8s-app-1 python]# python3 try_SSH_remote_login.py --file ip-list.txt 

🚀 开始测试SSH登录 (超时: 10秒)...

测试 root@192.168.233.103:22...
[SUCCESS] ✅ 192.168.233.103:22 登录成功
测试 root@192.168.233.104:22...
[SUCCESS] ✅ 192.168.233.104:22 登录成功
测试 root@192.168.233.105:22...
[SUCCESS] ✅ 192.168.233.105:22 登录成功

📊 ========== 测试结果汇总 ==========
✅ 成功登录的主机 (3台):
  - root@192.168.233.103:22
  - root@192.168.233.104:22
  - root@192.168.233.105:22

❌ 登录失败的主机 (0台):

成功率: 3/3 (100.0%)
测试完成!

2. 维护用户名密码

批量管理远程主机:修改密码、创建用户、设置sudo、修复SSH端口

[root@k8s-app-1 python]# cat batch_user_manage_and_ssh_fix.py
import paramiko
import sys
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed

def read_ip_list(file):
    ip_info = []
    with open(file, 'r') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) == 3:
                ip_info.append({
                    'ip': parts[0],
                    'port': int(parts[1]),
                    'password': parts[2]
                })
    return ip_info

def get_os_type(ssh):
    stdin, stdout, stderr = ssh.exec_command('cat /etc/os-release')
    os_release = stdout.read().decode().lower()
    if 'ubuntu' in os_release:
        return 'ubuntu'
    elif 'centos' in os_release:
        return 'centos'
    else:
        return 'unknown'

def execute_remote_commands(info, args):
    ip = info['ip']
    port = info['port']
    login_password = info['password']
    login_user = args.user
    new_password = args.new_passwd
    create_user = args.create_user
    create_user_password = args.create_passwd

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    result = {"ip": ip, "success": False}

    try:
        ssh.connect(ip, port=port, username=login_user, password=login_password, timeout=10)
        print(f"[SUCCESS] ✅ {ip}:{port} 登录成功")

        # 获取操作系统类型
        os_type = get_os_type(ssh)
        print(f"[INFO] 🖥️ {ip}: 操作系统类型 - {os_type}")

        # 修改登录用户密码
        if new_password:
            if os_type == 'ubuntu':
                cmd = f'echo "{login_user}:{new_password}" | sudo chpasswd'
            else:
                cmd = f'echo "{new_password}" | passwd --stdin {login_user}'
            
            stdin, stdout, stderr = ssh.exec_command(cmd)
            if stdout.channel.recv_exit_status() == 0:
                print(f"[UPDATE] 🔑 {ip}: 用户 {login_user} 密码已修改")
            else:
                print(f"[ERROR] ❌ {ip}: 登录用户密码修改失败 - {stderr.read().decode().strip()}")
        else:
            print(f"[SKIP] 🔐 {ip}: 未指定新密码,跳过登录用户密码修改")

        # 创建新用户(如果指定)
        if create_user:
            print(f"[DEBUG] 创建用户参数: {create_user}, 密码: {create_user_password}")  # 调试信息
            stdin, stdout, stderr = ssh.exec_command(f'id -u {create_user}')
            exit_status = stdout.channel.recv_exit_status()
            if exit_status != 0:
                print(f"[CREATE] 👤 {ip}: 用户 {create_user} 不存在,开始创建")
                # 创建用户
                stdin, stdout, stderr = ssh.exec_command(f'sudo useradd -m {create_user}')
                if stdout.channel.recv_exit_status() != 0:
                    print(f"[ERROR] ❌ {ip}: 用户创建失败 - {stderr.read().decode().strip()}")
                    raise Exception("用户创建失败")
                
                # 设置密码
                if create_user_password:
                    if os_type == 'ubuntu':
                        cmd = f'echo "{create_user}:{create_user_password}" | sudo chpasswd'
                    else:
                        cmd = f'echo "{create_user_password}" | passwd --stdin {create_user}'
                    
                    stdin, stdout, stderr = ssh.exec_command(cmd)
                    if stdout.channel.recv_exit_status() != 0:
                        print(f"[ERROR] ❌ {ip}: 密码设置失败 - {stderr.read().decode().strip()}")
                        raise Exception("密码设置失败")
                    print(f"[PASSWD] 🔑 {ip}: 用户 {create_user} 密码已设置")
                
                # 设置sudo权限
                sudo_cmd = f'echo "{create_user} ALL=(ALL) NOPASSWD:ALL" | sudo tee /etc/sudoers.d/{create_user}'
                stdin, stdout, stderr = ssh.exec_command(sudo_cmd)
                if stdout.channel.recv_exit_status() != 0:
                    print(f"[ERROR] ❌ {ip}: sudo权限设置失败 - {stderr.read().decode().strip()}")
                    raise Exception("sudo权限设置失败")
                
                # 设置文件权限
                chmod_cmd = f'sudo chmod 440 /etc/sudoers.d/{create_user}'
                ssh.exec_command(chmod_cmd)
                print(f"[SUDO] 🛡️ {ip}: 用户 {create_user} 已配置 sudo 权限")
            else:
                print(f"[SKIP] 👤 {ip}: 用户 {create_user} 已存在,跳过创建")

        # 设置 SSH 端口为 22(如不是)
        stdin, stdout, _ = ssh.exec_command('grep "^Port" /etc/ssh/sshd_config || echo "Port 22"')
        current_port = stdout.read().decode().strip()
        if "Port 22" not in current_port:
            ssh.exec_command('sed -i "s/^Port.*/Port 22/" /etc/ssh/sshd_config')
            print(f"[UPDATE] 🔧 {ip}: SSH 端口已设置为 22")
        else:
            print(f"[SKIP] 🔧 {ip}: SSH 端口已经是 22")

        # 重启 SSH 服务
        if os_type == 'ubuntu':
            restart_cmd = 'sudo systemctl restart ssh'
        else:
            restart_cmd = 'sudo systemctl restart sshd || sudo service sshd restart'
            
        stdin, stdout, stderr = ssh.exec_command(restart_cmd)
        if stdout.channel.recv_exit_status() == 0:
            print(f"[RESTART] ♻️ {ip}: SSH 服务已重启")
        else:
            print(f"[ERROR] ❌ {ip}: SSH 重启失败 - {stderr.read().decode().strip()}")

        result['success'] = True
    except paramiko.AuthenticationException:
        print(f"[FAILED] ❌ {ip}: 认证失败(密码错误)")
    except paramiko.SSHException as e:
        print(f"[FAILED] ❌ {ip}: SSH连接错误 - {str(e)}")
    except Exception as e:
        print(f"[FAILED] ❌ {ip}: 操作失败 - {str(e)}")
    finally:
        ssh.close()

    return result

def main():
    parser = argparse.ArgumentParser(description='批量管理远程主机:修改密码、创建用户、设置sudo、修复SSH端口')
    parser.add_argument('--file', default='ip-list.txt', help='IP列表文件(格式:IP 端口 密码)')
    parser.add_argument('--user', default='root', help='登录用户名')
    parser.add_argument('--new-passwd', help='新密码:用于修改登录用户密码')
    parser.add_argument('--create-user', help='要创建的新用户名')
    parser.add_argument('--create-passwd', help='新用户的密码')
    parser.add_argument('--workers', type=int, default=10, help='并发线程数(默认10)')

    args = parser.parse_args()

    # 调试输出参数
    print(f"\n[DEBUG] 命令行参数解析结果:")
    print(f"文件: {args.file}")
    print(f"登录用户: {args.user}")
    print(f"新密码: {args.new_passwd}")
    print(f"创建用户: {args.create_user}")
    print(f"创建用户密码: {args.create_passwd}")
    print(f"并发数: {args.workers}\n")

    ip_info_list = read_ip_list(args.file)
    if not ip_info_list:
        print("❌ IP列表为空或格式错误")
        sys.exit(1)

    print(f"\n🚀 开始处理 {len(ip_info_list)} 台主机,使用 {args.workers} 并发线程...\n")

    success, failed = [], []

    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        futures = [executor.submit(execute_remote_commands, info, args) for info in ip_info_list]
        for future in as_completed(futures):
            result = future.result()
            (success if result['success'] else failed).append(result['ip'])

    print("\n📊 ========== 执行结果汇总 ==========")
    print(f"✅ 成功主机 ({len(success)} 台):")
    for ip in success:
        print(f"  - {ip}")

    print(f"\n❌ 失败主机 ({len(failed)} 台):")
    for ip in failed:
        print(f"  - {ip}")

    print("\n✅ 所有任务已完成。")

if __name__ == "__main__":
    main()
[root@k8s-app-1 python]# python3 batch_user_manage_and_ssh_fix.py --help
usage: batch_user_manage_and_ssh_fix.py [-h] [--file FILE] [--user USER] [--new-passwd NEW_PASSWD] [--create-user CREATE_USER] [--create-passwd CREATE_PASSWD] [--workers WORKERS]

批量管理远程主机:修改密码、创建用户、设置sudo、修复SSH端口

optional arguments:
  -h, --help            show this help message and exit
  --file FILE           IP列表文件(格式:IP 端口 密码)
  --user USER           登录用户名
  --new-passwd NEW_PASSWD
                        新密码:用于修改登录用户密码
  --create-user CREATE_USER
                        要创建的新用户名
  --create-passwd CREATE_PASSWD
                        新用户的密码
  --workers WORKERS     并发线程数(默认10)

3. 配置免密登陆

[root@k8s-app-1 script]# cat batch_root_free_login.py 
import paramiko
import sys
import os
from pathlib import Path

# 定义文件路径
ip_list_file = 'ip-list.txt'
ssh_key_file = str(Path.home() / '.ssh/id_rsa.pub')

# 读取 IP 列表文件
def read_ip_list(file):
    ip_info = []
    with open(file, 'r') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) == 3:
                ip_info.append({
                    'ip': parts[0],
                    'port': int(parts[1]),
                    'password': parts[2]
                })
    return ip_info

# 测试SSH连接并配置免密登录
def test_and_setup_ssh(ip, port, password):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    try:
        # 测试连接
        ssh.connect(ip, port=port, username='root', password=password, timeout=10)
        print(f"[SUCCESS] 成功连接到 {ip}:{port}")
        
        # 检查是否已有公钥文件
        if not os.path.exists(ssh_key_file):
            print(f"[WARNING] {ip}: 本地SSH公钥文件不存在,跳过配置免密登录")
            return True
        
        # 读取本地公钥
        with open(ssh_key_file, 'r') as key_file:
            public_key = key_file.read().strip()
        
        # 配置免密登录
        ssh.exec_command("mkdir -p ~/.ssh && chmod 700 ~/.ssh")
        ssh.exec_command(f"echo '{public_key}' >> ~/.ssh/authorized_keys && chmod 600 ~/.ssh/authorized_keys")
        print(f"[SUCCESS] {ip}: 已成功配置免密登录")
        return True
        
    except paramiko.AuthenticationException:
        print(f"[FAILED] {ip}: 认证失败,请检查密码")
    except paramiko.SSHException as e:
        print(f"[FAILED] {ip}: SSH连接错误 - {str(e)}")
    except Exception as e:
        print(f"[FAILED] {ip}: 连接失败 - {str(e)}")
    finally:
        ssh.close()
    return False

# 主函数
def main():
    # 读取IP列表
    ip_info_list = read_ip_list(ip_list_file)
    if not ip_info_list:
        print("错误: IP列表为空或格式错误")
        sys.exit(1)
    
    # 统计结果
    success_list = []
    failed_list = []
    
    print("\n开始测试SSH连接...\n")
    
    # 测试每台主机
    for info in ip_info_list:
        ip = info['ip']
        port = info['port']
        password = info['password']
        
        print(f"正在处理 {ip}:{port}...")
        if test_and_setup_ssh(ip, port, password):
            success_list.append(ip)
        else:
            failed_list.append(ip)
    
    # 打印汇总结果
    print("\n========== 测试结果汇总 ==========")
    print(f"成功连接的主机 ({len(success_list)}台):")
    for ip in success_list:
        print(f"  √ {ip}")
    
    print(f"\n连接失败的主机 ({len(failed_list)}台):")
    for ip in failed_list:
        print(f"  × {ip}")
    
    print("\n测试完成")

if __name__ == "__main__":
    main()

帮助命令:

#!/usr/bin/env python3
import paramiko
import sys
import os
import argparse
from pathlib import Path

def read_ip_list(file_path):
    """读取IP列表文件,支持多种格式"""
    ip_info = []
    try:
        with open(file_path, 'r') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                
                parts = line.split()
                if len(parts) >= 3:
                    ip_info.append({
                        'ip': parts[0],
                        'port': int(parts[1]),
                        'password': parts[2],
                        'username': parts[3] if len(parts) > 3 else 'root'
                    })
                else:
                    print(f"警告: 第 {line_num} 行格式错误,已跳过: {line}")
    except FileNotFoundError:
        print(f"错误: 文件 '{file_path}' 不存在")
        sys.exit(1)
    except Exception as e:
        print(f"错误: 读取文件时发生错误 - {str(e)}")
        sys.exit(1)
    
    return ip_info

def test_and_setup_ssh(ip, port, password, username, ssh_key_file, timeout=10, dry_run=False):
    """测试SSH连接并配置免密登录"""
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    try:
        # 测试连接
        if dry_run:
            print(f"[DRY RUN] 将测试连接到 {username}@{ip}:{port}")
            return True
            
        ssh.connect(ip, port=port, username=username, password=password, timeout=timeout)
        print(f"[SUCCESS] 成功连接到 {username}@{ip}:{port}")
        
        # 检查是否已有公钥文件
        if not os.path.exists(ssh_key_file):
            print(f"[WARNING] {ip}: 本地SSH公钥文件不存在,跳过配置免密登录")
            return True
        
        # 读取本地公钥
        with open(ssh_key_file, 'r') as key_file:
            public_key = key_file.read().strip()
        
        # 配置免密登录
        commands = [
            "mkdir -p ~/.ssh",
            "chmod 700 ~/.ssh",
            f"echo '{public_key}' >> ~/.ssh/authorized_keys",
            "chmod 600 ~/.ssh/authorized_keys"
        ]
        
        for cmd in commands:
            stdin, stdout, stderr = ssh.exec_command(cmd)
            exit_status = stdout.channel.recv_exit_status()
            if exit_status != 0:
                error_output = stderr.read().decode().strip()
                print(f"[WARNING] {ip}: 命令执行失败 '{cmd}': {error_output}")
        
        print(f"[SUCCESS] {ip}: 已成功配置免密登录")
        return True
        
    except paramiko.AuthenticationException:
        print(f"[FAILED] {ip}: 认证失败,请检查密码")
    except paramiko.SSHException as e:
        print(f"[FAILED] {ip}: SSH连接错误 - {str(e)}")
    except Exception as e:
        print(f"[FAILED] {ip}: 连接失败 - {str(e)}")
    finally:
        ssh.close()
    return False

def main():
    parser = argparse.ArgumentParser(
        description="批量SSH免密登录配置工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
使用示例:
  %(prog)s -f ip-list.txt
  %(prog)s -f ip-list.txt -k ~/.ssh/my_key.pub
  %(prog)s -f ip-list.txt -t 5 --dry-run
  %(prog)s --single 192.168.1.1 22 password123
  %(prog)s --single 192.168.1.1 22 password123 -u admin

文件格式说明:
  IP地址 端口 密码 [用户名]
  192.168.1.1 22 password123 root
  10.0.0.1 2222 mypassword admin
  # 注释行以#开头
        """
    )
    
    parser.add_argument('-f', '--file', default='ip-list.txt', 
                       help='IP列表文件路径 (默认: ip-list.txt)')
    parser.add_argument('-k', '--key-file', default=str(Path.home() / '.ssh/id_rsa.pub'),
                       help='SSH公钥文件路径 (默认: ~/.ssh/id_rsa.pub)')
    parser.add_argument('-t', '--timeout', type=int, default=10,
                       help='连接超时时间(秒) (默认: 10)')
    parser.add_argument('-u', '--username', default='root',
                       help='用户名 (默认: root)')
    parser.add_argument('--dry-run', action='store_true',
                       help='试运行模式,只显示将要执行的操作而不实际执行')
    parser.add_argument('--single', nargs=3, metavar=('IP', 'PORT', 'PASSWORD'),
                       help='配置单个主机: IP 端口 密码')
    
    args = parser.parse_args()
    
    # 检查公钥文件是否存在
    if not args.dry_run and not os.path.exists(args.key_file):
        print(f"错误: SSH公钥文件 '{args.key_file}' 不存在")
        print("请使用 -k 参数指定正确的公钥文件路径,或使用 ssh-keygen 生成密钥对")
        sys.exit(1)
    
    # 测试单个主机
    if args.single:
        ip, port, password = args.single
        print(f"🚀 配置单个主机 {args.username}@{ip}:{port}...\n")
        
        success = test_and_setup_ssh(
            ip, int(port), password, args.username, 
            args.key_file, args.timeout, args.dry_run
        )
        
        print("\n📊 ========== 配置结果 ==========")
        print(f"结果: {'✅ 成功' if success else '❌ 失败'}")
        sys.exit(0 if success else 1)
    
    # 从文件测试多个主机
    if not os.path.exists(args.file):
        print(f"错误: 文件 '{args.file}' 不存在")
        print("请使用 -f 参数指定正确的文件路径,或使用 --single 配置单个主机")
        sys.exit(1)
    
    ip_info_list = read_ip_list(args.file)
    if not ip_info_list:
        print("错误: IP列表为空或格式错误!")
        print("文件格式应为: IP地址 端口 密码 [用户名]")
        sys.exit(1)
    
    success_list = []
    failed_list = []
    
    mode = "试运行模式" if args.dry_run else "执行模式"
    print(f"\n🚀 开始批量配置SSH免密登录 ({mode})...\n")
    
    for info in ip_info_list:
        ip, port, password, username = info['ip'], info['port'], info['password'], info['username']
        print(f"处理 {username}@{ip}:{port}...")
        
        if test_and_setup_ssh(ip, port, password, username, args.key_file, args.timeout, args.dry_run):
            success_list.append(f"{username}@{ip}:{port}")
        else:
            failed_list.append(f"{username}@{ip}:{port}")
    
    # 输出结果
    print("\n📊 ========== 配置结果汇总 ==========")
    print(f"✅ 成功配置的主机 ({len(success_list)}台):")
    for host in success_list:
        print(f"  - {host}")
    
    if failed_list:
        print(f"\n❌ 配置失败的主机 ({len(failed_list)}台):")
        for host in failed_list:
            print(f"  - {host}")
    
    print(f"\n成功率: {len(success_list)}/{len(ip_info_list)} "
          f"({len(success_list)/len(ip_info_list)*100:.1f}%%)")
    
    if args.dry_run:
        print("\n💡 提示: 本次为试运行模式,未实际执行任何配置操作")
        print("请移除 --dry-run 参数来实际执行配置")
    else:
        print("\n✅ 配置完成!")
    
    # 返回适当的退出码
    sys.exit(1 if failed_list and not args.dry_run else 0)

if __name__ == "__main__":
    main()

帮助命令:

# 查看帮助
python3 batch_root_free_login.py --help

# 使用默认配置
python3 batch_root_free_login.py

# 指定IP列表文件和公钥文件
python3 batch_root_free_login.py -f my_ips.txt -k ~/.ssh/my_key.pub

# 试运行模式(只显示将要执行的操作)
python3 batch_root_free_login.py --dry-run

# 配置单个主机
python3 batch_root_free_login.py --single 192.168.1.1 22 password123

# 配置单个主机并指定用户名
python3 batch_root_free_login.py --single 192.168.1.1 22 password123 -u admin

演示如下:

# 配置单个主机免密登陆,同时指定 root 和公钥
[root@k8s-app-1 python]# python3 batch_root_free_login.py --single 192.168.233.131 22 123456 -u root -k ~/.ssh/id_rsa.pub 
🚀 配置单个主机 root@192.168.233.131:22...

[SUCCESS] 成功连接到 root@192.168.233.131:22
[SUCCESS] 192.168.233.131: 已成功配置免密登录

📊 ========== 配置结果 ==========
结果: ✅ 成功

# 配置单个主机免密登陆,默认使用 root
[root@k8s-app-1 python]# python3 batch_root_free_login.py --single 192.168.233.131 22 123456
🚀 配置单个主机 root@192.168.233.131:22...

[SUCCESS] 成功连接到 root@192.168.233.131:22
[SUCCESS] 192.168.233.131: 已成功配置免密登录

📊 ========== 配置结果 ==========
结果: ✅ 成功

配置多个主机免密登陆

# 使用默认 root
[root@k8s-app-1 python]# python3 batch_root_free_login.py --file ip-list.txt

🚀 开始批量配置SSH免密登录 (执行模式)...

处理 root@192.168.233.103:22...
[SUCCESS] 成功连接到 root@192.168.233.103:22
[SUCCESS] 192.168.233.103: 已成功配置免密登录
处理 root@192.168.233.104:22...
[SUCCESS] 成功连接到 root@192.168.233.104:22
[SUCCESS] 192.168.233.104: 已成功配置免密登录
处理 root@192.168.233.105:22...
[SUCCESS] 成功连接到 root@192.168.233.105:22
[SUCCESS] 192.168.233.105: 已成功配置免密登录

📊 ========== 配置结果汇总 ==========
✅ 成功配置的主机 (3台):
  - root@192.168.233.103:22
  - root@192.168.233.104:22
  - root@192.168.233.105:22

成功率: 3/3 (100.0%%)

✅ 配置完成!

4. 自动化远程机器执行shell脚本

[root@k8s-app-1 script]# cat batch_push_and_exec_script.py 
#!/usr/bin/env python3
import paramiko
import argparse
import sys
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

DANGEROUS_PATTERNS = [
    'rm -rf /', 'reboot', 'poweroff', 'shutdown', 'mkfs',
    ':(){ :|: & };:', 'dd if=', '>/dev/sd', '>:()'
]

def check_dangerous_script(local_script):
    """检查脚本中是否包含危险命令"""
    with open(local_script, 'r') as f:
        content = f.read()

    found = []
    for pattern in DANGEROUS_PATTERNS:
        if pattern in content:
            found.append(pattern)

    if found:
        print("\n⚠️⚠️⚠️ 警告:你的脚本中包含以下 **危险命令**,请务必确认:")
        for cmd in found:
            print(f"   ⚠️  {cmd}")
        print("继续执行可能会造成数据丢失或系统不可用!")
        confirm = input("❗ 是否继续执行?(yes/NO): ")
        if confirm.strip().lower() != 'yes':
            print("❌ 已取消执行。")
            sys.exit(1)

def read_ip_list(file_path):
    """读取IP列表文件,支持注释行"""
    ip_info = []
    try:
        with open(file_path, 'r') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                
                parts = line.split()
                if len(parts) >= 3:
                    ip_info.append({
                        'ip': parts[0],
                        'port': int(parts[1]),
                        'password': parts[2],
                        'username': parts[3] if len(parts) > 3 else 'root'
                    })
                else:
                    print(f"警告: 第 {line_num} 行格式错误,已跳过: {line}")
    except FileNotFoundError:
        print(f"错误: 文件 '{file_path}' 不存在")
        sys.exit(1)
    except Exception as e:
        print(f"错误: 读取文件时发生错误 - {str(e)}")
        sys.exit(1)
    
    return ip_info

def ssh_push_and_exec(info, args):
    ip = info['ip']
    port = info['port']
    password = info['password']
    username = info.get('username', args.user)
    local_script = args.script
    remote_script = f"/tmp/{Path(local_script).name}"
    script_args = args.script_args or ""

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    result = {"ip": ip, "success": False}

    try:
        ssh.connect(ip, port=port, username=username, password=password, timeout=args.timeout)
        print(f"[{ip}] ✅ 登录成功")

        # 上传脚本
        sftp = ssh.open_sftp()
        sftp.put(local_script, remote_script)
        sftp.chmod(remote_script, 0o755)
        sftp.close()
        print(f"[{ip}] 📤 脚本已上传到 {remote_script}")

        # 执行脚本
        cmd = f"bash {remote_script} {script_args}"
        stdin, stdout, stderr = ssh.exec_command(cmd)
        exit_code = stdout.channel.recv_exit_status()

        stdout_result = stdout.read().decode()
        stderr_result = stderr.read().decode()

        # 清理远程脚本
        ssh.exec_command(f"rm -f {remote_script}")

        if exit_code == 0:
            print(f"[{ip}] ✅ 脚本执行成功")
            result["success"] = True
        else:
            print(f"[{ip}] ❌ 脚本执行失败 (exit {exit_code})")

        # 打印输出
        if stdout_result.strip():
            print(f"[{ip}] --- STDOUT ---\n{stdout_result}")
        if stderr_result.strip():
            print(f"[{ip}] --- STDERR ---\n{stderr_result}")

    except Exception as e:
        print(f"[{ip}] ❌ 执行出错: {str(e)}")
    finally:
        ssh.close()

    return result

def main():
    parser = argparse.ArgumentParser(
        description='批量上传并执行本地脚本',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
使用示例:
  %(prog)s --script deploy.sh
  %(prog)s --script deploy.sh --file my_ips.txt
  %(prog)s --script deploy.sh --single 192.168.1.1 22 password123
  %(prog)s --script deploy.sh --script-args "--verbose --force"
  %(prog)s --script deploy.sh --workers 5 --timeout 30

文件格式说明:
  IP地址 端口 密码 [用户名]
  192.168.1.1 22 password123 root
  10.0.0.1 2222 mypassword admin
  # 注释行以#开头
        """
    )
    
    parser.add_argument('--file', default='ip-list.txt', help='IP列表文件 (默认: ip-list.txt)')
    parser.add_argument('--user', default='root', help='SSH 登录用户名 (默认: root)')
    parser.add_argument('--script', required=True, help='本地要上传并执行的脚本路径 (.sh)')
    parser.add_argument('--script-args', help='传递给远程脚本的参数(可选)')
    parser.add_argument('--workers', type=int, default=10, help='并发线程数 (默认: 10)')
    parser.add_argument('--timeout', type=int, default=10, help='连接超时时间(秒) (默认: 10)')
    parser.add_argument('--single', nargs=3, metavar=('IP', 'PORT', 'PASSWORD'),
                       help='执行单个主机: IP 端口 密码')
    
    args = parser.parse_args()

    # 脚本文件检查与危险命令提示
    script_path = Path(args.script)
    if not script_path.exists():
        print(f"❌ 脚本文件不存在: {args.script}")
        sys.exit(1)
    if script_path.suffix != '.sh':
        print(f"❌ 脚本必须是 .sh 文件: {args.script}")
        sys.exit(1)

    check_dangerous_script(args.script)

    # 处理单个主机模式
    if args.single:
        ip, port, password = args.single
        single_info = {
            'ip': ip,
            'port': int(port),
            'password': password,
            'username': args.user
        }
        
        print(f"\n🚀 开始执行单个主机 {args.user}@{ip}:{port}...\n")
        result = ssh_push_and_exec(single_info, args)
        
        print("\n📊 执行完成")
        if result["success"]:
            print(f"✅ 主机 {ip} 执行成功")
            sys.exit(0)
        else:
            print(f"❌ 主机 {ip} 执行失败")
            sys.exit(1)

    # 批量模式:从文件读取IP列表
    if not os.path.exists(args.file):
        print(f"错误: 文件 '{args.file}' 不存在")
        print("请使用 --file 参数指定正确的文件路径,或使用 --single 执行单个主机")
        sys.exit(1)
    
    ip_info_list = read_ip_list(args.file)
    if not ip_info_list:
        print("❌ IP 列表为空或格式错误")
        print("文件格式应为: IP地址 端口 密码 [用户名]")
        sys.exit(1)

    print(f"\n🚀 开始批量执行脚本,共 {len(ip_info_list)} 台主机,最大并发 {args.workers} ...\n")

    success, failed = [], []

    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        futures = [executor.submit(ssh_push_and_exec, info, args) for info in ip_info_list]
        for future in as_completed(futures):
            result = future.result()
            (success if result["success"] else failed).append(result["ip"])

    print("\n📊 执行完成")
    print(f"✅ 成功主机 ({len(success)} 台):")
    for ip in success:
        print(f"  - {ip}")
    
    if failed:
        print(f"\n❌ 失败主机 ({len(failed)} 台):")
        for ip in failed:
            print(f"  - {ip}")
    
    print(f"\n成功率: {len(success)}/{len(ip_info_list)} "
          f"({len(success)/len(ip_info_list)*100:.1f}%%)")

    # 如果有失败的主机,返回非零退出码
    if failed:
        sys.exit(1)
    else:
        sys.exit(0)

if __name__ == '__main__':
    main()

帮助命令

[root@k8s-app-1 python]# python3 batch_push_and_exec_script.py --help
usage: batch_push_and_exec_script.py [-h] [--file FILE] [--user USER] --script SCRIPT [--script-args SCRIPT_ARGS] [--workers WORKERS] [--timeout TIMEOUT] [--single IP PORT PASSWORD]

批量上传并执行本地脚本

optional arguments:
  -h, --help            show this help message and exit
  --file FILE           IP列表文件 (默认: ip-list.txt)
  --user USER           SSH 登录用户名 (默认: root)
  --script SCRIPT       本地要上传并执行的脚本路径 (.sh)
  --script-args SCRIPT_ARGS
                        传递给远程脚本的参数(可选)
  --workers WORKERS     并发线程数 (默认: 10)
  --timeout TIMEOUT     连接超时时间(秒) (默认: 10)
  --single IP PORT PASSWORD
                        执行单个主机: IP 端口 密码

使用示例:
  batch_push_and_exec_script.py --script deploy.sh
  batch_push_and_exec_script.py --script deploy.sh --file my_ips.txt
  batch_push_and_exec_script.py --script deploy.sh --single 192.168.1.1 22 password123
  batch_push_and_exec_script.py --script deploy.sh --script-args "--verbose --force"
  batch_push_and_exec_script.py --script deploy.sh --workers 5 --timeout 30

文件格式说明:
  IP地址 端口 密码 [用户名]
  192.168.1.1 22 password123 root
  10.0.0.1 2222 mypassword admin
  # 注释行以#开头
# 查看帮助
python3 batch_push_and_exec_script.py --help

# 批量执行(使用默认ip-list.txt)
python3 batch_push_and_exec_script.py --script deploy.sh

# 批量执行(指定IP列表文件)
python3 batch_push_and_exec_script.py --script deploy.sh --file my_ips.txt

# 单个主机执行
python3 batch_push_and_exec_script.py --script deploy.sh --single 192.168.1.1 22 password123

# 带参数执行
python3 batch_push_and_exec_script.py --script deploy.sh --script-args "--verbose --force"

# 控制并发和超时
python3 batch_push_and_exec_script.py --script deploy.sh --workers 5 --timeout 30

六、K8S 和 容器相关脚本

1. 根据域名查询都有哪些 ingress

[root@k8s-app-1 meta42-tls]# cat check_ingress_secrets.sh
#!/bin/bash

# 目标域名
TARGET_DOMAIN="blog.tianxiang.love"

# 获取所有命名空间
namespaces=$(kubectl get ns -o jsonpath='{.items[*].metadata.name}')

# 打印表头
printf "%-30s %-50s %-50s %-50s\n" "Namespace" "Ingress Name" "TLS Secret Name" "Matching Host"
printf "===============================================================================================================================\n"

# 遍历所有命名空间
for ns in $namespaces; do
  # 获取当前命名空间的所有ingress
  ingresses=$(kubectl get ingress -n $ns -o jsonpath='{.items[*].metadata.name}')
  
  for ingress in $ingresses; do
    # 获取ingress的完整信息
    ingress_json=$(kubectl get ingress -n $ns $ingress -o json)
    
    # 检查是否有匹配的host
    matching_hosts=$(echo "$ingress_json" | jq -r --arg domain "$TARGET_DOMAIN" '.spec.rules[]?.host | select(. != null and contains($domain))')
    
    if [ -n "$matching_hosts" ]; then
      # 获取tls secret名称
      secrets=$(echo "$ingress_json" | jq -r '.spec.tls[]?.secretName | select(. != null)')
      
      # 如果没有tls配置,则显示N/A
      if [ -z "$secrets" ]; then
        printf "%-30s %-50s %-50s %-50s\n" "$ns" "$ingress" "N/A (No TLS configured)" "$matching_hosts"
      else
        # 一个ingress可能有多个tls配置,遍历输出
        for secret in $secrets; do
          printf "%-30s %-50s %-50s %-50s\n" "$ns" "$ingress" "$secret" "$matching_hosts"
        done
      fi
    fi
  done
done

2. 更新 ingress 使用的 secret

[root@k8s-app-1 meta42-tls]# cat update_ingress_certs.sh 
#!/bin/bash

# 目标域名
TARGET_DOMAIN="blog.tianxiang.love"
# 新证书文件路径
NEW_CERT_FILE="/data/tianxiang/script/meta42-tls/nginx/blog.tianxiang.love.pem"
NEW_KEY_FILE="/data/tianxiang/script/meta42-tls/nginx/blog.tianxiang.love.key"

# 检查证书文件是否存在
if [ ! -f "$NEW_CERT_FILE" ] || [ ! -f "$NEW_KEY_FILE" ]; then
  echo "错误:证书文件不存在,请检查路径"
  echo "NEW_CERT_FILE: $NEW_CERT_FILE"
  echo "NEW_KEY_FILE: $NEW_KEY_FILE"
  exit 1
fi

# 获取所有命名空间
namespaces=$(kubectl get ns -o jsonpath='{.items[*].metadata.name}')

# 打印表头
printf "%-30s %-50s %-50s %-50s\n" "Namespace" "Ingress Name" "TLS Secret Name" "Matching Host"
printf "===============================================================================================================================\n"

# 存储需要更新的secret列表
declare -A secrets_to_update

# 遍历所有命名空间
for ns in $namespaces; do
  # 获取当前命名空间的所有ingress
  ingresses=$(kubectl get ingress -n $ns -o jsonpath='{.items[*].metadata.name}')
  
  for ingress in $ingresses; do
    # 获取ingress的完整信息
    ingress_json=$(kubectl get ingress -n $ns $ingress -o json)
    
    # 检查是否有匹配的host
    matching_hosts=$(echo "$ingress_json" | jq -r --arg domain "$TARGET_DOMAIN" '.spec.rules[]?.host | select(. != null and contains($domain))')
    
    if [ -n "$matching_hosts" ]; then
      # 获取tls secret名称
      secrets=$(echo "$ingress_json" | jq -r '.spec.tls[]?.secretName | select(. != null)')
      
      # 只处理有TLS配置的ingress
      if [ -n "$secrets" ]; then
        # 一个ingress可能有多个tls配置,遍历输出
        for secret in $secrets; do
          printf "%-30s %-50s %-50s %-50s\n" "$ns" "$ingress" "$secret" "$matching_hosts"
          # 记录需要更新的secret
          secrets_to_update["$ns/$secret"]=1
        done
      fi
    fi
  done
done

# 询问是否要更新证书
if [ ${#secrets_to_update[@]} -gt 0 ]; then
  echo -e "\n发现 ${#secrets_to_update[@]} 个需要更新的TLS secret:"
  for secret in "${!secrets_to_update[@]}"; do
    echo " - $secret"
  done

  read -p "是否要更新以上所有secret的证书?(y/n) " -n 1 -r
  echo
  if [[ $REPLY =~ ^[Yy]$ ]]; then
    for secret in "${!secrets_to_update[@]}"; do
      ns="${secret%%/*}"
      secret_name="${secret#*/}"
      
      echo -e "\n正在更新 $ns/$secret_name ..."
      kubectl create secret tls "$secret_name" \
        --cert="$NEW_CERT_FILE" \
        --key="$NEW_KEY_FILE" \
        -n "$ns" \
        --dry-run=client -o yaml | kubectl apply -f -
      
      if [ $? -eq 0 ]; then
        echo "成功更新 $ns/$secret_name"
      else
        echo "更新 $ns/$secret_name 失败"
      fi
    done
    echo -e "\n所有证书更新完成"
  else
    echo "已取消证书更新"
  fi
else
  echo -e "\n没有找到需要更新的TLS secret"
fi

Linux系统与应用
shell
License:  CC BY 4.0
Share

Further Reading

Aug 6, 2025

Linux 使用 lvm 管理设备挂载分区

LVM(逻辑卷管理)是Linux环境下用于灵活管理磁盘分区的机制。它允许管理员将多个磁盘分区整合为一个卷组,形成存储池,并在卷组上创建逻辑卷以供使用。LVM的主要优点包括:文件系统大小不受物理磁盘限制、可在线扩展文件系统、支持新磁盘添加到存储池中、数据冗余以及方便的数据迁移。然而,LVM也存在一些局限性,比如移除磁盘时的操作复杂性、单个磁盘故障可能影响整个卷组、不能减小文件系统大小以及可能影响存储性能。LVM的基本组成包括物理卷(PV)、卷组(VG)、逻辑卷(LV)和物理块(PE)。创建LVM的过程涉及初始化物理卷、创建卷组、创建逻辑卷、格式化逻辑卷、挂载并配置自动挂载等步骤。通过这些操作,LVM提供了一种高效且灵活的方式来管理和扩展Linux系统的存储资源。

May 19, 2025

Linux 运维常用脚本和工具

本文档详细介绍了多个脚本及其用途,涵盖了ETCD备份、远程同步、Harbor仓库同步、压缩与解压缩、计划任务、网站健康监控、带宽流量统计、MySQL查询、SSH管理及K8S相关操作等。以下是每个部分的简洁摘要: ### 一、ETCD 备份 - **主程序脚本**:通过选择健康的ETCD节点进行快照备份,并删除超过7天的旧备份。 - **systemd 管理脚本**:配置systemd服务以定时执行ETCD备份。 - **systemd 定时任务**:每4小时运行一次ETCD备份。 ### 二、远程同步 - **定时远程同步本地文件**:每天凌晨1点将最新的6个ETCD备份文件同步到远程服务器。 - **脚本本地目录同步到远程**:同步本地目录(如mysql-backup、pgsql-backup和mongodb-backup)到远程服务器,支持重试机制并记录日志。 ### 三、Harbor 仓库远程自动同步 - **harbor-sync.sh**:自动同步A仓库的镜像到B仓库,支持守护模式和日志记录。 - **systemd 管理脚本**:配置systemd服务以持续运行Harbor同步脚本。 ### 四、常用Shell脚本 - **解压缩**:使用`tar`和`pigz`进行多线程压缩和解压缩。 - **计划任务**:同步本地最新目录到远端,执行脚本自动导入数据库。 - **循环探活网站**:使用`curl`和`ping`定期检查多个URL的健康状态。 - **带宽流量统计**:使用Python脚本统计多台服务器的出入口带宽流量。 - **参数化同步脚本**:灵活备份任意目录到远程位置,支持多种选项和错误处理。 ### 五、MySQL 常用语法 - **查询数据库存储使用大小**:查询整个数据库或单个数据库的存储使用情况。 ### 六、SSH 操作相关脚本 - **测试远程登录**:批量测试多个主机的SSH连接。 - **维护用户名密码**:批量修改密码、创建用户、设置sudo权限和修复SSH端口。 - **配置免密登录**:批量配置多台主机的SSH免密登录。 - **自动化远程机器执行脚本**:批量上传并执行本地脚本到远程机器,支持危险命令检测。 ### 七、K8S 和容器相关脚本 - **根据域名查询Ingress**:查询所有命名空间中匹配特定域名的Ingress及其TLS Secret。 - **更新Ingress使用的Secret**:更新指定域名的Ingress所使用的TLS证书。 这些脚本提供了从备份、同步、监控到自动化管理的一系列功能,适用于多种运维场景。

Mar 20, 2025

Linux 双网卡网络回包问题

一、背景 办公网:172.16.246.171/24 内网:192.168.0.0/16 最近我在使用 ESXI 和 vCenter 虚拟机工具,由于服务器网卡不是很多,只有4张卡,并且2个电口2个光口,由于没有使用光口交换机只有电口交换机,所以也就使用的电口网卡 由于创建虚拟机不想影响到办公网络,

OLDER

Docker-compose 部署 nebula 集群数据库

NEWER

Kubernetes 创建 Pod 底层原理

Recently Updated

  • Kubernetes 安装部署 Alist 并配置 Onlyoffice
  • KubeSphere-04-Dev-ops 流水线插件的使用
  • KubeSphere-03-Logging 日志插件的使用
  • KubeSphere-02-Service Mesh 的使用
  • KubeSphere-01-介绍与基础使用

Trending Tags

KVM Service Mesh Docker shell 路由规则 Mysql Containerd GitOps 网络设备 Prometheus

Contents

©2025 甄天祥-Linux-个人小站. Some rights reserved.

Using the Halo theme Chirpy