#!/usr/bin/env python3
"""
mysqldump-by-size.py — clean, modular MySQL Docker backup tool.

Backs up MySQL Docker containers by scanning the data volume:
- If a database's total size is <= the threshold -> database-level mysqldump (one file).
- If it is > the threshold -> table-level mysqldump (one directory per database, one file
  per table), ignoring tables that match patterns defined in YAML (patterns use % like
  MySQL; they are converted to glob *).
- Dumps are archived, gzip-compressed, and encrypted with OpenSSL AES.
- Supports multiple containers or a single-container run.

Usage:
    python3 mysqldump-by-size.py --config config.yml
    python3 mysqldump-by-size.py --config config.yml --container acquisitariat --data-dir /srv/acquisitariat/mysql

Config keys (YAML):
- containers: list of {name, data_dir} entries, or a mapping name -> {data_dir: ...}
- key_path: path to the AES key file used for encryption
- ignore_tables: list of patterns such as %_file.storageblob (MySQL % wildcards)
- threshold_bytes: threshold for choosing DB-level vs table-level dumps (default 5 MiB)
- output_dir: where final encrypted archives are written (default ./backups)
- mysqldump_opts_db / mysqldump_opts_table: extra options for DB-level / table-level dumps

Requirements:
- Python 3.8+
- PyYAML (pip install pyyaml)
- A mysqldump wrapper in PATH that accepts: mysqldump <container> <normal mysqldump options>
- openssl CLI available for encryption

Author: refactored cleanly by ChatGPT (GPT-5), in a senior Python + DevOps style.
"""
from __future__ import annotations

import argparse
import datetime
import fnmatch
import gzip
import os
import shutil
import subprocess
import sys
import tarfile
import tempfile
from pathlib import Path
try:
    import yaml
except Exception:
    print("PyYAML is required. Install with: pip install pyyaml", file=sys.stderr)
    sys.exit(2)


# ========== CONFIG & CLI ==========

def load_config(path: Path) -> dict:
    """Load the YAML configuration file."""
    with open(path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)


def parse_args():
    p = argparse.ArgumentParser(description="Backup MySQL Docker containers by scanning data volumes.")
    p.add_argument("--config", "-c", required=True, help="Path to YAML config file")
    p.add_argument("--container", "-C", help="Run only for this container")
    p.add_argument("--data-dir", "-d", help="Override the data directory for the container")
    return p.parse_args()

# ========== FILESYSTEM HELPERS ==========

def dir_size(path: Path) -> int:
    """Return total size in bytes of all files under path (recursive)."""
    total = 0
    for f in path.rglob('*'):
        if f.is_file():
            try:
                total += f.stat().st_size
            except OSError:
                pass
    return total


def list_databases(data_dir: Path) -> list[str]:
    """List database directories, skipping system schemas and hidden/temporary entries."""
    skip = {"mysql", "performance_schema", "sys", "information_schema"}
    if not data_dir.is_dir():
        return []
    return sorted(d.name for d in data_dir.iterdir()
                  if d.is_dir() and d.name not in skip
                  and not d.name.startswith(('.', '#')))


def list_tables(db_dir: Path) -> list[str]:
    """Infer table names from per-table MySQL files (.ibd, .frm, .MYD, .MYI)."""
    exts = (".ibd", ".frm", ".MYD", ".MYI")
    tables = {f.stem for f in db_dir.iterdir()
              if f.is_file() and f.suffix in exts}
    return sorted(tables)

# ========== PATTERN HANDLING ==========

def parse_patterns(patterns: list[str]) -> list[tuple[str, str]]:
    """Parse 'dbpattern.tablepattern' strings (MySQL % wildcards) into (db_glob, table_glob).

    A pattern without a dot matches that table name in any database.
    """
    parsed = []
    for pat in patterns or []:
        pat = pat.strip()
        if not pat:
            continue
        db, tbl = pat.split('.', 1) if '.' in pat else ('%', pat)
        parsed.append((db.replace('%', '*'), tbl.replace('%', '*')))
    return parsed


def should_ignore(db: str, table: str, patterns: list[tuple[str, str]]) -> bool:
    """Return True if db.table matches any (db_glob, table_glob) ignore pattern."""
    return any(fnmatch.fnmatchcase(db, d) and fnmatch.fnmatchcase(table, t)
               for d, t in patterns)

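# Example (hypothetical names): the config pattern "%_file.storageblob" becomes the glob
# pair ("*_file", "storageblob"), so should_ignore("media_file", "storageblob", patterns)
# is True, while a bare "tmp_%" matches tables named tmp_* in any database.
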
# ========== DUMP & COMMAND HELPERS ==========

def run(cmd: list[str], output=None):
    """Run a command with logging, optionally redirecting stdout; raise on failure."""
    print("+", " ".join(map(str, cmd)))
    with subprocess.Popen(cmd, stdout=output, stderr=subprocess.PIPE) as p:
        _, err = p.communicate()
        if p.returncode != 0:
            raise RuntimeError(f"Command failed: {' '.join(map(str, cmd))}\n{err.decode()}")


def dump_database(container: str, db: str, outfile: Path, opts: list[str]):
    """DB-level dump: one .sql file for the whole database."""
    with open(outfile, "wb") as fh:
        run(["mysqldump", container, *opts, "--databases", db], output=fh)


def dump_table(container: str, db: str, table: str, outfile: Path, opts: list[str]):
    """Table-level dump: one .sql file for a single table."""
    with open(outfile, "wb") as fh:
        run(["mysqldump", container, *opts, db, table], output=fh)

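# The mysqldump wrapper takes the container name as its first argument (see module
# docstring). For example (hypothetical db/table names and options):
#   dump_table("acquisitariat", "shopdb", "orders", Path("/tmp/orders.sql"), ["--single-transaction"])
# runs: mysqldump acquisitariat --single-transaction shopdb orders
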
# ========== BACKUP STRATEGY ==========

def backup_database(container: str, db_dir: Path, tmpdir: Path, cfg: dict,
                    patterns: list[tuple[str, str]]):
    """Dump one database, choosing DB-level or table-level dumps by the size threshold."""
    db_name = db_dir.name
    threshold = cfg.get("threshold_bytes", 5 * 1024 * 1024)
    size = dir_size(db_dir)
    opts_db = cfg.get("mysqldump_opts_db", [])
    opts_table = cfg.get("mysqldump_opts_table", [])

    print(f"Database {db_name}: {size / 1024 / 1024:.2f} MB ({db_dir})")

    if size <= threshold:
        dump_database(container, db_name, tmpdir / f"{db_name}.sql", opts_db)
        return

    tables = list_tables(db_dir)
    if not tables:
        # If tables cannot be detected from files, fall back to a full DB dump (safer).
        print(f"  Could not detect tables for {db_name}. Falling back to DB-level dump.")
        dump_database(container, db_name, tmpdir / f"{db_name}.sql", opts_db)
        return

    db_outdir = tmpdir / db_name
    db_outdir.mkdir(parents=True, exist_ok=True)
    for t in tables:
        if should_ignore(db_name, t, patterns):
            print(f"  Skipping table {db_name}.{t} (matched ignore pattern)")
            continue
        dump_table(container, db_name, t, db_outdir / f"{t}.sql", opts_table)


def backup_container(container: str, data_dir: Path, cfg: dict):
    """Run the backup for one container: dump every database, then archive and encrypt."""
    print(f"\n=== Container: {container} ===")
    tmpdir = Path(tempfile.mkdtemp(prefix=f"mysql-backup-{container}-"))
    print(f"  data_dir: {data_dir}")
    print(f"  tmpdir:   {tmpdir}")
    try:
        patterns = parse_patterns(cfg.get("ignore_tables", []))
        dbs = list_databases(data_dir)
        if not dbs:
            print(f"No databases found under {data_dir}.")
            return
        for db in dbs:
            backup_database(container, data_dir / db, tmpdir, cfg, patterns)
        archive_and_encrypt(container, tmpdir, cfg)
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)

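# Illustrative tmpdir layout before archiving (hypothetical database names):
#   smalldb.sql            <- DB-level dump (size <= threshold)
#   bigdb/orders.sql       <- table-level dumps (size > threshold)
#   bigdb/customers.sql
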
# ========== ARCHIVE & ENCRYPTION ==========

def make_tar_gz(src: Path, dest: Path):
    """Tar and gzip the contents of src (not src itself) into dest."""
    with tarfile.open(dest, "w:gz") as tar:
        for p in sorted(src.iterdir()):
            tar.add(p, arcname=p.name)


def encrypt_file(src: Path, dest: Path, key_path: Path):
    """Encrypt src into dest with OpenSSL AES-256-CBC, reading the key from key_path."""
    run([
        "openssl", "enc", "-aes-256-cbc", "-salt",
        "-in", str(src),
        "-out", str(dest),
        "-pass", f"file:{key_path}",
    ])


def archive_and_encrypt(container: str, tmpdir: Path, cfg: dict):
    """Create a gzipped tar of all dumps in tmpdir and encrypt it into output_dir."""
    key_path = Path(cfg["key_path"])
    if not key_path.is_file():
        raise FileNotFoundError(f"Key path not found or not a file: {key_path}")

    ts = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
    out_dir = Path(cfg.get("output_dir", "./backups")).absolute()
    out_dir.mkdir(parents=True, exist_ok=True)

    tar_path = out_dir / f"{container}_backup_{ts}.tar.gz"
    enc_path = out_dir / f"{container}_backup_{ts}.tar.gz.enc"

    print(f"Creating tar.gz: {tar_path}")
    make_tar_gz(tmpdir, tar_path)
    print(f"Encrypting with openssl -> {enc_path}")
    encrypt_file(tar_path, enc_path, key_path)
    tar_path.unlink()  # remove the unencrypted archive
    print(f"Backup created and encrypted: {enc_path}")

# ========== MAIN ORCHESTRATION ==========

def run_backup(cfg: dict, container=None, data_dir_override=None):
    """Back up every configured container, or only `container` if given."""
    containers = cfg.get("containers", [])
    if isinstance(containers, dict):
        # Mapping format: name -> {data_dir: ...} (or name -> data_dir)
        containers = [{"name": n, "data_dir": v.get("data_dir") if isinstance(v, dict) else v}
                      for n, v in containers.items()]
    if container:
        containers = [c for c in containers if c.get("name") == container]
        if not containers:
            raise ValueError(f"Requested container {container!r} not found in config.")

    for c in containers:
        raw_dir = data_dir_override or c.get("data_dir")
        if not raw_dir:
            print(f"Skipping container {c.get('name')}: no data_dir configured.")
            continue
        data_dir = Path(raw_dir)
        if not data_dir.exists():
            print(f"Warning: data_dir {data_dir} for container {c.get('name')} does not exist. Skipping.")
            continue
        backup_container(c["name"], data_dir, cfg)


def main():
    args = parse_args()
    cfg_path = Path(args.config)
    if not cfg_path.is_file():
        print("Config file not found:", cfg_path, file=sys.stderr)
        sys.exit(2)
    cfg = load_config(cfg_path)
    try:
        run_backup(cfg, args.container, args.data_dir)
    except Exception as e:
        print("Error during backup:", e, file=sys.stderr)
        sys.exit(3)

if __name__ == "__main__":
    main()