Backup script for acquisitariat and other MySQL containers
Authored By: dereckson, 2025-10-04 22:16:52 (UTC+0)
Size: 6 KB

#!/usr/bin/env python3
"""
mysql_backup.py — clean, modular MySQL Docker backup tool.
Features:
- Chooses DB-level vs table-level dump based on directory size.
- Ignores tables matching YAML-defined % patterns.
- Archives, compresses, encrypts with OpenSSL AES.
- Supports multiple containers or a single container run.
Author: Refactored cleanly by ChatGPT (senior Python + DevOps style)
"""
import argparse
import datetime
import fnmatch
import shutil
import subprocess
import tarfile
import tempfile
from pathlib import Path

import yaml

# ========== CONFIG & CLI ==========

def load_config(path: Path) -> dict:
    with open(path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)
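
# Illustrative config (a sketch, not a canonical format: the key names below
# are exactly those read via cfg.get()/cfg[...] in this script; the container
# name, paths, and mysqldump options are assumptions for the example):
#
#   containers:
#     - name: acquisitariat
#       data_dir: /srv/mysql/acquisitariat
#   ignore_tables:
#     - "%.objectcache%"
#   threshold_bytes: 5242880              # 5 MB; larger DBs dump per table
#   mysqldump_opts_db: ["--single-transaction"]
#   mysqldump_opts_table: ["--single-transaction"]
#   output_dir: /srv/backups
#   key_path: /etc/backup/backup.key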

def parse_args():
    p = argparse.ArgumentParser(description="Backup MySQL Docker containers.")
    p.add_argument("--config", "-c", required=True, help="Path to YAML config")
    p.add_argument("--container", "-C", help="Run only for this container")
    p.add_argument("--data-dir", "-d", help="Override data directory")
    return p.parse_args()

# ========== FILESYSTEM HELPERS ==========

def dir_size(path: Path) -> int:
    return sum(f.stat().st_size for f in path.rglob('*') if f.is_file())

def list_databases(data_dir: Path) -> list[str]:
    """List database directories (skip system DBs)."""
    skip = {"mysql", "performance_schema", "sys", "information_schema"}
    return sorted(
        d.name for d in data_dir.iterdir()
        if d.is_dir() and d.name not in skip and not d.name.startswith('.')
    )

def list_tables(db_dir: Path) -> list[str]:
    """List table names inferred from MySQL files (.ibd, .frm, etc.)."""
    exts = (".ibd", ".frm", ".MYD", ".MYI")
    tables = {f.stem for f in db_dir.iterdir() if f.is_file() and f.suffix in exts}
    return sorted(tables)
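
# Caveat: MySQL 8 no longer writes .frm files, and non-alphanumeric characters
# in table names are encoded in on-disk filenames (e.g. "@0024" for "$"), so
# this stem-based inference is approximate for such tables.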

# ========== PATTERN HANDLING ==========

def parse_patterns(patterns: list[str]) -> list[tuple[str, str]]:
    """Convert % wildcards to * and split into (db, table)."""
    parsed = []
    for pat in patterns or []:
        db, tbl = (pat.split('.', 1) + ['%'])[:2] if '.' in pat else ('%', pat)
        parsed.append((db.replace('%', '*'), tbl.replace('%', '*')))
    return parsed
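
# Worked example (hypothetical patterns, for illustration):
#   parse_patterns(["cache%", "logs.%", "wiki.objectcache"])
#   -> [('*', 'cache*'), ('logs', '*'), ('wiki', 'objectcache')]
# '%' is the SQL LIKE wildcard used in the YAML; it is translated to the
# fnmatch '*' wildcard that should_ignore() matches against.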

def should_ignore(db: str, table: str, patterns: list[tuple[str, str]]) -> bool:
    return any(
        fnmatch.fnmatchcase(db, d) and fnmatch.fnmatchcase(table, t)
        for d, t in patterns
    )

# ========== DUMP & COMMAND HELPERS ==========

def run(cmd: list[str], output=None):
    print("+", " ".join(map(str, cmd)))
    with subprocess.Popen(cmd, stdout=output, stderr=subprocess.PIPE) as p:
        _, err = p.communicate()
    if p.returncode != 0:
        raise RuntimeError(f"Command failed: {' '.join(cmd)}\n{err.decode()}")

def dump_database(container: str, db: str, outfile: Path, opts: list[str]):
    # Run mysqldump inside the container via `docker exec` (the script targets
    # Docker containers; adjust if a wrapper command is used instead), and
    # close the output file deterministically.
    with open(outfile, "wb") as out:
        run(["docker", "exec", container, "mysqldump", *opts, "--databases", db],
            output=out)

def dump_table(container: str, db: str, table: str, outfile: Path, opts: list[str]):
    with open(outfile, "wb") as out:
        run(["docker", "exec", container, "mysqldump", *opts, db, table],
            output=out)

# ========== BACKUP STRATEGY ==========

def backup_database(container: str, db_dir: Path, tmpdir: Path, cfg: dict, patterns):
    """Dump one database according to size threshold."""
    db_name = db_dir.name
    threshold = cfg.get("threshold_bytes", 5 * 1024 * 1024)
    size = dir_size(db_dir)
    opts_db = cfg.get("mysqldump_opts_db", [])
    opts_table = cfg.get("mysqldump_opts_table", [])
    print(f"Database {db_name}: {size / 1024 / 1024:.2f} MB")
    if size <= threshold:
        dump_database(container, db_name, tmpdir / f"{db_name}.sql", opts_db)
        return
    tables = list_tables(db_dir)
    if not tables:
        print("  Fallback: no tables detected, doing full dump.")
        dump_database(container, db_name, tmpdir / f"{db_name}.sql", opts_db)
        return
    db_outdir = tmpdir / db_name
    db_outdir.mkdir(exist_ok=True)
    for t in tables:
        if should_ignore(db_name, t, patterns):
            print(f"  - Skipping {db_name}.{t}")
            continue
        dump_table(container, db_name, t, db_outdir / f"{t}.sql", opts_table)
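
# Resulting layout under tmpdir (names illustrative):
#   small_db.sql            # whole-database dump (size <= threshold)
#   big_db/
#       table_a.sql         # per-table dumps (size > threshold)
#       table_b.sql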

def backup_container(container: str, data_dir: Path, cfg: dict):
    """Run backup for one container."""
    print(f"\n=== Container: {container} ===")
    tmpdir = Path(tempfile.mkdtemp(prefix=f"backup-{container}-"))
    try:
        patterns = parse_patterns(cfg.get("ignore_tables", []))
        for db in list_databases(data_dir):
            backup_database(container, data_dir / db, tmpdir, cfg, patterns)
        archive_and_encrypt(container, tmpdir, cfg)
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)

# ========== ARCHIVE & ENCRYPTION ==========

def make_tar_gz(src: Path, dest: Path):
    """Tar and gzip all contents of src into dest."""
    with tarfile.open(dest, "w:gz") as tar:
        for p in src.iterdir():
            tar.add(p, arcname=p.name)

def encrypt_file(src: Path, dest: Path, key_path: Path):
    run([
        "openssl", "enc", "-aes-256-cbc", "-salt",
        "-in", str(src), "-out", str(dest),
        "-pass", f"file:{key_path}",
    ])
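
# To restore, reverse the steps (standard openssl/tar usage; names illustrative):
#   openssl enc -d -aes-256-cbc -in X.tar.gz.enc -out X.tar.gz -pass file:KEY
#   tar -xzf X.tar.gz
# Decryption must match the OpenSSL key-derivation defaults used at encryption
# time; if versions differ, pass the same -md/-pbkdf2 flags on both sides.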

def archive_and_encrypt(container: str, tmpdir: Path, cfg: dict):
    """Create tar.gz and encrypt."""
    ts = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    out_dir = Path(cfg.get("output_dir", "./backups"))
    out_dir.mkdir(parents=True, exist_ok=True)
    tar_path = out_dir / f"{container}_{ts}.tar.gz"
    # Append the suffix directly: with_suffix() would replace only ".gz" and
    # yield "….tar.tar.gz.enc" for a double-extension name.
    enc_path = tar_path.with_name(tar_path.name + ".enc")
    key_path = Path(cfg["key_path"])
    make_tar_gz(tmpdir, tar_path)
    encrypt_file(tar_path, enc_path, key_path)
    tar_path.unlink()
    print(f"✓ Encrypted archive: {enc_path}")

# ========== MAIN ORCHESTRATION ==========

def run_backup(cfg: dict, container=None, data_dir_override=None):
    containers = cfg.get("containers", [])
    if isinstance(containers, dict):
        containers = [{"name": n, "data_dir": v["data_dir"]}
                      for n, v in containers.items()]
    targets = [c for c in containers if not container or c["name"] == container]
    if not targets:
        raise ValueError(f"No container matching {container!r}")
    for c in targets:
        data_dir = Path(data_dir_override or c["data_dir"])
        if not data_dir.exists():
            print(f"⚠️ Missing data dir for {c['name']}: {data_dir}")
            continue
        backup_container(c["name"], data_dir, cfg)
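
# Both "containers" shapes from the YAML are accepted (values illustrative):
#   containers:
#     acquisitariat: { data_dir: /srv/mysql/acquisitariat }
# or
#   containers:
#     - { name: acquisitariat, data_dir: /srv/mysql/acquisitariat }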

def main():
    args = parse_args()
    cfg = load_config(Path(args.config))
    run_backup(cfg, args.container, args.data_dir)


if __name__ == "__main__":
    main()
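
# Typical invocations (paths illustrative):
#   ./mysql_backup.py --config /etc/backup/backup.yaml
#   ./mysql_backup.py -c backup.yaml -C acquisitariat -d /var/lib/mysql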