前言:作为运维人员,数据库的备份一定是日常工作中必不可少的一项工作,除了经常写的shell脚本来备份数据库,然后加入到Linux自带的crontab中以外,我利用了工作闲暇时间用python写了一个这样的脚本。
优点:不依赖Linux环境变量,一条python命令即可完成,使用方式简单

配置文件:DB_config.py

使用之前根据自己环境更改配置

#!/usr/bin/python3
#!encoding=utf-8

"""连接数据库信息"""
DB_information ={
    'db_ip':'192.168.1.198',
    'db_user':'root',
    'db_password':'Asd@123!',
    'db_port':3306
}

"""备份目录路径,结尾加/"""
Backup_path = "./db_backup/"

"""定时执行的时间,为24小时制"""
cron_time = {
    'hour':13,
    'minute':31
}

主程序代码:DB_Backup.py

脚本所依赖的第三方库:pymysql、apscheduler(需提前下载,参考命令:pip3 install pymysql、pip3 install apscheduler)

#!/usr/bin/python3
#!encoding=utf-8
"""作者:陈浩
   更新时间:2022.4.22
"""
#引入需要的库
import os
import os.path
import logging
import time
import sys
import pymysql
import shutil
from subprocess import getstatusoutput
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
import DB_config

class DatabaseBR:
    """备份、恢复、定时执行的类"""
    Time = time.strftime('%Y-%m-%d %H:%M:%S')
    Dirtime = time.strftime('%Y%m%d%H%M%S')
    Dirbackup = DB_config.Backup_path + Dirtime
    sql = "show databases;"

    logging.basicConfig(level=logging.DEBUG, filename='./sql_backup.log', filemode='a',
                        format='%(asctime)s - %(levelname)s: %(message)s') #参数filemode:a是追加,w是覆盖
    logging.info("检测备份目录是否存在……")
    if not os.path.exists(DB_config.Backup_path):
        logging.warning("不存在,正在创建中……")
        os.mkdir(DB_config.Backup_path)
    else:
        logging.info("存在!无需创建")


    def backing(self):
        """数据库备份的方法"""
        con = []
        logging.info("正在连接数据库……")
        try:
            con = pymysql.connect(host=DB_config.DB_information['db_ip'],
                                  port=DB_config.DB_information['db_port'],
                                  user=DB_config.DB_information['db_user'],
                                  password=DB_config.DB_information['db_password'],
                                  charset='UTF8')
            cur = con.cursor()
            logging.warning("正在执行sql……")
            cur.execute(self.sql)
            results = cur.fetchall()
            for result in results:
                result = list(result)
                result = result[0]
                dumps = "mysqldump -u{0} -p{1} -h{2} -P{3} -E -R {4} > {5}.sql".format(
                    DB_config.DB_information['db_user'],
                    DB_config.DB_information['db_password'],
                    DB_config.DB_information['db_ip'],
                    DB_config.DB_information['db_port'],
                    result,
                    DB_config.Backup_path + result
                )
                os.popen(dumps)
            cur.close()
            logging.warning("提交事务")
            con.commit()
        except Exception as e:
            logging.error("发生未知错误!", e)
        finally:
            logging.info("关闭数据库连接")
            con.close()

    def recovering(self):
        """数据库恢复的方法"""
        pass

    def con_task(self):
        """执行定时任务的方法"""
        try:
            blocking = BlockingScheduler() #实例化父类
            blocking.add_job(self.backing, 'cron', hour=DB_config.cron_time['hour'],
                             minute=DB_config.cron_time['minute'])
            blocking.start()
        except (Exception, SystemExit, KeyboardInterrupt) as e:
            print("发生了未知的错误或用户已手动结束任务,Error{}".format(e))


if __name__ == '__main__':
    """主函数"""
    databasebr = DatabaseBR()
    databasebr.con_task()

目录结构:单独创建一个目录,把这俩脚本放下面,注意:新建一个__init__.py的文件,然后给予可执行权限,最后执行python3 DB_Backup.py即可(直接执行为前台执行,后台执行使用nohup)

目前只可以实现定时备份,恢复数据的还在编写当中,考虑到实际业务中,恢复数据的操作一般不会用作定制任务来执行,这一块还需要花时间来进行,恢复数据的操作可能会单独写一个脚本出来,单独做恢复的操作,后续一段时间会更新

Logo

更多推荐