Commit 78cf82dd authored by Oleg Borisenko's avatar Oleg Borisenko
1) more control over copy queue (but still single sequencing result can be placed to two tapes if compression fails).
2) tape change bug
3) graceful stop
parent 7ff75e60
......@@ -2,6 +2,8 @@ import argparse
import asyncio
import datetime
import logging
import signal
import os
import redis
import sys
......@@ -125,11 +127,12 @@ def form_backup_queue(dbsession, free_space):
files_to_backup = dbsession.query(models.FileToBackup). \
filter(and_(models.FileToBackup.tape_label == None,
models.FileToBackup.target_unique_label == target.unique_label,
models.FileToBackup.is_file == True)).order_by(
models.FileToBackup.is_file == True)).order_by(models.FileToBackup.relative_path).all()
if not files_to_backup:
files_to_backup_total += len(files_to_backup)"Listed files")
if target.kind == models.DataKind.wgs:
current_queue, current_queue_size = form_wgs_queue(files_to_backup, space_left)
......@@ -144,6 +147,7 @@ def form_backup_queue(dbsession, free_space):
total_queue_size += current_queue_size
space_left -= current_queue_size"Formed new copy queue, size %d bytes / %d GB", total_queue_size, total_queue_size/1024**3)
return total_copy_queue, files_to_backup_total
......@@ -152,20 +156,40 @@ def copy_process(env, loop, current_job):
dbsession = env['request'].dbsession
manager = utils.tapemanager.TapeManager(dbsession)
current_batch = models.Batch.get_current(dbsession)
# 200GB is special reserved space not to lock the tape. We don't use manager.df since it is more pessimistic
free_space = manager.really_free - 200 * 1024**3
going_to_backup_job_size = 0
for file_to_copy in current_batch.files_to_backup[0:current_job]:
going_to_backup_job_size += file_to_copy.fsize"Going to backup during this job %d bytes, %d are available", going_to_backup_job_size, free_space)
if going_to_backup_job_size > free_space:
log.warning("The whole batch doesn't fit the tape size; the initial batch took more space than expected and "
"compression didn't work well. Need to start new batch")
return False
for file_to_copy in current_batch.files_to_backup[0:current_job]:"Copying %s", file_to_copy.relative_path)
target = dbsession.query(models.BackupTarget).filter(models.BackupTarget.unique_label == file_to_copy.target_unique_label).one()
src = target.fullpath + "/" + file_to_copy.relative_path
dest = "/srv_mount_dont_go_here/tapes/" + current_batch.tape_id + "/" + file_to_copy.relative_path
free_space = manager.really_free
if free_space < file_to_copy.fsize:
raise Exception("Batch doesn't fit the tape size; need to stop immediately and investigate!")
log.warning("Batch doesn't fit the tape size; the initial batch took more space than expected and "
"compression didn't work well. Need to start new batch")
return False
if red.get(src):
checksum = red.get(src)[: 32].decode()
copy_result = utils.secure_copy(src, dest)
copy_result, checksum = utils.secure_copy2(src, dest, loop)
red.set(src, checksum)
# NOTE: really important: we decrease local var for free space despite the copy result since ltfs
# marks that space as used anyway (even if copy failed).
free_space -= file_to_copy.fsize
if copy_result == 0:
# NOTE: very important to have this; without it no changes to files occur
......@@ -240,12 +264,26 @@ def parse_args(argv):
return parser.parse_args(argv[1:])
class GracefulExiter():
def __init__(self):
self.state = False
signal.signal(signal.SIGINT, self.change_state)
def change_state(self, signum, frame):"Exiting gracefully; repeat CTRL+C to exit forcibly (not recommended)")
signal.signal(signal.SIGINT, signal.SIG_DFL)
self.state = True
def exit(self):
return self.state
# TODO: more args + daemonize
def main(argv=sys.argv):
args = parse_args(argv)
env = bootstrap(args.config_uri)
flag = GracefulExiter()
loop = asyncio.get_event_loop()
while True:
......@@ -258,8 +296,12 @@ def main(argv=sys.argv):
now = time.time()
with env['request'].tm:
if flag.exit():
raise KeyboardInterrupt("Gracefully stopped before rescan.")
dbsession = env['request'].dbsession
queue_len = control_copy_queue(dbsession)
if flag.exit():
raise KeyboardInterrupt("Gracefully stopped after rescan.")
if not queue_len:
while queue_len > 0:
......@@ -270,11 +312,19 @@ def main(argv=sys.argv):
current_job = job_size
# "else" leads to retry
if copy_process(env, loop, current_job):
queue_len -= current_job
iterations += current_job"%d iterations per %d seconds (lazy load)", current_job, time.time() - now)
now = time.time()
log.warning("Copy process interrupted.")
# trying to handle graceful stop
if flag.exit():
raise KeyboardInterrupt("Gracefully stopped the copy process after copy.")
except SQLAlchemyError as e:
......@@ -7,7 +7,7 @@ log = logging.getLogger(__name__)
def secure_copy(src, dst):["mkdir", "-p", "-v", os.path.dirname(dst)], capture_output=True, shell=False)
copy_result =["rsync", "-a", src, dst], capture_output=True, shell=False)
copy_result =["rsync", "--inplace", "--append-verify", "--partial", "-a", src, dst], capture_output=True, shell=False)
if copy_result.stderr:
......@@ -31,7 +31,7 @@ async def cmd(command):
def secure_copy2(src, dst, loop):["mkdir", "-p", "-v", os.path.dirname(dst)], capture_output=True, shell=False)
md5sum_proc = loop.create_task(cmd("md5sum %s" % src))
rsync_proc = loop.create_task(cmd("rsync -a %s %s" % (src, dst)))
rsync_proc = loop.create_task(cmd("rsync --inplace --append-verify --partial -a %s %s" % (src, dst)))
results = loop.run_until_complete(asyncio.gather(md5sum_proc, rsync_proc))
checksum = results[0][1][: 32].decode()
copy_ret_code = results[1][0]
