Common Database Backup Solutions

Posted 2025-05-08 · Updated 2025-08-08
By Administrator
180~232 min read

I. Backup Strategy

Database types in use: MySQL, PostgreSQL, MongoDB

Full backups: run a complete backup of every database on a fixed schedule, using a daily backup policy;

Backup source: dumps are taken from a replica, so backups must not affect writes on the primary;

Backup directory:

Example layout:
[root@mysql-131 docker-app]# tree /var/openebs/local/
/var/openebs/local/
├── etcd-backup
│   ├── etcd-snapshot-20250430-133607.db
│   ├── etcd-snapshot-20250507-125654.db
│   ├── etcd-snapshot-20250507-160000.db
│   ├── etcd-snapshot-20250507-200000.db
│   ├── etcd-snapshot-20250508-000000.db
│   ├── etcd-snapshot-20250508-040000.db
│   ├── etcd-snapshot-20250508-080000.db
│   └── etcd-snapshot-20250508-120000.db
├── mongodb-backup
├── mysql-backup
│   ├── 20250428_150315
│   │   ├── agilebpm_5_20250428_150315.sql.gz
│   │   ├── app_center_20250428_150315.sql.gz
│   │   ├── application_20250428_150315.sql.gz
│   │   ├── dinky_20250428_150315.sql
│   │   ├── dispatcher_20250428_150315.sql.gz
│   │   ├── edgeplanet_20250428_150315.sql
│   │   ├── entrepot_20250428_150315.sql.gz

Backup files: stored compressed and named with a timestamp, making the right file easy to locate during a restore;

Backup storage: reserve ample disk space; transfer backups over the optical (fiber) network; keep copies in multiple locations;

Restore testing: run restore drills to verify that backups are valid and the recovery procedure is workable (restore tests can be done first in the 腾仁 pre-production environment);
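A restore drill for the compressed mysqldump files above boils down to a `gunzip | mysql` pipeline. As a minimal sketch (host, port, and scratch database name here are placeholders, not the real environment; credentials would come from `~/.my.cnf` or `MYSQL_PWD` rather than the command line):

```python
def restore_cmd(dump_path: str, host: str, port: int, user: str, db: str) -> str:
    """Build the shell pipeline for restoring one gzipped mysqldump file.

    Illustrative only: in a drill you would run this against a scratch
    database in the pre-production environment, never against the primary.
    """
    return f"gunzip -c {dump_path} | mysql -h {host} -P {port} -u {user} {db}"


# Example: restore one of the timestamped dumps from the tree above
cmd = restore_cmd("/backup/20250428_150315/app_center_20250428_150315.sql.gz",
                  "127.0.0.1", 3306, "root", "restore_test")
```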

II. Backup Schedule

Database     Backup time   Frequency   Retention   Monitoring/alerts       Storage path   Automation
MySQL        11:00         Daily       15d         Success notification    /backup        cron job + docker
MongoDB      15:00         Daily       15d         Success notification    /backup        cron job + docker
PostgreSQL   17:00         Daily       15d         Success notification    /backup        cron job + docker
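The 15-day retention column can be expressed as a pure function over the timestamp-named backup directories. The production scripts below rely on `find -mtime` instead; this is only an illustration of the policy:

```python
from datetime import datetime, timedelta

def expired_dirs(dirnames, now, retention_days=15):
    """Return backup directories (named YYYYmmdd_HHMMSS) older than the window."""
    cutoff = now - timedelta(days=retention_days)
    return [d for d in dirnames
            if datetime.strptime(d, "%Y%m%d_%H%M%S") < cutoff]

# With "now" fixed to 2025-05-20, only the April backup falls outside 15 days.
old = expired_dirs(["20250428_150315", "20250507_113059"],
                   now=datetime(2025, 5, 20, 12, 0, 0))
```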

III. Backup Scripts

1. MySQL script

#!/bin/bash
set -euo pipefail

# 配置部分
readonly BACKUP_DIR="/backup"
readonly BACKUP_RETENTION_DAYS=15
readonly TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
readonly CURRENT_BACKUP_PATH="${BACKUP_DIR}/${TIMESTAMP}"
readonly LOG_FILE="/var/log/mysql_backup.log"
readonly PYTHON_SCRIPT="/alert.py"
readonly SUMMARY_FILE="${CURRENT_BACKUP_PATH}/summary.json"
readonly PARALLEL_THREADS=${PARALLEL_THREADS:-4}
readonly DBS_RAW=${DBS:-""} # 从环境变量读取数据库连接信息,默认为空,"192.168.235.221:3306:root:123456,192.168.235.222:3306:root:123456"

# 将环境变量中的逗号分隔转换为数组
IFS=',' read -ra DB_ARRAY <<< "$DBS_RAW"
    
# MySQL 实例配置(兼容原有数组和环境变量方式)
declare -a DBS=(
  "${DB_ARRAY[@]}"
  # 可以保留原有的硬编码配置作为备用
  # "192.168.235.220:3306:root:123456"
)
# 日志函数
log() {
  local level=$1
  local message=$2
  echo "[$(date '+%Y-%m-%d %H:%M:%S')] [${level}] ${message}" | tee -a "$LOG_FILE"
}

# 使用Python获取数据库列表
get_databases() {
  local host=$1 port=$2 user=$3 pass=$4

  python3 <<EOF
import subprocess
import sys

try:
    cmd = [
        "mysql",
        "--host=$host",
        "--port=$port",
        "--user=$user",
        "--password=$pass",
        "-e", "SHOW DATABASES;"
    ]
    result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

    for line in result.stdout.splitlines():
        db = line.strip()
        if db and db not in ["information_schema", "performance_schema", "mysql", "sys", "Database"]:
            print(db)
except subprocess.CalledProcessError as e:
    print(f"ERROR: {e.stderr}", file=sys.stderr)
    sys.exit(1)
except Exception as e:
    print(f"ERROR: {str(e)}", file=sys.stderr)
    sys.exit(1)
EOF
}

# 备份数据库并多线程压缩文件
backup_database() {
  local host=$1 port=$2 user=$3 pass=$4 db=$5
  local backup_file="${CURRENT_BACKUP_PATH}/${db}_${TIMESTAMP}.sql"

  log "INFO" "开始备份数据库 '${db}'(${host}:${port})"

  if ! mysqldump --host="$host" --port="$port" --user="$user" --password="$pass" \
    --set-gtid-purged=OFF --single-transaction --routines --triggers "$db" > "$backup_file"; then
    log "ERROR" "数据库 '${db}' 备份失败!"
    return 1
  fi

  # 压缩备份文件(单个文件,无需 xargs;pigz 本身即可多线程压缩)
  log "INFO" "压缩: ${backup_file}.gz"
  if command -v pigz &>/dev/null; then
    pigz -f -p "$PARALLEL_THREADS" "$backup_file"
  else
    gzip -f "$backup_file"
  fi

  log "INFO" "MySQL 数据库 '${db}' 备份完成(已压缩)"
  return 0
}

# 主函数
main() {
  mkdir -p "$CURRENT_BACKUP_PATH" || { log "ERROR" "无法创建备份目录"; exit 1; }

  log "INFO" "===== 开始 MySQL 数据库备份 ====="
  log "INFO" "备份目录:${CURRENT_BACKUP_PATH}"

  total_dbs=0
  failed_dbs=0

  for dbinfo in "${DBS[@]}"; do
    IFS=":" read -r host port user pass <<< "$dbinfo"

    log "INFO" "正在获取 ${host}:${port} 的数据库列表..."
    
    # 获取数据库列表
    databases=$(get_databases "$host" "$port" "$user" "$pass") || {
      log "ERROR" "连接 ${host}:${port} 获取数据库列表失败"
      failed_dbs=$((failed_dbs + 1))
      continue
    }

    for db in $databases; do
      total_dbs=$((total_dbs + 1))
      if ! backup_database "$host" "$port" "$user" "$pass" "$db"; then
        failed_dbs=$((failed_dbs + 1))
      fi
      #sleep 60
    done
  done

  log "INFO" "清理 ${BACKUP_RETENTION_DAYS} 天前的旧备份..."
  find "$BACKUP_DIR" -mindepth 1 -maxdepth 1 -type d -mtime +$BACKUP_RETENTION_DAYS -exec rm -rf {} \;

  # 状态判断
  if [[ $total_dbs -eq 0 ]]; then
    status="fail"
    log "ERROR" "未成功识别到任何数据库,可能数据库连接全失败"
  elif [[ $failed_dbs -eq 0 ]]; then
    status="success"
  elif [[ $failed_dbs -ge $total_dbs ]]; then
    status="fail"
  else
    status="partial"
  fi

  # 生成 summary.json 文件,供 alert.py 脚本调用
  log "INFO" "生成 summary.json ..."
  python3 <<EOF
import os
import json
import sys

try:
    backup_path = "$CURRENT_BACKUP_PATH"
    summary_file = "$SUMMARY_FILE"
    
    if not os.path.isdir(backup_path):
        print(f"备份路径不存在: {backup_path}", file=sys.stderr)
        sys.exit(1)
        
    # 支持的扩展名列表
    valid_extensions = ('.sql.gz', '.dump.gz', '.bson.gz')
    
    summary = [
        {
            'file': f,
            'size': f'{round(os.path.getsize(os.path.join(backup_path, f)) / (1024 * 1024), 2)}MB',
            'path': os.path.join(backup_path, f),
            'type': 'MySQL' if f.endswith('.sql.gz') else
                    'PostgreSQL' if f.endswith('.dump.gz') else
                    'MongoDB' if f.endswith('.bson.gz') else 'Unknown'
        }
        for f in os.listdir(backup_path) 
        if any(f.endswith(ext) for ext in valid_extensions)
    ]
    
    with open(summary_file, 'w') as f:
        json.dump(summary, f, indent=2)
except Exception as e:
    print(f"生成摘要失败: {e}", file=sys.stderr)
    sys.exit(1)
EOF

  [[ $? -ne 0 ]] && log "ERROR" "生成摘要失败" || log "INFO" "summary.json 生成完成"

  log "INFO" "尝试发送飞书通知(状态:$status)..."
  if python3 "$PYTHON_SCRIPT" "$SUMMARY_FILE" "$status"; then
    log "INFO" "飞书通知发送成功 ✅"
  else
    log "ERROR" "飞书通知发送失败 ❌"
  fi

  log "INFO" "备份任务全部完成 ✅"
  exit 0
}

main
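The `DBS` environment variable consumed by the script is a comma-separated list of `host:port:user:password` entries. The parsing the script does with `IFS` is equivalent to this Python sketch (addresses are the sample values from the script's comment):

```python
def parse_dbs(raw: str):
    """Split 'host:port:user:pass,host:port:user:pass' into 4-tuples,
    mirroring the IFS-based parsing in the bash script."""
    return [tuple(item.split(":", 3)) for item in raw.split(",") if item]

instances = parse_dbs("192.168.235.221:3306:root:123456,"
                      "192.168.235.222:3306:root:123456")
```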

2. MongoDB script

#!/bin/bash
set -euo pipefail

# 配置部分
readonly BACKUP_DIR="/backup"
readonly BACKUP_RETENTION_DAYS=15
readonly TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
readonly CURRENT_BACKUP_PATH="${BACKUP_DIR}/${TIMESTAMP}"
readonly LOG_FILE="/var/log/mongodb_backup.log"
readonly PYTHON_SCRIPT="/alert.py"
readonly SUMMARY_FILE="${CURRENT_BACKUP_PATH}/summary.json"
readonly PARALLEL_THREADS=${PARALLEL_THREADS:-4}
readonly DBS_RAW=${DBS:-""} # 从环境变量读取数据库连接信息,默认为空,"192.168.233.131:20030:root:123456,192.168.233.132:20030:root:123456"

# 将环境变量中的逗号分隔转换为数组
IFS=',' read -ra DB_ARRAY <<< "$DBS_RAW"
    
# MongoDB 实例配置(兼容原有数组和环境变量方式)
declare -a DBS=(
  "${DB_ARRAY[@]}"
  # 可以保留原有的硬编码配置作为备用
  # "192.168.233.131:20030:root:123456"
)

# 日志函数
log() {
  local level=$1
  local message=$2
  echo "[$(date '+%Y-%m-%d %H:%M:%S')] [${level}] ${message}" | tee -a "$LOG_FILE"
}

# 使用Python解析数据库列表
get_databases() {
  local host=$1 port=$2 user=$3 pass=$4
  
# 使用Python解析JSON输出
  python3 <<EOF
import json
import subprocess
import sys

try:
    cmd = [
        'mongosh', '--host', '$host', '--port', '$port',
        '-u', '$user', '-p', '$pass', '--authenticationDatabase', 'admin',
        '--quiet', '--eval', 'JSON.stringify(db.getMongo().getDBNames())'
    ]
    result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

    dbs = json.loads(result.stdout)
    for db in dbs:
        if db not in ['admin', 'local', 'config']:
            print(db)
except subprocess.CalledProcessError as e:
    print(f"ERROR: {e.stderr}", file=sys.stderr)
    sys.exit(1)
except Exception as e:
    print(f"ERROR: {str(e)}", file=sys.stderr)
    sys.exit(1)
EOF
}

# 备份数据库并多线程压缩文件
backup_database() {
  local host=$1 port=$2 user=$3 pass=$4 db=$5

  log "INFO" "开始备份数据库 '${db}'(${host}:${port})"

  # 先执行不压缩的备份
  if ! mongodump --host "$host" --port "$port" \
     --username "$user" --password "$pass" \
     --authenticationDatabase admin \
     --db "$db" \
     --out "$CURRENT_BACKUP_PATH"; then
    log "ERROR" "数据库 '${db}' 备份失败!"
    return 1
  fi

  find "$CURRENT_BACKUP_PATH/$db" -type f -name "*.bson" | \
    xargs -P "$PARALLEL_THREADS" -n 1 -I{} bash -c '
      echo "压缩: {}.gz"
      if command -v pigz &>/dev/null; then
        pigz -f -p '"$PARALLEL_THREADS"' "{}"
      else
        gzip -f "{}"
      fi
    '

  log "INFO" "MongoDB 数据库 '${db}' 备份完成(已压缩)"
  return 0
}

# 主函数
main() {
  mkdir -p "$CURRENT_BACKUP_PATH" || { log "ERROR" "无法创建备份目录"; exit 1; }

  log "INFO" "===== 开始 MongoDB 数据库备份 ====="
  log "INFO" "备份目录:${CURRENT_BACKUP_PATH}"

  total_dbs=0
  failed_dbs=0

  for dbinfo in "${DBS[@]}"; do
    IFS=":" read -r host port user pass <<< "$dbinfo"

    log "INFO" "正在获取 ${host}:${port} 的数据库列表..."
    
    # 获取数据库列表
    mapfile -t databases < <(get_databases "$host" "$port" "$user" "$pass" || true)
    
    if [[ ${#databases[@]} -eq 0 ]]; then
      log "ERROR" "未获取到任何数据库或连接失败"
      failed_dbs=$((failed_dbs + 1))
      continue
    fi

    for db in "${databases[@]}"; do
      [[ -z "$db" ]] && continue
      total_dbs=$((total_dbs + 1))
      
      if ! backup_database "$host" "$port" "$user" "$pass" "$db"; then
        failed_dbs=$((failed_dbs + 1))
      fi
      sleep 1
    done
  done

  log "INFO" "清理 ${BACKUP_RETENTION_DAYS} 天前的旧备份..."
  find "$BACKUP_DIR" -mindepth 1 -maxdepth 1 -type d -mtime +$BACKUP_RETENTION_DAYS -exec rm -rf {} \;

  # 状态判断
  if [[ $total_dbs -eq 0 ]]; then
    status="fail"
    log "ERROR" "未成功识别到任何数据库,可能数据库连接全失败"
  elif [[ $failed_dbs -eq 0 ]]; then
    status="success"
  elif [[ $failed_dbs -ge $total_dbs ]]; then
    status="fail"
  else
    status="partial"
  fi

  # 生成 summary.json 文件,供 alert.py 脚本调用
  log "INFO" "生成 summary.json ..."
  python3 <<EOF
import os
import json
import sys

try:
    backup_path = "$CURRENT_BACKUP_PATH"
    summary_file = "$SUMMARY_FILE"
    
    if not os.path.isdir(backup_path):
        print(f"备份路径不存在: {backup_path}", file=sys.stderr)
        sys.exit(1)
        
    # 支持的扩展名列表
    valid_extensions = ('.sql.gz', '.dump.gz', '.bson.gz')
    
    summary = []
    
    # 遍历备份目录
    for root, dirs, files in os.walk(backup_path):
        for file in files:
            if any(file.endswith(ext) for ext in valid_extensions):
                file_path = os.path.join(root, file)
                # 获取相对路径(相对于备份目录)
                rel_path = os.path.relpath(file_path, backup_path)
                summary.append({
                    'file': rel_path,  # 使用相对路径显示
                    'size': f'{round(os.path.getsize(file_path) / (1024 * 1024), 2)}MB',
                    'path': file_path,
                    'type': 'MySQL' if file.endswith('.sql.gz') else
                            'PostgreSQL' if file.endswith('.dump.gz') else
                            'MongoDB' if file.endswith('.bson.gz') else 'Unknown'
                })
    
    with open(summary_file, 'w') as f:
        json.dump(summary, f, indent=2)
except Exception as e:
    print(f"生成摘要失败: {e}", file=sys.stderr)
    sys.exit(1)
EOF

  [[ $? -ne 0 ]] && log "ERROR" "生成摘要失败" || log "INFO" "summary.json 生成完成"

  log "INFO" "尝试发送飞书通知(状态:$status)..."
  if python3 "$PYTHON_SCRIPT" "$SUMMARY_FILE" "$status"; then
    log "INFO" "飞书通知发送成功 ✅"
  else
    log "ERROR" "飞书通知发送失败 ❌"
  fi

  log "INFO" "备份任务全部完成 ✅"
  exit 0
}

main
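`get_databases` drops MongoDB's internal databases (`admin`, `local`, `config`) before backing up. A minimal sketch of that filter (the user database names here are made up for illustration):

```python
# System databases that mongodump should skip, as in get_databases()
SYSTEM_DBS = {"admin", "local", "config"}

def user_databases(all_dbs):
    """Keep only user databases, mirroring the filter in the mongosh output."""
    return [db for db in all_dbs if db not in SYSTEM_DBS]

dbs = user_databases(["admin", "local", "config", "orders", "inventory"])
```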

3. PostgreSQL script

#!/bin/bash
set -euo pipefail

# 配置部分
readonly BACKUP_DIR="/backup"
readonly BACKUP_RETENTION_DAYS=15
readonly TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
readonly CURRENT_BACKUP_PATH="${BACKUP_DIR}/${TIMESTAMP}"
readonly LOG_FILE="/var/log/pgsql_backup.log"
readonly PYTHON_SCRIPT="/alert.py"
readonly SUMMARY_FILE="${CURRENT_BACKUP_PATH}/summary.json"
readonly PARALLEL_THREADS=${PARALLEL_THREADS:-4}
readonly BACKUP_TIMEOUT=${BACKUP_TIMEOUT:-28800}  # 单个数据库备份超时(秒),默认8小时
readonly DBS_RAW=${DBS:-""} # 从环境变量读取数据库连接信息,默认为空,"192.168.233.131:25432:postgres:postgres,192.168.233.132:25432:postgres:postgres"
readonly EXCLUDE_TABLES=${EXCLUDE_TABLES:-""}  # 新增:要排除的表(逗号分隔), "public.t_ds_task_instance,public.large_table1"

# 将逗号分隔转换为数组
IFS=',' read -r -a DBS <<< "$DBS_RAW"

# 日志函数
log() {
  local level=$1
  local message=$2
  echo "[$(date '+%Y-%m-%d %H:%M:%S')] [${level}] ${message}" | tee -a "$LOG_FILE"
}

# 获取所有数据库列表(不包含模板库)
get_databases() {
  local host=$1 port=$2 user=$3 pass=$4
  PGPASSWORD="$pass" psql -h "$host" -p "$port" -U "$user" -tAc "SELECT datname FROM pg_database WHERE datistemplate = false;" || {
    log "ERROR" "获取数据库列表失败: $?"
    return 1
  }
}

# 测试数据库连接
test_db_connection() {
  local host=$1 port=$2 user=$3 pass=$4 db=$5
  log "INFO" "测试数据库连接: ${db}@${host}:${port}"
  
  if ! PGPASSWORD="$pass" psql -h "$host" -p "$port" -U "$user" -d "$db" -tAc "SELECT 1;" >/dev/null 2>> "$LOG_FILE"; then
    log "ERROR" "数据库连接测试失败: ${db}@${host}:${port}"
    return 1
  fi
  
  log "INFO" "数据库连接测试成功"
  return 0
}

# 备份数据库(修改:排除表处理)
backup_database() {
  local host=$1 port=$2 user=$3 pass=$4 db=$5
  local backup_file="${CURRENT_BACKUP_PATH}/${db}_${TIMESTAMP}.dump"
  
  log "INFO" "开始备份数据库 '${db}'(${host}:${port})"
  log "INFO" "排除表配置: ${EXCLUDE_TABLES:-无}"

  # 测试数据库连接
  if ! test_db_connection "$host" "$port" "$user" "$pass" "$db"; then
    return 1
  fi

  # 设置TCP keepalive防止超时
  export PGOPTIONS="-c tcp_keepalives_idle=60 -c tcp_keepalives_interval=60 -c tcp_keepalives_count=10"

  # 构建排除参数:pg_dump 的 --exclude-table-data 每个选项只接受一个表,
  # 因此把逗号分隔的 EXCLUDE_TABLES 展开为多个选项
  local -a exclude_args=()
  local tbl
  if [[ -n "$EXCLUDE_TABLES" ]]; then
    for tbl in ${EXCLUDE_TABLES//,/ }; do
      exclude_args+=("--exclude-table-data=${tbl}")
    done
    log "INFO" "排除参数: ${exclude_args[*]}"
  fi

  # 备份整个数据库(带超时控制)
  log "INFO" "执行备份命令: pg_dump -h $host -p $port -U $user -F c -f $backup_file $db ${exclude_args[*]-}"

  # 捕获详细错误信息;先保存退出码,避免被后续命令覆盖
  local error_file="${CURRENT_BACKUP_PATH}/${db}_error.log"
  local rc=0
  env PGPASSWORD="$pass" timeout "$BACKUP_TIMEOUT" pg_dump -h "$host" -p "$port" -U "$user" \
      -F c -f "$backup_file" "$db" \
      ${exclude_args[@]+"${exclude_args[@]}"} 2> "$error_file" || rc=$?
  if [[ $rc -ne 0 ]]; then
    log "ERROR" "备份命令错误详情:\n$(<"$error_file")"

    # timeout 超时会返回 124
    if [[ $rc -eq 124 ]]; then
      log "ERROR" "数据库 '${db}' 备份超时(超过 $((BACKUP_TIMEOUT/3600)) 小时)"
    else
      log "ERROR" "数据库 '${db}' 备份失败!错误码: $rc"
    fi

    # 清理临时错误文件
    rm -f "$error_file"
    return 1
  fi
  
  # 清理临时错误文件
  rm -f "$error_file"
  
  # 压缩备份文件
  log "INFO" "压缩文件: ${backup_file}"
  if command -v pigz &>/dev/null; then
    if ! pigz -f -p "$PARALLEL_THREADS" "$backup_file" 2>> "$LOG_FILE"; then
      log "ERROR" "压缩文件失败: ${backup_file}"
      return 1
    fi
  else
    if ! gzip -f "$backup_file" 2>> "$LOG_FILE"; then
      log "ERROR" "压缩文件失败: ${backup_file}"
      return 1
    fi
  fi

  log "INFO" "PostgreSQL 数据库 '${db}' 备份完成(已压缩)"
  return 0
}

# 检查磁盘空间
check_disk_space() {
  local required_space=$1  # 需要的最小空间(MB)
  local available_space=$(df -m "$BACKUP_DIR" | awk 'NR==2 {print $4}')
  
  if [[ $available_space -lt $required_space ]]; then
    log "ERROR" "磁盘空间不足! 需要: ${required_space}MB, 可用: ${available_space}MB"
    return 1
  fi
  
  log "INFO" "磁盘空间检查通过: 需要 ${required_space}MB, 可用 ${available_space}MB"
  return 0
}

# 主函数
main() {
  mkdir -p "$CURRENT_BACKUP_PATH" || { log "ERROR" "无法创建备份目录"; exit 1; }

  log "INFO" "===== 开始 PostgreSQL 数据库备份 ====="
  log "INFO" "备份目录:${CURRENT_BACKUP_PATH}"
  log "INFO" "备份超时: $((BACKUP_TIMEOUT/3600)) 小时"
  
  # 检查磁盘空间(假设每个数据库至少需要1GB)
  if ! check_disk_space 1024; then
    log "ERROR" "磁盘空间不足,备份终止"
    exit 1
  fi

  total_dbs=0
  failed_dbs=0

  for dbinfo in "${DBS[@]}"; do
    IFS=":" read -r host port user pass <<< "$dbinfo"

    databases=$(get_databases "$host" "$port" "$user" "$pass") || {
      log "ERROR" "连接 ${host}:${port} 获取数据库列表失败"
      failed_dbs=$((failed_dbs + 1))
      continue
    }

    for db in $databases; do
      total_dbs=$((total_dbs + 1))
      if ! backup_database "$host" "$port" "$user" "$pass" "$db"; then
        failed_dbs=$((failed_dbs + 1))
      fi
      sleep 10
    done
  done

  log "INFO" "清理 ${BACKUP_RETENTION_DAYS} 天前的旧备份..."
  find "$BACKUP_DIR" -mindepth 1 -maxdepth 1 -type d -mtime +$BACKUP_RETENTION_DAYS -exec rm -rf {} \;

  if [[ $total_dbs -eq 0 ]]; then
    status="fail"
    log "ERROR" "未成功识别到任何数据库,可能数据库连接失败"
  elif [[ $failed_dbs -eq 0 ]]; then
    status="success"
  elif [[ $failed_dbs -ge $total_dbs ]]; then
    status="fail"
  else
    status="partial"
  fi

  # 生成 summary.json 文件
  log "INFO" "生成 summary.json ..."
  python3 <<EOF
import os
import json
import sys

try:
    backup_path = "$CURRENT_BACKUP_PATH"
    summary_file = "$SUMMARY_FILE"
    
    if not os.path.isdir(backup_path):
        print(f"备份路径不存在: {backup_path}", file=sys.stderr)
        sys.exit(1)
        
    # 支持的扩展名列表
    valid_extensions = ('.sql.gz', '.dump.gz', '.bson.gz', '.partial.sql')
    
    summary = [
        {
            'file': f,
            'size': f'{round(os.path.getsize(os.path.join(backup_path, f)) / (1024 * 1024), 2)}MB',
            'path': os.path.join(backup_path, f),
            'type': 'MySQL' if f.endswith('.sql.gz') else
                    'PostgreSQL' if f.endswith('.dump.gz') else
                    'MongoDB' if f.endswith('.bson.gz') else
                    'LargeTable' if f.endswith('.partial.sql') else 'Unknown'
        }
        for f in os.listdir(backup_path) 
        if any(f.endswith(ext) for ext in valid_extensions)
    ]
    
    with open(summary_file, 'w') as f:
        json.dump(summary, f, indent=2)
except Exception as e:
    print(f"生成摘要失败: {e}", file=sys.stderr)
    sys.exit(1)
EOF

  [[ $? -ne 0 ]] && log "ERROR" "生成摘要失败" || log "INFO" "summary.json 生成完成"

  log "INFO" "尝试发送飞书通知(状态:$status)..."
  if python3 "$PYTHON_SCRIPT" "$SUMMARY_FILE" "$status"; then
    log "INFO" "飞书通知发送成功 ✅"
  else
    log "ERROR" "飞书通知发送失败 ❌"
  fi

  log "INFO" "备份任务完成(状态:$status)"
  exit 0
}

main
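`pg_dump` accepts `--exclude-table-data` once per table, so a comma-separated `EXCLUDE_TABLES` value has to be expanded into one flag per entry. The expansion the script performs is equivalent to:

```python
def exclude_args(spec: str):
    """Expand 'schema.t1,schema.t2' into one --exclude-table-data flag each."""
    return [f"--exclude-table-data={t}" for t in spec.split(",") if t]

# The example value from the script's EXCLUDE_TABLES comment
args = exclude_args("public.t_ds_task_instance,public.large_table1")
```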

4. Feishu notification script

#!/usr/bin/env python3
import requests
import json
import sys
import datetime
import os

# Webhook 地址
WEBHOOK_URL = os.getenv("FEISHU_WEBHOOK_URL", "https://open.feishu.cn/open-apis/bot/v2/hook/")

# 参数解析
if len(sys.argv) < 3:
    print("用法: alert.py <summary_file> <status>")
    sys.exit(1)

summary_file = sys.argv[1]
status = sys.argv[2].lower()

# 状态样式映射
status_map = {
    "success": {"emoji": "✅", "color": "green", "title": "备份成功"},
    "partial": {"emoji": "⚠️", "color": "orange", "title": "备份部分失败"},
    "fail": {"emoji": "❌", "color": "red", "title": "备份失败"}
}
info = status_map.get(status, status_map["fail"])

# 当前时间(+8 时区)
now = datetime.datetime.utcnow() + datetime.timedelta(hours=8)
timestamp = now.strftime('%Y-%m-%d %H:%M:%S')

# 解析备份文件列表
backup_files = []
total_size = 0
backup_type = "数据库"  # 默认值
try:
    with open(summary_file, "r") as f:
        file_list = json.load(f)
        if file_list:
            # 从第一个文件获取备份类型
            backup_type = file_list[0].get("type", "数据库")
            
            for item in file_list:
                file_path = item.get("path", "")
                file_name = os.path.basename(file_path)
                size_str = item.get("size", "0B")
                
                # 转换大小到字节数(简化处理;最后剥离单独的 "B",并容错 "0B" 等值)
                try:
                    size_value = float(size_str.replace("GB", "").replace("MB", "").replace("KB", "").replace("B", ""))
                except ValueError:
                    size_value = 0.0
                if "GB" in size_str:
                    size_bytes = size_value * 1024 * 1024 * 1024
                elif "MB" in size_str:
                    size_bytes = size_value * 1024 * 1024
                elif "KB" in size_str:
                    size_bytes = size_value * 1024
                else:  # 默认认为是字节
                    size_bytes = size_value
                
                total_size += size_bytes
                backup_files.append({
                    "name": file_name,
                    "size": size_str,
                    "path": file_path
                })
except Exception as e:
    backup_files = []

# 格式化总大小
def format_size(num_bytes):
    for unit in ['B', 'KB', 'MB', 'GB']:
        if num_bytes < 1024.0:
            return f"{num_bytes:.2f}{unit}"
        num_bytes /= 1024.0
    return f"{num_bytes:.2f}TB"

total_size_str = format_size(total_size)

# 构建飞书卡片
elements = [
    {
        "tag": "div",
        "text": {
            "tag": "lark_md",
            "content": f"**⏰ 备份时间**\n{timestamp}\n\n**🔄 备份状态**\n{info['emoji']} {status.upper()}\n\n**📦 备份总大小**\n{total_size_str}"
        }
    },
    {"tag": "hr"}
]

if backup_files:
    # 文件列表按大小降序排列
    backup_files.sort(key=lambda x: float(x["size"].replace("MB", "").replace("GB", "").replace("KB", "")), reverse=True)
    
    files_content = "**📃 备份文件清单**\n\n"
    for file in backup_files:
        files_content += f"- **{file['name']}** ({file['size']})\n"
        files_content += f"  {file['path']}\n\n"
    
    elements.append({
        "tag": "div",
        "text": {
            "tag": "lark_md",
            "content": files_content
        }
    })
else:
    elements.append({
        "tag": "div",
        "text": {
            "tag": "lark_md",
            "content": "**⚠️ 警告**\n未找到备份文件或解析失败"
        }
    })

# 添加分隔线
elements.append({"tag": "hr"})

# 添加备份主机和路径信息
elements.append({
    "tag": "div",
    "text": {
        "tag": "lark_md",
        "content": "**💻备份主机**\n服务器IP:11.0.1.20\n\n**📂备份路径**\n/var/openebs/local/"
    }
})

# 添加分隔线
elements.append({"tag": "hr"})

elements.append({
    "tag": "note",
    "elements": [
        {
            "tag": "plain_text",
            "content": "由自动备份系统发送"
        }
    ]
})

card = {
    "msg_type": "interactive",
    "card": {
        "header": {
            "title": {
                "tag": "plain_text",
                "content": f"{info['emoji']} {backup_type}备份{info['title']}"
            },
            "template": info["color"]
        },
        "elements": elements
    }
}

# 发送请求
resp = requests.post(
    WEBHOOK_URL,
    headers={"Content-Type": "application/json"},
    data=json.dumps(card)
)

# 响应判断
if resp.status_code != 200 or resp.json().get("code") != 0:
    print("飞书通知发送失败:", resp.text)
    sys.exit(1)

print("飞书通知发送成功 ✅")
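`format_size` walks up the unit ladder by repeated division; its behavior is easiest to pin down with a couple of examples (same function as in alert.py, reproduced standalone):

```python
def format_size(num_bytes):
    """Same unit walk as alert.py: divide by 1024 until the value fits."""
    for unit in ['B', 'KB', 'MB', 'GB']:
        if num_bytes < 1024.0:
            return f"{num_bytes:.2f}{unit}"
        num_bytes /= 1024.0
    return f"{num_bytes:.2f}TB"

small = format_size(1536)          # 1.5 KiB
large = format_size(2 * 1024**3)   # 2 GiB
```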

5. Manually triggering a Feishu notification

Generate the summary by hand

[root@mysql-131 docker-app]# export CURRENT_BACKUP_PATH="/var/openebs/local/mysql-backup/20250508_113002"
[root@mysql-131 docker-app]# export SUMMARY_FILE="/var/openebs/local/mysql-backup/20250508_113002/summary.json"
[root@mysql-131 docker-app]# cat summary.py 
import os
import json
import sys

try:
    backup_path = os.getenv("CURRENT_BACKUP_PATH")
    summary_file = os.getenv("SUMMARY_FILE")
    
    if not os.path.isdir(backup_path):
        print(f"备份路径不存在: {backup_path}", file=sys.stderr)
        sys.exit(1)
        
    # 支持的扩展名列表
    valid_extensions = ('.sql.gz', '.dump.gz', '.bson.gz')
    
    summary = []
    
    # 遍历备份目录
    for root, dirs, files in os.walk(backup_path):
        for file in files:
            if any(file.endswith(ext) for ext in valid_extensions):
                file_path = os.path.join(root, file)
                # 获取相对路径(相对于备份目录)
                rel_path = os.path.relpath(file_path, backup_path)
                summary.append({
                    'file': rel_path,  # 使用相对路径显示
                    'size': f'{round(os.path.getsize(file_path) / (1024 * 1024), 2)}MB',
                    'path': file_path,
                    'type': 'MySQL' if file.endswith('.sql.gz') else
                            'PostgreSQL' if file.endswith('.dump.gz') else
                            'MongoDB' if file.endswith('.bson.gz') else 'Unknown'
                })
    
    with open(summary_file, 'w') as f:
        json.dump(summary, f, indent=2)
except Exception as e:
    print(f"生成摘要失败: {e}", file=sys.stderr)
    sys.exit(1)
[root@mysql-131 docker-app]# python3 summary.py

Trigger the notification

[root@mysql-131 mysql-backup]# export FEISHU_WEBHOOK_URL="https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxxxxxxxxxxx"
[root@mysql-131 mysql-backup]# python3 .alert.py /var/openebs/local/mysql-backup/20250508_113002/summary.json SUCCESS
飞书通知发送成功 ✅
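For a dry run, alert.py only needs a `summary.json` shaped like the entries the backup scripts emit. A minimal hand-made file (the file name below is a placeholder) can be produced like this:

```python
import json
import os
import tempfile

# One entry per backup file, with the fields summary.json normally carries.
entry = {
    "file": "demo_20250508_113002.sql.gz",   # placeholder name, not a real dump
    "size": "1.23MB",
    "path": "/backup/20250508_113002/demo_20250508_113002.sql.gz",
    "type": "MySQL",
}

path = os.path.join(tempfile.mkdtemp(), "summary.json")
with open(path, "w") as f:
    json.dump([entry], f, indent=2)

# alert.py would then be invoked as: python3 alert.py <path> success
with open(path) as f:
    loaded = json.load(f)
```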

IV. Launch Methods

1. Create the backup image

[root@mysql-131 mysql-backup]# cat Dockerfile 
FROM centos:7

# 安装必要的依赖库
RUN rm -rf /etc/yum.repos.d/*.repo && \
    echo "172.16.246.171   centos7-yum.linuxtian.com" >> /etc/hosts && \
    curl -o /etc/yum.repos.d/CentOS-Base.repo http://centos7-yum.linuxtian.com/CentOS-Base.repo && \
    yum -y install epel-release &&\
    yum -y install \
    gzip \
    pigz \
    vim \
    wget \
    openssl \
    openssl-devel \
    zlib-devel \
    bzip2-devel \
    ncurses-devel \
    sqlite-devel \
    readline-devel \
    tk-devel \
    gdbm-devel \
    db4-devel \
    libpcap-devel \
    xz-devel \
    make \
    gcc

# 安装 mysql
RUN echo "172.16.246.171   centos7-yum.linuxtian.com" >> /etc/hosts && \
    rpm -Uvh https://dev.mysql.com/get/mysql80-community-release-el7-5.noarch.rpm && \
    yum -y install --nogpgcheck \
    mysql \
    mysql-server \
    mysql-community-client
    

# 安装 mongodb
RUN echo "172.16.246.171   centos7-yum.linuxtian.com" >> /etc/hosts && \
    echo "[mongodb-org-7.0]" >> /etc/yum.repos.d/mongodb-org-7.0.repo && \
    echo "name=MongoDB Repository" >> /etc/yum.repos.d/mongodb-org-7.0.repo && \
    echo "baseurl=https://repo.mongodb.org/yum/redhat/7/mongodb-org/7.0/x86_64/" >> /etc/yum.repos.d/mongodb-org-7.0.repo && \
    echo "gpgcheck=1" >> /etc/yum.repos.d/mongodb-org-7.0.repo && \
    echo "enabled=1" >> /etc/yum.repos.d/mongodb-org-7.0.repo && \
    echo "gpgkey=https://pgp.mongodb.com/server-7.0.asc" >> /etc/yum.repos.d/mongodb-org-7.0.repo && \
    yum install -y \
    mongodb-org-7.0.6 \
    mongodb-org-database-7.0.6 \
    mongodb-org-server-7.0.6 \
    mongodb-mongosh-7.0.6 \
    mongodb-org-mongos-7.0.6 \
    mongodb-org-tools-7.0.6


# 安装 postgresql
RUN echo "172.16.246.171   centos7-yum.linuxtian.com" >> /etc/hosts && \
    echo "[centos-sclo-rh]" >> /etc/yum.repos.d/CentOS-SCLo-scl-rh.repo && \
    echo "name=CentOS-7 - SCLo rh" >> /etc/yum.repos.d/CentOS-SCLo-scl-rh.repo && \
    echo "baseurl=http://mirrors.aliyun.com/centos/7/sclo/x86_64/rh/" >> /etc/yum.repos.d/CentOS-SCLo-scl-rh.repo && \
    echo "gpgcheck=0" >> /etc/yum.repos.d/CentOS-SCLo-scl-rh.repo && \
    echo "enabled=1" >> /etc/yum.repos.d/CentOS-SCLo-scl-rh.repo && \
    rpm -Uvh https://download.postgresql.org/pub/repos/yum/reporpms/EL-7-x86_64/pgdg-redhat-repo-latest.noarch.rpm && \
    yum -y install llvm-toolset-7 llvm-toolset-7-clang && \
    yum -y install \
        postgresql14 \
        postgresql14-server \
        postgresql14-contrib \
        postgresql14-devel

# 清理缓存
RUN yum clean all

# 编译安装 Python
RUN wget --directory-prefix=/usr/local/ https://www.python.org/ftp/python/3.6.0/Python-3.6.0.tgz && \
    tar xvf /usr/local/Python-3.6.0.tgz -C /usr/local/ && \
    cd /usr/local/Python-3.6.0 && \
    ./configure --prefix=/usr/local && \
    make -j8 && \
    make install && \
    rm -rf /usr/local/Python-3.6.0.tgz
    

# 确保 /usr/bin/python3 不冲突
RUN if [ -L /usr/bin/python3 ]; then \
      rm /usr/bin/python3; \
    fi

# 创建 Python 3 的软链接
RUN ln -s /usr/local/bin/python3.6 /usr/bin/python3

# 修改 yum 和 urlgrabber 的 Python 路径
RUN sed -i 's/python/python2/g' /usr/bin/yum \
    && sed -i 's/python/python2/g' /usr/libexec/urlgrabber-ext-down

# 修改 pip3 的安装源为国内的源
RUN mkdir -p /root/.pip && \
    echo "[global]" > /root/.pip/pip.conf && \
    echo "index-url = https://pypi.tuna.tsinghua.edu.cn/simple" >> /root/.pip/pip.conf && \
    echo "trusted-host = pypi.tuna.tsinghua.edu.cn" >> /root/.pip/pip.conf && \
    pip3 install requests

CMD ["echo", "Mysql, MongoDB and PostgreSQL clients installed!"]

2. Build the image

[root@mysql-131 mysql-backup]# docker build . -t registry.cn-hangzhou.aliyuncs.com/tianxiang_app/database-backup:v4

3. Run via cron

[root@mysql-131 docker-app]# crontab -l
0 11 * * * cd /data/docker-app/mysql-backup/ && docker-compose run --rm mysql-backup
0 15 * * * cd /data/docker-app/mongodb-backup/ && docker-compose run --rm mongodb-backup
0 17 * * * cd /data/docker-app/pgsql-backup/ && docker-compose run --rm pgsql-backup

4. Run manually

Using the MySQL backup as an example:

[root@mysql-131 mysql-backup]# ls -al
total 21752
drwxr-xr-x 2 root root      115 May  8 09:17 .
drwxr-xr-x 5 root root       68 May  7 18:13 ..
-rwxr-xr-x 1 root root     2534 May  7 11:29 .alert.py
-rw-r--r-- 1 root root      479 May  7 18:13 docker-compose.yml
-rw-r--r-- 1 root root     3444 May  7 17:57 Dockerfile
-rwxr-xr-x 1 root root     3808 May  7 14:12 .mysql-backup.sh
-rw-r--r-- 1 root root 22256403 May  7 10:05 Python-3.6.0.tgz
[root@mysql-131 mysql-backup]# cat docker-compose.yml 
services:
  mysql-backup:
    image: registry.cn-hangzhou.aliyuncs.com/tianxiang_app/database-backup:v4
    container_name: mysql-backup
    command: bash /mysql-backup.sh
    environment:
      PARALLEL_THREADS: 8  # 多线程压缩
      PYTHONIOENCODING: utf-8
      FEISHU_WEBHOOK_URL: "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxxxxxxxxx"
      DBS: "192.168.235.221:3306:root:123456,192.168.235.222:3306:root:123456"
    volumes:
      - /var/openebs/local/mysql-backup:/backup
      - /data/docker-app/mysql-backup/.mysql-backup.sh:/mysql-backup.sh
      - /data/docker-app/mysql-backup/.alert.py:/alert.py
      - /etc/localtime:/etc/localtime
      - /var/log/mysql_backup.log:/var/log/mysql_backup.log
    restart: "no"
[root@mysql-131 mysql-backup]# docker-compose run --rm mysql-backup

5. Following log progress

[root@mysql-131 mysql-backup]# tail -f /var/log/mysql_backup.log 
[2025-05-07 13:36:19] [INFO] 开始备份数据库 'polyapi'(192.168.233.131:11095)
[2025-05-07 13:36:19] [INFO] 已压缩备份文件:/backup/20250507_113059/polyapi_20250507_113059.sql.gz
[2025-05-07 13:37:19] [INFO] 开始备份数据库 'process'(192.168.233.131:11095)
[2025-05-07 13:37:21] [INFO] 已压缩备份文件:/backup/20250507_113059/process_20250507_113059.sql.gz
[2025-05-07 13:38:21] [INFO] 开始备份数据库 'agilebpm_5'(192.168.233.131:16795)
[2025-05-07 14:00:45] [INFO] 已压缩备份文件:/backup/20250507_113059/agilebpm_5_20250507_113059.sql.gz
[2025-05-07 14:01:45] [INFO] 清理 15 天前的旧备份...
[2025-05-07 14:01:45] [INFO] 尝试发送飞书通知(状态:success)...
[2025-05-07 14:01:46] [INFO] 备份任务全部完成 ✅

(Example screenshot omitted)

V. Running backups as a Kubernetes CronJob

1. ConfigMap configuration

[root@k8s-master1 mysql-backup]# cat configmap.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: backup-script
data:
  backup.sh: |-
    #!/bin/bash
    set -euo pipefail
    
    # 备份根路径
    readonly BACKUP_DIR="/backup"
    # 保留备份文件15天
    readonly BACKUP_RETENTION_DAYS=15
    # 备份目录命名规范
    readonly TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
    # 备份文件绝对路径
    readonly CURRENT_BACKUP_PATH="${BACKUP_DIR}/${TIMESTAMP}"
    # 备份日志
    readonly LOG_FILE="/var/log/db_backup.log"
    # 告警通知py脚本
    readonly PYTHON_SCRIPT="/tmp/alert.py"
    # 备份文件列表信息
    readonly SUMMARY_FILE="${CURRENT_BACKUP_PATH}/summary.json"
    # 默认4线程压缩文件
    readonly PARALLEL_THREADS=${PARALLEL_THREADS:-4}
    # 从环境变量读取数据库连接信息,默认为空
    # (重命名为 DBS_RAW,避免 readonly 与下面的 declare -a DBS 冲突)
    readonly DBS_RAW=${DBS:-""}

    # 将环境变量中的逗号分隔转换为数组
    IFS=',' read -ra DB_ARRAY <<< "$DBS_RAW"
    
    # MySQL 实例配置(兼容原有数组和环境变量方式)
    declare -a DBS=(
      "${DB_ARRAY[@]}"
      # 可以保留原有的硬编码配置作为备用
      # "192.168.100.220:3306:root:123456"
    )
    
    # 日志函数
    log() {
      local level=$1
      local message=$2
      echo "[$(date '+%Y-%m-%d %H:%M:%S')] [${level}] ${message}" | tee -a "$LOG_FILE"
    }
    
    # 使用Python获取数据库列表
    get_databases() {
      local host=$1 port=$2 user=$3 pass=$4
    
      python3 <<EOF
    import subprocess
    import sys
    
    try:
        cmd = [
            "mysql",
            "--host=$host",
            "--port=$port",
            "--user=$user",
            "--password=$pass",
            "-e", "SHOW DATABASES;"
        ]
        result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
    
        for line in result.stdout.splitlines():
            db = line.strip()
            if db and db not in ["information_schema", "performance_schema", "mysql", "sys", "Database"]:
                print(db)
    except subprocess.CalledProcessError as e:
        print(f"ERROR: {e.stderr}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"ERROR: {str(e)}", file=sys.stderr)
        sys.exit(1)
    EOF
    }
    
    # 备份数据库并多线程压缩文件
    backup_database() {
      local host=$1 port=$2 user=$3 pass=$4 db=$5
      local backup_file="${CURRENT_BACKUP_PATH}/${db}_${TIMESTAMP}.sql"
    
      log "INFO" "开始备份数据库 '${db}'(${host}:${port})"
    
      if ! mysqldump --host="$host" --port="$port" --user="$user" --password="$pass" \
        --set-gtid-purged=OFF --single-transaction --routines --triggers "$db" > "$backup_file"; then
        log "ERROR" "数据库 '${db}' 备份失败!"
        return 1
      fi
    
      # 压缩备份文件(优先使用 pigz 多线程压缩)
      echo "压缩: ${backup_file}.gz"
      if command -v pigz &>/dev/null; then
        pigz -f -p "$PARALLEL_THREADS" "$backup_file"
      else
        gzip -f "$backup_file"
      fi
    
      log "INFO" "MySQL 数据库 '${db}' 备份完成(已压缩)"
      return 0
    }
    
    # 主函数
    main() {
      mkdir -p "$CURRENT_BACKUP_PATH" || { log "ERROR" "无法创建备份目录"; exit 1; }
    
      log "INFO" "===== 开始 MySQL 数据库备份 ====="
      log "INFO" "备份目录:${CURRENT_BACKUP_PATH}"
    
      total_dbs=0
      failed_dbs=0
    
      for dbinfo in "${DBS[@]}"; do

        [ -z "$dbinfo" ] && continue  # 跳过空项

        IFS=":" read -r host port user pass <<< "$dbinfo"
    
        log "INFO" "正在获取 ${host}:${port} 的数据库列表..."
        
        # 获取数据库列表
        databases=$(get_databases "$host" "$port" "$user" "$pass") || {
          log "ERROR" "连接 ${host}:${port} 获取数据库列表失败"
          failed_dbs=$((failed_dbs + 1))
          continue
        }
    
        for db in $databases; do
          total_dbs=$((total_dbs + 1))
          if ! backup_database "$host" "$port" "$user" "$pass" "$db"; then
            failed_dbs=$((failed_dbs + 1))
          fi
          #sleep 60
        done
      done
    
      log "INFO" "清理 ${BACKUP_RETENTION_DAYS} 天前的旧备份..."
      find "$BACKUP_DIR" -mindepth 1 -maxdepth 1 -type d -mtime +$BACKUP_RETENTION_DAYS -exec rm -rf {} \;
    
      # 状态判断
      if [[ $total_dbs -eq 0 ]]; then
        status="fail"
        log "ERROR" "未成功识别到任何数据库,可能数据库连接全失败"
      elif [[ $failed_dbs -eq 0 ]]; then
        status="success"
      elif [[ $failed_dbs -ge $total_dbs ]]; then
        status="fail"
      else
        status="partial"
      fi
    
      # 生成 summary.json 文件,供 alert.py 脚本调用
      log "INFO" "生成 summary.json ..."
      python3 <<EOF
    import os
    import json
    import sys
    
    try:
        backup_path = "$CURRENT_BACKUP_PATH"
        summary_file = "$SUMMARY_FILE"
        
        if not os.path.isdir(backup_path):
            print(f"备份路径不存在: {backup_path}", file=sys.stderr)
            sys.exit(1)
            
        # 支持的扩展名列表
        valid_extensions = ('.sql.gz', '.dump.gz', '.bson.gz')
        
        summary = [
            {
                'file': f,
                'size': f'{round(os.path.getsize(os.path.join(backup_path, f)) / (1024 * 1024), 2)}MB',
                'path': os.path.join(backup_path, f),
                'type': 'MySQL' if f.endswith('.sql.gz') else
                        'PostgreSQL' if f.endswith('.dump.gz') else
                        'MongoDB' if f.endswith('.bson.gz') else 'Unknown'
            }
            for f in os.listdir(backup_path) 
            if any(f.endswith(ext) for ext in valid_extensions)
        ]
        
        with open(summary_file, 'w') as f:
            json.dump(summary, f, indent=2)
    except Exception as e:
        print(f"生成摘要失败: {e}", file=sys.stderr)
        sys.exit(1)
    EOF
    
      [[ $? -ne 0 ]] && log "ERROR" "生成摘要失败" || log "INFO" "summary.json 生成完成"
    
      log "INFO" "尝试发送飞书通知(状态:$status)..."
      if python3 "$PYTHON_SCRIPT" "$SUMMARY_FILE" "$status"; then
        log "INFO" "飞书通知发送成功 ✅"
      else
        log "ERROR" "飞书通知发送失败 ❌"
      fi
    
      log "INFO" "备份任务全部完成 ✅"
      exit 0
    }
    
    main

  alert.py: |-
    #!/usr/bin/env python3
    import requests
    import json
    import sys
    import datetime
    import os
    
    # Webhook 地址
    WEBHOOK_URL = os.getenv("FEISHU_WEBHOOK_URL", "https://open.feishu.cn/open-apis/bot/v2/hook/")
    
    # 参数解析
    if len(sys.argv) < 3:
        print("用法: alert.py <summary_file> <status>")
        sys.exit(1)
    
    summary_file = sys.argv[1]
    status = sys.argv[2].lower()
    
    # 状态样式映射
    status_map = {
        "success": {"emoji": "✅", "color": "green", "title": "备份成功"},
        "partial": {"emoji": "⚠️", "color": "orange", "title": "备份部分失败"},
        "fail": {"emoji": "❌", "color": "red", "title": "备份失败"}
    }
    info = status_map.get(status, status_map["fail"])
    
    # 当前时间(+8 时区)
    now = datetime.datetime.utcnow() + datetime.timedelta(hours=8)
    timestamp = now.strftime('%Y-%m-%d %H:%M:%S')
    
    # 解析备份文件列表
    backup_files = []
    total_size = 0
    backup_type = "数据库"  # 默认值
    try:
        with open(summary_file, "r") as f:
            file_list = json.load(f)
            if file_list:
                # 从第一个文件获取备份类型
                backup_type = file_list[0].get("type", "数据库")
                
                for item in file_list:
                    file_path = item.get("path", "")
                    file_name = os.path.basename(file_path)
                    size_str = item.get("size", "0B")
                    
                    # 转换大小到字节数(简化处理)
                    size_value = float(size_str.replace("MB", "").replace("GB", "").replace("KB", ""))
                    if "GB" in size_str:
                        size_bytes = size_value * 1024 * 1024 * 1024
                    elif "MB" in size_str:
                        size_bytes = size_value * 1024 * 1024
                    elif "KB" in size_str:
                        size_bytes = size_value * 1024
                    else:  # 默认认为是字节
                        size_bytes = size_value
                    
                    total_size += size_bytes
                    backup_files.append({
                        "name": file_name,
                        "size": size_str,
                        "path": file_path
                    })
    except Exception as e:
        backup_files = []
    
    # 格式化总大小
    def format_size(bytes):
        for unit in ['B', 'KB', 'MB', 'GB']:
            if bytes < 1024.0:
                return f"{bytes:.2f}{unit}"
            bytes /= 1024.0
        return f"{bytes:.2f}TB"
    
    total_size_str = format_size(total_size)
    
    # 构建飞书卡片
    elements = [
        {
            "tag": "div",
            "text": {
                "tag": "lark_md",
                "content": f"**⏰ 备份时间**\n{timestamp}\n\n**🔄 备份状态**\n{info['emoji']} {status.upper()}\n\n**📦 备份总大小**\n{total_size_str}"
            }
        },
        {"tag": "hr"}
    ]
    
    if backup_files:
        # 文件列表按大小降序排列
        backup_files.sort(key=lambda x: float(x["size"].replace("MB", "").replace("GB", "").replace("KB", "")), reverse=True)
        
        files_content = "**📃 备份文件清单**\n\n"
        for file in backup_files:
            files_content += f"- **{file['name']}** ({file['size']})\n"
            files_content += f"  {file['path']}\n\n"
        
        elements.append({
            "tag": "div",
            "text": {
                "tag": "lark_md",
                "content": files_content
            }
        })
    else:
        elements.append({
            "tag": "div",
            "text": {
                "tag": "lark_md",
                "content": "**⚠️ 警告**\n未找到备份文件或解析失败"
            }
        })
    
    # 添加分隔线
    elements.append({"tag": "hr"})
    
    # 添加备份主机和路径信息
    elements.append({
        "tag": "div",
        "text": {
            "tag": "lark_md",
            "content": "**💻生产备份主机**\n服务器IP:192.168.100.220\n\n**备份主机名称**\nk8s-node7-storage\n\n**📂备份路径**\n/data/db-databases-backup/mysql-mgr-cluster/"
        }
    })
    
    # 添加分隔线
    elements.append({"tag": "hr"})
    
    elements.append({
        "tag": "note",
        "elements": [
            {
                "tag": "plain_text",
                "content": "由自动备份系统发送"
            }
        ]
    })
    
    card = {
        "msg_type": "interactive",
        "card": {
            "header": {
                "title": {
                    "tag": "plain_text",
                    "content": f"{info['emoji']} {backup_type}备份{info['title']}"
                },
                "template": info["color"]
            },
            "elements": elements
        }
    }
    
    # 发送请求
    resp = requests.post(
        WEBHOOK_URL,
        headers={"Content-Type": "application/json"},
        data=json.dumps(card)
    )
    
    # 响应判断
    if resp.status_code != 200 or resp.json().get("code") != 0:
        print("飞书通知发送失败:", resp.text)
        sys.exit(1)
    
    print("飞书通知发送成功 ✅")
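
Before summing, `alert.py` converts the human-readable sizes stored in `summary.json` (e.g. `"12.5MB"`) back to bytes. A minimal standalone mirror of that parsing (the function name is illustrative, not part of the scripts above):

```python
def size_to_bytes(size_str: str) -> float:
    """Parse sizes like '12.5MB' back to bytes, as alert.py does before summing.

    Units are checked longest-suffix first so 'MB' is not mistaken for 'B'.
    """
    for unit, factor in (("GB", 1024 ** 3), ("MB", 1024 ** 2), ("KB", 1024), ("B", 1)):
        if size_str.endswith(unit):
            return float(size_str[: -len(unit)]) * factor
    return float(size_str)  # bare number: treat as bytes
```

Since `summary.json` always writes sizes in MB, only the `"MB"` branch is exercised in practice; the other units match `alert.py`'s defensive handling.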

2. Launching the CronJob

[root@k8s-master1 mysql-backup]# cat cronjob.yaml 
apiVersion: batch/v1
kind: CronJob
metadata:
  name: mysql-backup
spec:
  # 时间表: 根据您所需的备份频率进行调整
  # 例如,"0 2 * * *" 将在每天凌晨2点运行
  schedule: "0 2 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          nodeName: k8s-node7-storage      # 表示容器在这个机器上运行,也就是在这个机器上进行备份
          containers:
          - name: mysql-backup
            image: registry.cn-hangzhou.aliyuncs.com/tianxiang_app/database-backup:v4
            command: ["bash", "/tmp/backup.sh"]
            env:
            - name: PARALLEL_THREADS
              value: "8"
            - name: PYTHONIOENCODING
              value: "utf-8"
            - name: FEISHU_WEBHOOK_URL
              value: "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxxx-xxxxxxxx-xxxxxxxxxx"
            - name: DBS
              value: "192.168.100.214:3306:root:123456"   # 表示要备份的数据库地址用户名信息,以 ',' 逗号分隔可以写多个
            volumeMounts:
            - name: backup-storage
              mountPath: /backup
            - name: backup-script
              mountPath: /tmp/
            - name: localtime
              mountPath: /etc/localtime
              readOnly: true
            - name: log-file
              mountPath: /var/log/db_backup.log
          volumes:
          - name: backup-storage
            hostPath:
              path: /data/db-databases-backup/mysql-mgr-cluster
              type: DirectoryOrCreate
          - name: backup-script
            configMap:
              name: backup-script
              defaultMode: 0744
          - name: localtime
            hostPath:
              path: /etc/localtime
          - name: log-file
            hostPath:
              path: /var/log/db_backup.log
              type: FileOrCreate
          restartPolicy: Never
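
The `DBS` environment variable packs one instance per comma-separated item, with fields separated by colons (`host:port:user:pass`). A sketch of the same parsing the shell script performs with `IFS` (the function name is hypothetical):

```python
def parse_dbs(dbs_raw: str):
    """Split 'host:port:user:pass,...' into tuples, skipping empty items
    just like the backup script's loop does."""
    instances = []
    for item in dbs_raw.split(","):
        if not item.strip():
            continue  # the script skips empty entries too
        # maxsplit=3 keeps any further colons inside the password,
        # matching the behavior of `IFS=":" read -r host port user pass`
        host, port, user, password = item.split(":", 3)
        instances.append((host, int(port), user, password))
    return instances
```

One consequence of this format: hosts cannot be written as IPv6 literals, and passwords may contain colons but not commas.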

3. Applying the resources

[root@k8s-master1 mysql-backup]# kubectl apply -f .
configmap/backup-script created
cronjob.batch/mysql-backup created

4. Triggering the first run manually

[root@k8s-master1 mysql-backup]# kubectl create job --from=cronjob/mysql-backup mysql-backup-manual-1
job.batch/mysql-backup-manual-1 created

5. Checking the logs

[root@k8s-master1 mysql-backup]# kubectl logs -f mysql-backup-manual-1-2khx9 
[2025-06-05 14:46:01] [INFO] ===== 开始 MySQL 数据库备份 =====
[2025-06-05 14:46:01] [INFO] 备份目录:/backup/20250605_144601
[2025-06-05 14:46:01] [INFO] 正在获取 192.168.100.214:3306 的数据库列表...
[2025-06-05 14:46:01] [INFO] 开始备份数据库 'halo'(192.168.100.214:3306)
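
The success/partial/fail status the job reports to Feishu is derived from two counters at the end of `backup.sh`'s `main()`. The decision, mirrored in Python for clarity:

```python
def backup_status(total_dbs: int, failed_dbs: int) -> str:
    """Mirror of the status decision at the end of backup.sh's main()."""
    if total_dbs == 0:
        return "fail"       # nothing enumerated: every connection failed
    if failed_dbs == 0:
        return "success"
    if failed_dbs >= total_dbs:
        return "fail"
    return "partial"
```

Note that a host whose database list cannot be fetched only increments the failure counter, so one unreachable instance among several yields `partial`, not `fail`.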

六、Restore scripts

1. Database restore script

1.1 Restore script

#!/bin/bash
set -euo pipefail

# 默认配置
readonly DEFAULT_BACKUP_DIR="/backup"
readonly DEFAULT_MYSQL_USER="mysql_user"
readonly DEFAULT_MYSQL_PASS="mysql_pass"
readonly DEFAULT_PG_USER="pg_user"
readonly DEFAULT_PG_PASS="pg_pass"
readonly DEFAULT_MONGO_USER="mongo_user"
readonly DEFAULT_MONGO_PASS="mongo_pass"
readonly DEFAULT_DB_HOST="localhost"
readonly LOG_FILE="/var/log/db_restore.log"
readonly PARALLEL_THREADS=${PARALLEL_THREADS:-16}
readonly NOTIFICATION="/data/db_restore_notification.py"

# 临时目录
readonly TMP_DIR="/tmp/db_restore"
mkdir -p "$TMP_DIR"

# 全局变量用于记录恢复结果
declare -A RESTORE_SUCCESS
declare -A RESTORE_FAILURE

# 日志函数
log() {
    local level=$1
    local message=$2
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] [${level}] ${message}" >> "$LOG_FILE"
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] [${level}] ${message}"
}

# 显示用法
usage() {
    echo "集成数据库恢复脚本 (支持MySQL/PostgreSQL/MongoDB)"
    echo "用法: $0 -t 数据库类型 [-d 数据库名] [-D 备份目录] [-H 主机] [-P 端口] [-u 用户] [-p 密码]"
    exit 1
}

# 清理临时文件
cleanup() {
    if [ -f "${TMP_FILE:-}" ]; then
        rm -f "$TMP_FILE"
        log "INFO" "已清理临时文件: $TMP_FILE"
    fi
}

# 从文件名提取数据库名
extract_db_name() {
    local filename=$1
    local db_type=$2
    local basename=$(basename "$filename")
    basename=${basename%.gz}
    basename=${basename%.sql}
    basename=${basename%.dump}
    basename=${basename%.bson}
    basename=${basename%.archive}

    case "$db_type" in
        mysql|postgresql)
            echo "$basename" | rev | cut -d'_' -f3- | rev
            ;;
        mongodb)
            if [[ "$basename" == *.*.* ]]; then
                echo "$basename" | cut -d'.' -f1
            else
                echo "$basename" | rev | cut -d'_' -f3- | rev
            fi
            ;;
        *)
            echo "$basename"
            ;;
    esac
}

# 显示恢复结果汇总
show_restore_summary() {
    echo -e "\n\n=== 数据库恢复结果汇总 ===" | tee -a "$LOG_FILE"
    
    if [ ${#RESTORE_SUCCESS[@]} -gt 0 ]; then
        echo -e "\n✅ 成功恢复的数据库:" | tee -a "$LOG_FILE"
        for db in "${!RESTORE_SUCCESS[@]}"; do
            echo "  - $db" | tee -a "$LOG_FILE"
        done
    else
        echo -e "\nℹ️ 没有成功恢复的数据库" | tee -a "$LOG_FILE"
    fi
    
    if [ ${#RESTORE_FAILURE[@]} -gt 0 ]; then
        echo -e "\n❌ 恢复失败的数据库:" | tee -a "$LOG_FILE"
        for db in "${!RESTORE_FAILURE[@]}"; do
            echo "  - $db (原因: ${RESTORE_FAILURE[$db]})" | tee -a "$LOG_FILE"
        done
    else
        echo -e "\nℹ️ 没有恢复失败的数据库" | tee -a "$LOG_FILE"
    fi
    
    echo -e "\n恢复日志已保存到: $LOG_FILE" | tee -a "$LOG_FILE"
    echo "=== 汇总结束 ===" | tee -a "$LOG_FILE"
}

# 发送飞书通知
send_feishu_notification() {
    local status
    if [ ${#RESTORE_FAILURE[@]} -eq 0 ]; then
        status="success"
    elif [ ${#RESTORE_SUCCESS[@]} -eq 0 ]; then
        status="fail"
    else
        status="partial_success"
    fi

    # 构造 JSON
    local success_json="{"
    for key in "${!RESTORE_SUCCESS[@]}"; do
        value=${RESTORE_SUCCESS[$key]}
        success_json+="\"$key\":\"$value\","
    done
    success_json="${success_json%,}}"

    local failed_json="{"
    for key in "${!RESTORE_FAILURE[@]}"; do
        value=${RESTORE_FAILURE[$key]}
        failed_json+="\"$key\":\"$value\","
    done
    failed_json="${failed_json%,}}"

    # 调用 Python 脚本发送通知
    python3 "$NOTIFICATION" \
        "$status" \
        "$success_json" \
        "$failed_json" \
        "$LOG_FILE"
}

restore_mysql() {
    local db_name=$1 backup_file=$2 db_user=$3 db_pass=$4 db_host=$5 db_port=$6
    log "INFO" "开始MySQL恢复: 数据库=${db_name} 备份=${backup_file}"
    
    if ! MYSQL_PWD="$db_pass" mysql -h"$db_host" -P"$db_port" -u"$db_user" -e "DROP DATABASE IF EXISTS \`$db_name\`; CREATE DATABASE \`$db_name\`;" 2>> "$LOG_FILE"; then
        RESTORE_FAILURE["$db_name"]="创建数据库失败"
        log "ERROR" "MySQL数据库创建失败: $db_name"
        return 1
    fi
    
    if [[ "$backup_file" == *.gz ]]; then
        if command -v pigz &>/dev/null; then
           if ! pigz -dc -p "$PARALLEL_THREADS" "$backup_file" | MYSQL_PWD="$db_pass" mysql -h"$db_host" -P"$db_port" -u"$db_user" "$db_name" 2>> "$LOG_FILE"; then
               RESTORE_FAILURE["$db_name"]="解压并导入失败"
               log "ERROR" "MySQL数据库恢复失败: $db_name"
               return 1
           fi
        else
           if ! gzip -dc "$backup_file" | MYSQL_PWD="$db_pass" mysql -h"$db_host" -P"$db_port" -u"$db_user" "$db_name" 2>> "$LOG_FILE"; then
               RESTORE_FAILURE["$db_name"]="解压并导入失败"
               log "ERROR" "MySQL数据库恢复失败: $db_name"
               return 1
           fi
        fi
    else
        if ! MYSQL_PWD="$db_pass" mysql -h"$db_host" -P"$db_port" -u"$db_user" "$db_name" < "$backup_file" 2>> "$LOG_FILE"; then
            RESTORE_FAILURE["$db_name"]="导入失败"
            log "ERROR" "MySQL数据库恢复失败: $db_name"
            return 1
        fi
    fi
    
    RESTORE_SUCCESS["$db_name"]=1
    log "INFO" "MySQL数据库恢复成功: $db_name"
    return 0
}

restore_postgresql() {
    local db_name=$1 backup_file=$2 db_user=$3 db_pass=$4 db_host=$5 db_port=$6
    log "INFO" "开始PostgreSQL恢复: 数据库=${db_name} 备份=${backup_file}"
    export PGPASSWORD="$db_pass"
    local restore_file="$backup_file"
    
    if [[ "$backup_file" == *.gz ]]; then
        TMP_FILE="${TMP_DIR}/$(basename "${backup_file%.*}")"
        if command -v pigz &>/dev/null; then
            if ! pigz -dc -p "$PARALLEL_THREADS" "$backup_file" > "$TMP_FILE" 2>> "$LOG_FILE"; then
                RESTORE_FAILURE["$db_name"]="解压失败"
                log "ERROR" "PostgreSQL备份解压失败: $backup_file"
                return 1
            fi
        else
            if ! gunzip -c "$backup_file" > "$TMP_FILE" 2>> "$LOG_FILE"; then
                RESTORE_FAILURE["$db_name"]="解压失败"
                log "ERROR" "PostgreSQL备份解压失败: $backup_file"
                return 1
            fi
        fi
        restore_file="$TMP_FILE"
    fi
    
    if ! createdb -h "$db_host" -p "$db_port" -U "$db_user" "$db_name" 2>> "$LOG_FILE"; then
        RESTORE_FAILURE["$db_name"]="创建数据库失败"
        log "ERROR" "PostgreSQL数据库创建失败: $db_name"
        return 1
    fi
    
    if ! pg_restore -h "$db_host" -p "$db_port" -U "$db_user" -d "$db_name" "$restore_file" 2>> "$LOG_FILE"; then
        RESTORE_FAILURE["$db_name"]="恢复失败"
        log "ERROR" "PostgreSQL数据库恢复失败: $db_name"
        return 1
    fi
    
    RESTORE_SUCCESS["$db_name"]=1
    log "INFO" "PostgreSQL数据库恢复成功: $db_name"
    return 0
}

restore_mongodb() {
    local db_name=$1 db_dir=$2 db_user=$3 db_pass=$4 db_host=$5 db_port=$6
    log "INFO" "开始 MongoDB 恢复,目标数据库: $db_name,目录: $db_dir"
    
    local has_failure=0
    local collection_count=0
    local success_count=0
    
    # 获取所有集合文件
    local bson_files=($(find "$db_dir" -maxdepth 1 -type f -name "*.bson.gz"))
    if [ ${#bson_files[@]} -eq 0 ]; then
        RESTORE_FAILURE["$db_name"]="没有找到.bson.gz文件"
        log "ERROR" "在目录 $db_dir 中没有找到任何 .bson.gz 文件"
        return 1
    fi
    
    for bson_file in "${bson_files[@]}"; do
        local collection=$(basename "$bson_file" .bson.gz)
        collection_count=$((collection_count + 1))  # 避免 ((x++)) 在 x=0 时返回非零状态
        
        if ! mongorestore \
            --host "$db_host" \
            --port "$db_port" \
            --username "$db_user" \
            --password "$db_pass" \
            --authenticationDatabase "admin" \
            --db "$db_name" \
            --collection "$collection" \
            --drop \
            --gzip \
            "$bson_file" >> "$LOG_FILE" 2>&1; then
            log "ERROR" "MongoDB集合恢复失败: $db_name.$collection"
            has_failure=1
        else
            success_count=$((success_count + 1))
            log "INFO" "MongoDB集合恢复成功: $db_name.$collection"
        fi
    done
    
    if [ $has_failure -eq 0 ]; then
        RESTORE_SUCCESS["$db_name"]="$success_count/$collection_count"
        log "INFO" "✅ 数据库 $db_name 恢复完成 (成功恢复 $success_count 个集合)"
    else
        RESTORE_FAILURE["$db_name"]="部分集合恢复失败 (成功 $success_count/$collection_count)"
        log "WARNING" "⚠️ 数据库 $db_name 部分恢复完成 (成功 $success_count/$collection_count)"
    fi
    
    # 显示集合列表
    log "INFO" "数据库 $db_name 中的集合列表:"
    if ! mongosh --host "$db_host" --port "$db_port" -u "$db_user" -p "$db_pass" \
        --authenticationDatabase "admin" --quiet --eval "db.getMongo().getDB('$db_name').getCollectionNames()" | tee -a "$LOG_FILE"; then
        log "ERROR" "无法获取 $db_name 的集合列表"
    fi
    
    return $has_failure
}

main() {
    local db_type="" db_name="" backup_dir=""
    local db_host="$DEFAULT_DB_HOST" db_port="" db_user="" db_pass=""

    while getopts "t:d:D:H:P:u:p:h" opt; do
        case $opt in
            t) db_type="$OPTARG" ;;
            d) db_name="$OPTARG" ;;
            D) backup_dir="$OPTARG" ;;
            H) db_host="$OPTARG" ;;
            P) db_port="$OPTARG" ;;
            u) db_user="$OPTARG" ;;
            p) db_pass="$OPTARG" ;;
            h) usage ;;
            *) usage ;;
        esac
    done

    [[ -z "$db_type" ]] && usage

    case "$db_type" in
        mysql)
            db_port="${db_port:-3306}"
            db_user="${db_user:-$DEFAULT_MYSQL_USER}"
            db_pass="${db_pass:-$DEFAULT_MYSQL_PASS}"
            backup_dir="${backup_dir:-${DEFAULT_BACKUP_DIR}/mysql}"
            backup_pattern="*.sql.gz"
            ;;
        postgresql|postgres)
            db_type="postgresql"
            db_port="${db_port:-5432}"
            db_user="${db_user:-$DEFAULT_PG_USER}"
            db_pass="${db_pass:-$DEFAULT_PG_PASS}"
            backup_dir="${backup_dir:-${DEFAULT_BACKUP_DIR}/postgresql}"
            backup_pattern="*.dump.gz"
            ;;
        mongodb|mongo)
            db_type="mongodb"
            db_port="${db_port:-27017}"
            db_user="${db_user:-$DEFAULT_MONGO_USER}"
            db_pass="${db_pass:-$DEFAULT_MONGO_PASS}"
            backup_dir="${backup_dir:-${DEFAULT_BACKUP_DIR}/mongodb}"
            ;;
        *) log "ERROR" "不支持的数据库类型: $db_type"; usage ;;
    esac

    [[ ! -d "$backup_dir" ]] && log "ERROR" "备份目录不存在: $backup_dir" && exit 1

    # 清空之前的恢复结果
    RESTORE_SUCCESS=()
    RESTORE_FAILURE=()

    if [[ "$db_type" == "mongodb" ]]; then
        log "INFO" "在 $backup_dir 中查找 MongoDB 备份子目录..."
        shopt -s nullglob
        db_dirs=("$backup_dir"/*/)
        shopt -u nullglob
        [[ ${#db_dirs[@]} -eq 0 ]] && log "ERROR" "在 $backup_dir 中找不到任何 MongoDB 数据库目录" && exit 1
        
        for db_subdir in "${db_dirs[@]}"; do
            local current_db_name="$db_name"
            [[ -z "$current_db_name" ]] && current_db_name=$(basename "$db_subdir")
            log "INFO" "处理 MongoDB 数据库目录: $db_subdir (数据库名: $current_db_name)"
            restore_mongodb "$current_db_name" "$db_subdir" "$db_user" "$db_pass" "$db_host" "$db_port" || true
        done
    else
        backup_files=($(ls -t "$backup_dir"/$backup_pattern 2>/dev/null || true))
        [[ ${#backup_files[@]} -eq 0 ]] && log "ERROR" "在 $backup_dir 中找不到 $backup_pattern 备份文件" && exit 1
        
        for backup_file in "${backup_files[@]}"; do
            local current_db_name="$db_name"
            [[ -z "$current_db_name" ]] && current_db_name=$(extract_db_name "$backup_file" "$db_type")
            [[ -z "$current_db_name" ]] && log "WARNING" "无法从文件名提取数据库名,跳过: $backup_file" && continue
            
            log "INFO" "处理备份文件: $backup_file (数据库: $current_db_name)"
            case "$db_type" in
                mysql) restore_mysql "$current_db_name" "$backup_file" "$db_user" "$db_pass" "$db_host" "$db_port" || true ;;
                postgresql) restore_postgresql "$current_db_name" "$backup_file" "$db_user" "$db_pass" "$db_host" "$db_port" || true ;;
            esac
        done
    fi

    # 清理临时文件
    cleanup

    # 显示结果汇总日志
    show_restore_summary

    # 发送飞书webhook
    send_feishu_notification
    
    # 如果有失败的恢复,返回非零状态码
    if [ ${#RESTORE_FAILURE[@]} -gt 0 ]; then
        exit 1
    fi
}

trap cleanup EXIT
main "$@"
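
`extract_db_name()` recovers the database name by stripping the extension and then dropping the trailing `_YYYYMMDD_HHMMSS` stamp (the `rev | cut -d'_' -f3- | rev` pipeline, which keeps underscores inside the name itself). An equivalent sketch in Python:

```python
import os

def extract_db_name(path: str) -> str:
    """Strip known extensions, then drop the last two '_'-separated fields
    (date and time), preserving underscores inside the database name."""
    base = os.path.basename(path)
    # .gz is checked first so 'x.sql.gz' loses both suffixes in turn
    for ext in (".gz", ".sql", ".dump", ".bson", ".archive"):
        if base.endswith(ext):
            base = base[: -len(ext)]
    parts = base.split("_")
    return "_".join(parts[:-2]) if len(parts) > 2 else base
```

So `agilebpm_5_20250428_150315.sql.gz` maps back to `agilebpm_5`, which is why the timestamped naming convention from the backup side is required for automatic name extraction.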

1.2 Webhook script

#!/usr/bin/env python3
import requests
import json
import sys
import datetime
import os
import io

sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
# Webhook 地址
WEBHOOK_URL = os.getenv("FEISHU_WEBHOOK_URL", "https://open.feishu.cn/open-apis/bot/v2/hook/")

def send_feishu_notification(status, success_dbs=None, failed_dbs=None, log_file=""):
    """
    发送飞书通知
    
    参数:
    status: 状态 (success/partial/fail)
    success_dbs: 成功恢复的数据库字典 {db_name: details}
    failed_dbs: 失败恢复的数据库字典 {db_name: reason}
    log_file: 日志文件路径
    """
    # 状态样式映射
    status_map = {
        "success": {"emoji": "✅", "color": "green", "title": "恢复成功"},
        "partial": {"emoji": "⚠️", "color": "orange", "title": "恢复部分失败"},
        "partial_success": {"emoji": "⚠️", "color": "orange", "title": "恢复部分成功"},
        "fail": {"emoji": "❌", "color": "red", "title": "恢复失败"}
    }
    
    info = status_map.get(status.lower(), status_map["fail"])
    
    # 当前时间(+8 时区)
    now = datetime.datetime.utcnow() + datetime.timedelta(hours=8)
    timestamp = now.strftime('%Y-%m-%d %H:%M:%S')

    # 构建飞书卡片
    elements = [
        {
            "tag": "div",
            "text": {
                "tag": "lark_md",
                "content": f"**⏰ 恢复时间**\n{timestamp}\n\n**🔄 恢复状态**\n{info['emoji']} {info['title']}"
            }
        },
        {"tag": "hr"}
    ]

    # 添加成功数据库
    if success_dbs:
        success_content = "**✅ 成功恢复的数据库**\n\n"
        for db, details in success_dbs.items():
            success_content += f"- {db}"
            if details != "1":  # 如果不是简单的成功标记(1),显示详细信息
                success_content += f" ({details})"
            success_content += "\n"
        
        elements.append({
            "tag": "div",
            "text": {
                "tag": "lark_md",
                "content": success_content
            }
        })

    # 添加失败数据库
    if failed_dbs:
        failed_content = "**❌ 恢复失败的数据库**\n\n"
        for db, reason in failed_dbs.items():
            failed_content += f"- {db}: {reason}\n"
        
        elements.append({
            "tag": "div",
            "text": {
                "tag": "lark_md",
                "content": failed_content
            }
        })

    # 添加日志文件信息
    if log_file:
        elements.append({"tag": "hr"})
        elements.append({
            "tag": "div",
            "text": {
                "tag": "lark_md",
                "content": f"**📝 恢复日志**\n{log_file}"
            }
        })

    # 添加主机信息
    elements.append({"tag": "hr"})
    elements.append({
        "tag": "div",
        "text": {
            "tag": "lark_md",
            "content": "**💻 数据库主机**\n服务器IP:11.0.1.20:3306"
        }
    })

    elements.append({
        "tag": "note",
        "elements": [
            {
                "tag": "plain_text",
                "content": "由数据库恢复系统发送"
            }
        ]
    })

    card = {
        "msg_type": "interactive",
        "card": {
            "header": {
                "title": {
                    "tag": "plain_text",
                    "content": f"{info['emoji']} 数据库{info['title']}"
                },
                "template": info["color"]
            },
            "elements": elements
        }
    }

    # 发送请求
    resp = requests.post(
        WEBHOOK_URL,
        headers={"Content-Type": "application/json"},
        data=json.dumps(card)
    )
    return resp

if __name__ == "__main__":
    # 简单测试模式
    if len(sys.argv) == 2 and sys.argv[1] == "SUCCESS":
        # 测试成功通知
        resp = send_feishu_notification(
            status="success",
            success_dbs={"db1": "10 collections", "db2": "1"},
            failed_dbs={},
            log_file="/var/log/db_restore.log"
        )
    elif len(sys.argv) == 2 and sys.argv[1] == "FAIL":
        # 测试失败通知
        resp = send_feishu_notification(
            status="fail",
            success_dbs={},
            failed_dbs={"db3": "连接超时", "db4": "权限不足"},
            log_file="/var/log/db_restore.log"
        )
    elif len(sys.argv) >= 4:
        # 正常调用模式
        status = sys.argv[1]
        success_dbs = json.loads(sys.argv[2]) if len(sys.argv) > 2 else {}
        failed_dbs = json.loads(sys.argv[3]) if len(sys.argv) > 3 else {}
        log_file = sys.argv[4] if len(sys.argv) > 4 else ""
        
        resp = send_feishu_notification(status, success_dbs, failed_dbs, log_file)
    else:
        print("用法:")
        print("测试成功通知: db_restore_notification.py SUCCESS")
        print("测试失败通知: db_restore_notification.py FAIL")
        print("正常使用: db_restore_notification.py <status> <success_json> <failed_json> <log_file>")
        sys.exit(1)
    
    # 响应判断
    if resp.status_code != 200 or resp.json().get("code") != 0:
        print("飞书通知发送失败:", resp.text)
        sys.exit(1)
    
    print("飞书通知发送成功 ✅")
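
`send_feishu_notification()` in the restore shell script chooses the status from the two result maps before invoking this webhook script. The rule, restated in Python:

```python
def restore_status(succeeded: dict, failed: dict) -> str:
    """Mirror of the status choice in the restore script's
    send_feishu_notification(): failures decide first."""
    if not failed:
        return "success"
    if not succeeded:
        return "fail"
    return "partial_success"
```

`partial_success` has its own entry in the webhook's `status_map`, so mixed outcomes render with the orange "恢复部分成功" header rather than falling back to the failure style.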

2. Script features

2.1 Multi-database support:

  • MySQL (.sql, .sql.gz)

  • PostgreSQL (.dump, .dump.gz)

  • MongoDB (per-database directories of .bson.gz collection dumps)
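
The file suffix is what ties a backup to its engine throughout the toolchain: `summary.json` generation and the notification title both rely on it. The mapping as a small sketch:

```python
def backup_type(filename: str) -> str:
    """Map a backup file suffix to its database type,
    the same classification summary.json records."""
    if filename.endswith(".sql.gz"):
        return "MySQL"
    if filename.endswith(".dump.gz"):
        return "PostgreSQL"
    if filename.endswith(".bson.gz"):
        return "MongoDB"
    return "Unknown"
```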

2.2 Feishu webhook notifications

mysql恢复截图.png

mongodb恢复截图.png

2.3 Parameters:

Integrated database restore script (supports MySQL/PostgreSQL/MongoDB)
Usage: ./db_restore.sh -t <db type> [-d <db name>] [-D <backup dir>] [-H <host>] [-P <port>] [-u <user>] [-p <password>]
Options:
  -t  Database type (required: mysql|postgresql|mongodb)
  -d  Target database name (optional; extracted from the backup file name if omitted)
  -D  Backup directory (optional; default: /backup/<db type>)
  -H  Database host (default: localhost)
  -P  Database port (MySQL: 3306, PostgreSQL: 5432, MongoDB: 27017)
  -u  Database user
  -p  Database password
  -h  Show this help

2.4 Usage examples:

[root@k8s-master1 sql]# ls -alh mysql/20250516_110003/ 
total 21G
drwxr-xr-x 2 root root 4.0K May 16 19:01 .
drwxr-xr-x 3 root root 4.0K May 16 19:01 ..
-rw-r--r-- 1 root root  12G May 16 17:21 agilebpm_5_20250516_110003.sql.gz
-rw-r--r-- 1 root root 226K May 16 17:50 app_center_20250516_110003.sql.gz

[root@k8s-master1 sql]# ./db_restore.sh -t mysql -H 192.168.248.30 -P 30759 -u root -p 123456 -D mysql/20250516_110003/

[root@k8s-master1 sql]# ./db_restore.sh -t mongodb -H 192.168.248.30 -P 30112 -u root -p 'e7bb41b693c6fb9a' -D mongodb/20250516_140002

Run in the background:

[root@k8s-master1 sql]# nohup ./db_restore.sh -t mysql -H 192.168.248.30 -P 30759 -u root -p 'Zh_Mysql@123!' -D mysql/20250516_110003/ > restore-mysql.log 2>&1 &

[root@k8s-master1 sql]# nohup ./db_restore.sh -t mongodb -H 192.168.248.30 -P 30112 -u root -p 'e7bb41b693c6fb9a' -D mongodb/20250516_140002 > restore-mongodb.log 2>&1 &

Checking the logs:

# Follow the log printed to the terminal (redirected stdout)

[root@k8s-master1 sql]# tail -f restore-mysql.log 
[2025-05-17 17:31:23] [INFO] 开始MySQL恢复: 数据库=flow 备份=mysql/20250516_110003//flow_20250516_110003.sql.gz
mysql: [Warning] Using a password on the command line interface can be insecure.
[2025-05-17 17:31:27] [INFO] MySQL数据库恢复成功

[root@k8s-master1 sql]# tail -f restore-mongodb.log
nohup: ignoring input
[2025-05-17 17:34:58] [INFO] 在 mongodb/20250516_140002 中查找  备份文件...
[2025-05-17 17:34:58] [INFO] 在 mongodb/20250516_140002 中查找 MongoDB 备份子目录...
[2025-05-17 17:34:58] [INFO] 处理 MongoDB 数据库目录: mongodb/20250516_140002/cmdb/ (数据库名: cmdb)
[2025-05-17 17:34:58] [INFO] 开始 MongoDB 恢复,目标数据库: cmdb,目录: mongodb/20250516_140002/cmdb/
  ➤ 正在恢复集合: obj_property_tag (文件: mongodb/20250516_140002/cmdb/obj_property_tag.bson.gz)
  ➤ 正在恢复集合: obj_point_data (文件: mongodb/20250516_140002/cmdb/obj_point_data.bson.gz)

# Follow the log written to the log file
[root@k8s-master1 sql]# tail -f /var/log/db_restore.log
[2025-05-17 17:14:36] [INFO] 处理备份文件: mysql/20250516_110003//indc_emergency_drill_20250516_110003.sql.gz (数据库: indc_emergency_drill)
[2025-05-17 17:14:36] [INFO] 开始MySQL恢复: 数据库=indc_emergency_drill 备份=mysql/20250516_110003//indc_emergency_drill_20250516_110003.sql.gz
[2025-05-17 17:14:39] [INFO] MySQL数据库恢复成功
[2025-05-17 17:14:39] [INFO] 处理备份文件: mysql/20250516_110003//fileserver_20250516_110003.sql.gz (数据库: fileserver)
[2025-05-17 17:14:39] [INFO] 开始MySQL恢复: 数据库=fileserver 备份=mysql/20250516_110003//fileserver_20250516_110003.sql.gz
[2025-05-17 17:14:39] [INFO] MySQL数据库恢复成功
=== 数据库恢复结果汇总 ===

✅ 成功恢复的数据库:
  - process
  - indc_emergency_drill

ℹ️ 没有恢复失败的数据库

恢复日志已保存到: /var/log/db_restore.log
=== 汇总结束 ===
飞书通知发送成功 ✅

[root@k8s-master1 sql]# tail -f /var/log/db_restore.log 
[2025-05-18 13:33:54] [INFO] ✅ 数据库 pageplug 恢复完成 (成功恢复 23 个集合)
[2025-05-18 13:33:54] [INFO] 数据库 pageplug 中的集合列表:
[
  'passwordResetToken', 'actionCollection',
  'collection',         'userData',
  'newPage',            'datasource',
  'workspace',          'newAction',
  'usagePulse',         'action',
  'config',             'permissionGroup',
  'plugin',             'tenant',
  'application',        'user',
  'mongockLock',        'mongockChangeLog',
  'role',               'theme',
  'inviteUser',         'sequence',
  'page'
]


=== 数据库恢复结果汇总 ===

✅ 成功恢复的数据库:
  - pageplug
  - meta42_warehouse

ℹ️ 没有恢复失败的数据库

恢复日志已保存到: /var/log/db_restore.log
=== 汇总结束 ===
飞书通知发送成功 ✅

2.5 Conveniences:

  • Transparently handles compressed (.gz) backups

  • Automatically locates backup files in the given directory

  • Supports the MongoDB per-database directory layout (one .bson.gz per collection)

  • Error handling and logging throughout

2.6 Prerequisites and caveats:

  • Make sure the client tools are installed (mysql, pg_restore, mongorestore, python3)

  • Install the requests library (pip install requests)

  • Place backup files under the directory for their type (mysql/postgresql/mongodb)

  • The executing user needs administrative privileges on the target databases
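
The tool prerequisite in the notes above can be checked automatically before a restore run; a small sketch using `shutil.which` (the function name is illustrative):

```python
import shutil

def missing_tools(required=("mysql", "pg_restore", "mongorestore", "python3")):
    """Return the required client tools that are not found on PATH."""
    return [tool for tool in required if shutil.which(tool) is None]
```

If the returned list is non-empty, install the missing packages before invoking `db_restore.sh`.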

数据库
MongoDB PostgreSQL Mysql
License:  CC BY 4.0

©2025 甄天祥-Linux-个人小站. Some rights reserved.
