#!/usr/bin/env python3
"""
Async MySQL backup script for Nasqueron containers.

Features:
- Detect MySQL containers (image "nasqueron/mysql") automatically, or take them from configuration
- For each container, detect the MySQL data directory via `docker inspect`
- Back up databases:
  * at database level if the total size is ≤ 5 MB
  * at table level otherwise (one file per table)
- Ignore tables by pattern from the YAML configuration
- Archive and encrypt backups (AES via openssl)
- Run operations concurrently using asyncio for speed
"""
import asyncio
import fnmatch
import shutil
import subprocess
import tarfile
import tempfile
from pathlib import Path

import yaml

CONFIG_PATH = Path("/etc/nasqueron/mysql-backup.yaml")
SIZE_THRESHOLD_MB = 5

# ---------- Utilities ----------
def run_sync(cmd: list[str], **kwargs) -> str:
    """Run a command synchronously and return its stdout."""
    return subprocess.run(cmd, stdout=subprocess.PIPE, text=True,
                          check=True, **kwargs).stdout.strip()

async def run_async(cmd: list[str]) -> str:
    """Run a command asynchronously and capture stdout."""
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(f"Command failed: {' '.join(cmd)}\n{stderr.decode()}")
    return stdout.decode().strip()

def load_config() -> dict:
    with open(CONFIG_PATH) as f:
        return yaml.safe_load(f)
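
# The configuration file is expected to look roughly like this; the keys read
# by this script are "containers", "ignore_tables" and "aes_key", while the
# concrete names and paths below are purely illustrative:
#
#   containers:
#     mysql-main: /srv/mysql/main      # container name -> host data directory
#   ignore_tables:
#     - "%.cache"                      # % is the SQL-style wildcard
#     - "logs.%"
#   aes_key: /etc/nasqueron/backup.key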

def matches_ignore_pattern(db: str, table: str, ignore_patterns: list[str]) -> bool:
    """Return True if db.table matches an ignore pattern (% is the wildcard)."""
    for pattern in ignore_patterns:
        if fnmatch.fnmatch(f"{db}.{table}", pattern.replace("%", "*")):
            return True
    return False
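
# For example, with ignore_patterns = ["%.cache", "logs.%"] (hypothetical values):
#   matches_ignore_pattern("wiki", "cache", ...)  -> True   ("wiki.cache" ~ "*.cache")
#   matches_ignore_pattern("logs", "hits", ...)   -> True   ("logs.hits"  ~ "logs.*")
#   matches_ignore_pattern("shop", "orders", ...) -> False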

def encrypt_file(path: Path, key_path: Path) -> Path:
    """Encrypt a file with openssl AES and delete the cleartext original."""
    encrypted = path.with_suffix(path.suffix + ".aes")
    cmd = ["openssl", "enc", "-aes-256-cbc", "-salt",
           "-in", str(path),
           "-out", str(encrypted),
           "-pass", f"file:{key_path}"]
    run_sync(cmd)
    path.unlink()  # remove the unencrypted tar
    return encrypted
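
# The matching decrypt invocation (standard openssl usage; substitute the real
# key file and archive names):
#   openssl enc -d -aes-256-cbc -pass file:KEY_FILE -in backup.tar.aes -out backup.tar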

# ---------- Docker helpers ----------
async def get_mysql_containers() -> list[str]:
    """Return the names of running containers using the nasqueron/mysql image."""
    output = await run_async(["docker", "ps",
                              "--filter", "ancestor=nasqueron/mysql",
                              "--format", "{{.Names}}"])
    return output.splitlines()

async def get_data_volume(container: str) -> str:
    """Inspect a container to find the host directory mounted to /var/lib/mysql."""
    output = await run_async(["docker", "inspect", "-f",
        '{{range .Mounts}}{{if eq .Destination "/var/lib/mysql"}}{{.Source}}{{end}}{{end}}',
        container])
    return output.strip()
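
# For a container started with e.g. `-v /srv/mysql/main:/var/lib/mysql`
# (hypothetical path), get_data_volume returns "/srv/mysql/main".
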
# ---------- Backup logic ----------
async def get_databases(container: str) -> list[str]:
    """List databases in the container (unused alternative to reading the data directory)."""
    output = await run_async(["mysqldump", container, "--no-data", "--all-databases"])
    return [line.split("`")[1] for line in output.splitlines()
            if line.startswith("CREATE DATABASE")]
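
# Each database in the dump appears on a line shaped like:
#   CREATE DATABASE /*!32312 IF NOT EXISTS*/ `wiki` /*!40100 DEFAULT CHARACTER SET utf8mb4 */;
# from which the backtick-quoted name ("wiki") is extracted.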

def list_databases(data_dir: Path) -> list[Path]:
    """List database directories, skipping MySQL system schemas."""
    skip = {"mysql", "performance_schema", "sys", "information_schema"}
    return sorted(p for p in data_dir.iterdir()
                  if p.is_dir() and p.name not in skip and not p.name.startswith("."))

def db_size_mb(db_dir: Path) -> float:
    total = sum(f.stat().st_size for f in db_dir.rglob("*") if f.is_file())
    return total / (1024 * 1024)

# Shared semaphore bounding how many mysqldump processes run at once (the limit
# of 4 is an arbitrary default; a fresh asyncio.Lock per call would serialize
# nothing). Requires Python 3.10+, where sync primitives bind their loop lazily.
DUMP_SEMAPHORE = asyncio.Semaphore(4)

async def dump_database(container: str, db: str, target: Path):
    cmd = ["mysqldump", container, db]
    async with DUMP_SEMAPHORE:
        dump = await run_async(cmd)
    target.write_text(dump)

async def dump_table(container: str, db: str, table: str, target: Path):
    cmd = ["mysqldump", container, db, table]
    async with DUMP_SEMAPHORE:
        dump = await run_async(cmd)
    target.write_text(dump)

async def backup_container(container: str, data_dir: str, config: dict):
    """Dump all databases of one container, then tar and encrypt the result."""
    backup_dir = Path(tempfile.mkdtemp(prefix=f"backup_{container}_"))
    ignore_patterns = config.get("ignore_tables", [])
    key_path = Path(config["aes_key"])
    try:
        tasks = []
        for db_dir in list_databases(Path(data_dir)):
            db = db_dir.name
            if db_size_mb(db_dir) <= SIZE_THRESHOLD_MB:
                # Small database: a single dump file for the whole schema.
                tasks.append(dump_database(container, db, backup_dir / f"{db}.sql"))
            else:
                # Large database: one dump file per table.
                db_target = backup_dir / db
                db_target.mkdir(parents=True, exist_ok=True)
                tables = [t.stem for t in db_dir.glob("*.ibd")]  # heuristic: one .ibd per InnoDB table
                for table in tables:
                    if not matches_ignore_pattern(db, table, ignore_patterns):
                        tasks.append(dump_table(container, db, table, db_target / f"{table}.sql"))
        await asyncio.gather(*tasks)
        tar_path = backup_dir.parent / (backup_dir.name + ".tar")
        with tarfile.open(tar_path, "w") as tar:
            tar.add(backup_dir, arcname=container)
        encrypted_path = encrypt_file(tar_path, key_path)
        print(f"Backup completed for {container}: {encrypted_path}")
    finally:
        shutil.rmtree(backup_dir, ignore_errors=True)  # keep only the encrypted archive

# ---------- Main entry ----------
async def main():
    config = load_config()
    containers = config.get("containers", {})
    if not containers:  # autodetect when the configuration lists none
        names = await get_mysql_containers()
        containers = {name: await get_data_volume(name) for name in names}
    tasks = [backup_container(name, path, config) for name, path in containers.items()]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
asyncio.run(main())