Commit 7ff75e60 authored by Oleg Borisenko's avatar Oleg Borisenko
Browse files

small fixes

parent be8288ac
......@@ -89,7 +89,7 @@ def form_wgs_queue(files_to_backup, free_space):
return wgs_queue[:len(wgs_queue) - files_to_exclude_from_queue], queue_size
else:
return [], 0
raise Exception("Unreachable state")
return wgs_queue[:len(wgs_queue) - files_to_exclude_from_queue], queue_size
def form_regular_queue(files_to_backup, free_space):
......@@ -118,6 +118,7 @@ def form_backup_queue(dbsession, free_space):
targets = dbsession.query(models.BackupTarget).all()
total_copy_queue = []
total_queue_size = 0
files_to_backup_total = 0
space_left = free_space
for target in targets:
log.info("Listing files")
......@@ -125,6 +126,10 @@ def form_backup_queue(dbsession, free_space):
filter(and_(models.FileToBackup.tape_label == None,
models.FileToBackup.target_unique_label == target.unique_label,
models.FileToBackup.is_file == True)).order_by(models.FileToBackup.id).all()
if not files_to_backup:
continue
files_to_backup_total += len(files_to_backup)
log.info("Listed files")
if target.kind == models.DataKind.wgs:
current_queue, current_queue_size = form_wgs_queue(files_to_backup, space_left)
......@@ -139,7 +144,7 @@ def form_backup_queue(dbsession, free_space):
total_queue_size += current_queue_size
space_left -= current_queue_size
return total_copy_queue
return total_copy_queue, files_to_backup_total
def copy_process(env, loop, current_job):
......@@ -193,7 +198,11 @@ def control_copy_queue(dbsession):
manager.insert_into_drive(tape.last_seen_slot)
manager.use_tape()
free_space = manager.df
active_copy_queue = form_backup_queue(dbsession, free_space)
active_copy_queue, files_to_copy_total = form_backup_queue(dbsession, free_space)
if files_to_copy_total == 0:
log.info("All data has been backed up; waiting for new files. Falling asleep for 5 minutes.")
time.sleep(300)
return None
# this means we can not pack more data to current tape
if len(active_copy_queue) == 0:
manager.finalize_tape()
......@@ -237,6 +246,7 @@ def main(argv=sys.argv):
args = parse_args(argv)
setup_logging(args.config_uri)
env = bootstrap(args.config_uri)
loop = asyncio.get_event_loop()
try:
while True:
queue_len = 0
......@@ -265,15 +275,16 @@ def main(argv=sys.argv):
iterations += current_job
log.info("%d iterations per %d seconds (lazy load)", current_job, time.time() - now)
now = time.time()
loop.close()
time.sleep(1)
time.sleep(2)
except SQLAlchemyError as e:
log.error(str(e))
raise
except Exception as e:
log.error(e)
raise
finally:
loop.close()
if __name__ == '__main__':
main()
\ No newline at end of file
......@@ -30,8 +30,8 @@ async def cmd(command):
def secure_copy2(src, dst, loop):
subprocess.run(["mkdir", "-p", "-v", os.path.dirname(dst)], capture_output=True, shell=False)
md5sum_proc = cmd("md5sum %s" % src)
rsync_proc = cmd("rsync -a %s %s" % (src, dst))
md5sum_proc = loop.create_task(cmd("md5sum %s" % src))
rsync_proc = loop.create_task(cmd("rsync -a %s %s" % (src, dst)))
results = loop.run_until_complete(asyncio.gather(md5sum_proc, rsync_proc))
checksum = results[0][1][: 32].decode()
copy_ret_code = results[1][0]
......
......@@ -292,6 +292,7 @@ class TapeManager:
return None
def insert_into_drive(self, slot):
self.dbsession.begin_nested()
self.scan()
if self.drives[0]['element_address'] == slot:
#already there
......@@ -362,6 +363,8 @@ class TapeManager:
elif self.get_drive_state() is DriveState.data_tape_inside:
matching_tape = self.dbsession.query(models.Tape).filter(models.Tape.label == self.drives[0][self.PRIMARY_VOLUME_TAG]).one_or_none()
if matching_tape:
# just selfcheck
log.info("Identified tape %s, drive has the tape %s", matching_tape.label, self.drives[0][self.PRIMARY_VOLUME_TAG])
return matching_tape
else:
raise Exception("Tape %s not found in inventory; you need to rescan" % self.drives[0][self.PRIMARY_VOLUME_TAG])
......@@ -372,7 +375,7 @@ class TapeManager:
def use_tape(self):
tape = self.identify_tape()
if tape.state == models.TapeState.finalized:
raise Exception("The tape is finalized; you can not use it for backup")
raise Exception("The tape %s is finalized; you can not use it for backup", tape.label)
elif tape.state == models.TapeState.inuse:
# checking consistency of TapeState in database: it should be mounted or mountable
if self.mount():
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment