Commit 897d4ed4 authored by Oleg Borisenko

serious fragmentation issue semi-fixed (now it's isolated to 2 tapes exactly; it was all tapes at once)
parent 1563a769
@@ -17,12 +17,16 @@ from tapebackup import utils
log = logging.getLogger("tapebackup_eventloop")
red = redis.Redis()
def form_wgs_queue(target, files_to_backup, free_space):
def form_wgs_queue(files_to_backup, free_space):
queue_size = 0
wgs_queue = []
files_to_exclude_from_queue = 0
for f in files_to_backup:
current_file_folder = os.path.dirname(f.relative_path)
current_file_folder_level = len(current_file_folder.split('/'))
if current_file_folder_level < 1 or current_file_folder.split('/')[1:2] != ['wgs_data']:  # that means it's definitely not wgs
continue
if f.fsize + queue_size < free_space:
wgs_queue.append(f)
queue_size += f.fsize
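The new guard in this hunk keeps only files that live under the wgs_data tree. A minimal sketch of how the `split('/')[1:2]` test behaves, using made-up relative paths and assuming (as the level numbering further down also implies) that `relative_path` is stored with a leading slash:

```python
import os.path

# Hypothetical relative paths, for illustration only.
for rel in ["/wgs_data/SEQ1/PROC1/L01/reads.fq.gz",
            "/vcf_data/sample.vcf.gz",
            "stray_file_in_root.txt"]:
    folder = os.path.dirname(rel)      # e.g. "/wgs_data/SEQ1/PROC1/L01"
    parts = folder.split('/')          # e.g. ['', 'wgs_data', 'SEQ1', 'PROC1', 'L01']
    is_wgs = len(parts) >= 1 and parts[1:2] == ['wgs_data']
    print(rel, "->", "wgs" if is_wgs else "skipped")
```

Paths whose second component is not `wgs_data` hit the `continue` and never enter the wgs queue.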
@@ -30,22 +34,59 @@ def form_wgs_queue(target, files_to_backup, free_space):
# now we are shrinking the list to match the conditions
# i use an extremely dumb heuristic here: the chance that we got exactly the last file of a sequencing run
# is about 1/2400, so i assume by default that we didn't and roll back to the previous seq data folder
seq_folder = os.path.dirname(os.path.dirname(target.fullpath + wgs_queue[-1].relative_path))
for i in reversed(wgs_queue):
if seq_folder == os.path.dirname(os.path.dirname(target.fullpath + i.relative_path)):
prev_file_folder = os.path.dirname(i.relative_path)
prev_file_folder_level = len(prev_file_folder.split('/'))
# TODO: it would be better if one sequencing result did not span two tapes (for now it's possible)
# Note: explanation. The folder level for a RELATIVE path in the wgs file structure can mean the following:
# 0 = should not occur at all (no path at all)
# 1 = virtual root - we can never get here at all
# 2 = wgs_data folder - there may be some trash like "zlims_test.txt" with no contents.
# If we get it we can leave it for the next tape; it doesn't matter
# 3 = Sequencer serial OR external_seqs (the only folder that looks different, but it fits any tape).
# There can be trash files; they can wait for the next tape
# 4 = Sequencing proc id. In this folder plain files are trash (like "successful_rsync" reports)
# 5 = Lanes L0*. That matters; if we are here and there is not enough space, we definitely want to drop
# all of it for the next tape. We go one level up and drop every queued file from the same sequencing proc id
# 6 = metrics inside L0*. If we get here we want to go two levels higher to drop this sequencing part
# from the current tape.
# 7 or more should never happen. If it gets there, it is probably safer to drop just one file.
# important: since the target is scanned in lexicographical order and we order by ID, the previous file's
# path should always share at least the previous folder level of the current file.
if current_file_folder_level <= 4:
if current_file_folder_level == prev_file_folder_level:  # getting rid of all files from that level
    queue_size -= i.fsize
    files_to_exclude_from_queue += 1
else:
    return wgs_queue[:len(wgs_queue) - files_to_exclude_from_queue], queue_size
elif current_file_folder_level == 5:
# comparing "grandparents" to iterate to previous sequencing proc id
if os.path.dirname(current_file_folder) == os.path.dirname(os.path.dirname(i.relative_path)):
queue_size -= i.fsize
files_to_exclude_from_queue += 1
else:
return wgs_queue[:len(wgs_queue) - files_to_exclude_from_queue], queue_size
elif current_file_folder_level == 6:
# comparing the grandparent of the "metrics" folder to the previous file's sequencing proc id
if os.path.dirname(os.path.dirname(current_file_folder)) == os.path.dirname(
os.path.dirname(i.relative_path)):
queue_size -= i.fsize
files_to_exclude_from_queue += 1
else:
return wgs_queue[:len(wgs_queue) - files_to_exclude_from_queue], queue_size
else:
queue_size -= i.fsize
files_to_exclude_from_queue += 1
else:
pass
return wgs_queue[:len(wgs_queue) - files_to_exclude_from_queue], queue_size
else:
return [], 0
return wgs_queue[:len(wgs_queue) - files_to_exclude_from_queue], queue_size
def form_regular_queue(target, files_to_backup, free_space):
def form_regular_queue(files_to_backup, free_space):
queue_size = 0
regular_queue = []
files_to_exclude_from_queue = 0
for f in files_to_backup:
if f.fsize + queue_size < free_space:
regular_queue.append(f)
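To make the folder-level numbering in the comments above concrete, here is a small sketch with hypothetical wgs paths (real sequencer serials and proc ids will differ). The level is simply the number of path components in the file's parent directory, and for a lane-level file the "grandparent" the code compares is the sequencing proc id folder:

```python
import os.path

# Hypothetical paths, for illustration only.
examples = {
    "/wgs_data/zlims_test.txt":                        2,  # trash directly in wgs_data
    "/wgs_data/SEQ_SERIAL/readme.txt":                 3,  # sequencer serial (or external_seqs)
    "/wgs_data/SEQ_SERIAL/PROC_1/successful_rsync":    4,  # sequencing proc id
    "/wgs_data/SEQ_SERIAL/PROC_1/L01/reads.fq.gz":     5,  # lane L0*
    "/wgs_data/SEQ_SERIAL/PROC_1/L01/metrics/qc.csv":  6,  # metrics inside L0*
}

for rel, expected_level in examples.items():
    folder = os.path.dirname(rel)
    level = len(folder.split('/'))
    assert level == expected_level
    if level == 5:
        # grandparent of a lane file == the sequencing proc id folder
        print(os.path.dirname(folder))                   # /wgs_data/SEQ_SERIAL/PROC_1
    if level == 6:
        # for metrics, going two levels up from the folder lands on the same proc id
        print(os.path.dirname(os.path.dirname(folder)))  # /wgs_data/SEQ_SERIAL/PROC_1
```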
@@ -75,15 +116,17 @@ def form_backup_queue(dbsession, free_space):
files_to_backup = dbsession.query(models.FileToBackup). \
filter(and_(models.FileToBackup.tape_label == None,
models.FileToBackup.target_unique_label == target.unique_label,
models.FileToBackup.is_file == True)).all()
models.FileToBackup.is_file == True)).order_by(models.FileToBackup.id).all()
log.info("Listed files")
if target.kind == models.DataKind.wgs:
current_queue, current_queue_size = form_wgs_queue(target, files_to_backup, space_left)
current_queue, current_queue_size = form_wgs_queue(files_to_backup, space_left)
total_copy_queue.extend(current_queue)
total_queue_size += current_queue_size
space_left -= current_queue_size
if target.kind == models.DataKind.vcf or target.kind == models.DataKind.db:
current_queue, current_queue_size = form_regular_queue(target, files_to_backup, space_left)
if target.kind == models.DataKind.vcf \
or target.kind == models.DataKind.db \
or target.kind == models.DataKind.files:
current_queue, current_queue_size = form_regular_queue(files_to_backup, space_left)
total_copy_queue.extend(current_queue)
total_queue_size += current_queue_size
space_left -= current_queue_size
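The added `order_by(models.FileToBackup.id)` matters for the rollback heuristic: files are recorded in lexicographical scan order, so ordering by id keeps all files of one sequencing run adjacent in the queue, and walking it backwards can drop a run's tail without skipping over unrelated files. A tiny illustration with hypothetical paths:

```python
# Hypothetical paths in the order a lexicographical scan would record them.
paths = [
    "/wgs_data/SEQ_A/PROC_1/L01/reads_1.fq.gz",
    "/wgs_data/SEQ_A/PROC_1/L01/reads_2.fq.gz",
    "/wgs_data/SEQ_A/PROC_2/L01/reads_1.fq.gz",
    "/wgs_data/SEQ_B/PROC_7/L01/reads_1.fq.gz",
]
# Lexicographical order keeps each PROC_* run contiguous, so reversing the
# queue visits a run's files back-to-back before crossing into the previous run.
assert paths == sorted(paths)
```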
@@ -96,7 +139,7 @@ def copy_process(env, loop, current_job):
dbsession = env['request'].dbsession
manager = utils.tapemanager.TapeManager(dbsession)
current_batch = models.Batch.get_current(dbsession)
for file_to_copy in current_batch.files_to_backup[0:current_job-1]:
for file_to_copy in current_batch.files_to_backup[0:current_job]:
log.info("Copying %s", file_to_copy.relative_path)
target = dbsession.query(models.BackupTarget).filter(models.BackupTarget.unique_label == file_to_copy.target_unique_label).one()
src = target.fullpath + "/" + file_to_copy.relative_path
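The slice change in copy_process fixes an off-by-one: Python slice end indices are already exclusive, so `[0:current_job-1]` silently dropped the last file of every job. A quick check:

```python
files = ["a", "b", "c", "d", "e"]
current_job = 3
print(files[0:current_job - 1])   # ['a', 'b']       -- old slice: one file short
print(files[0:current_job])       # ['a', 'b', 'c']  -- fixed slice: all current_job files
```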
@@ -147,6 +190,12 @@ def control_copy_queue(dbsession):
if len(active_copy_queue) == 0:
manager.finalize_tape()
return None
# for logging purposes
total = 0
for i in active_copy_queue:
total += i.fsize
total_gb = total / (1024 ** 3)
log.info("New batch will take %d GiB", total_gb)
current_batch = models.Batch(tape)
current_batch.files_to_backup.extend(active_copy_queue)
dbsession.add(current_batch)
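The new logging block above sums the queued file sizes and reports the total in binary gigabytes (GiB, 1024**3 bytes); `%d` truncates the float, so the log shows whole GiB. An equivalent standalone sketch, with a hypothetical stub in place of the FileToBackup objects:

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("example")

class FileStub:                      # hypothetical stand-in for a FileToBackup row
    def __init__(self, fsize):
        self.fsize = fsize           # size in bytes

active_copy_queue = [FileStub(5 * 1024 ** 3), FileStub(512 * 1024 ** 2)]
total = sum(f.fsize for f in active_copy_queue)
total_gb = total / (1024 ** 3)       # 5.5 GiB
log.info("New batch will take %d GiB", total_gb)   # logs 5 GiB (%d truncates)
```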
@@ -155,6 +204,7 @@ def control_copy_queue(dbsession):
active_copy_queue = current_batch.files_to_backup # if none in state "running" form one; else wait
if len(active_copy_queue) == 0:
current_batch.status = models.BatchStatus.finished
log.info("Batch finished")
return None
manager.insert_into_drive(tape.last_seen_slot)
manager.use_tape()
@@ -205,7 +255,7 @@ def main(argv=sys.argv):
if copy_process(env, loop, current_job):
queue_len -= current_job
iterations += current_job
log.info("%d iterations per %d seconds (lazy load)", job_size, time.time() - now)
log.info("%d iterations per %d seconds (lazy load)", current_job, time.time() - now)
now = time.time()
loop.close()
......
import datetime
import os.path
from pyramid.view import view_config
from pyramid.response import Response
from pyramid.httpexceptions import HTTPException
@@ -13,6 +14,8 @@ def add_backup_target(request):
try:
target_props = dict((k, request.json_body[k]) for k in ('unique_label', 'kind', 'enabled', 'ignore_filter',
'rescan_interval', 'fullpath'))
# removing the trailing slash since it's very important for the logic later
target_props['fullpath'] = os.path.normpath(target_props['fullpath'])
new_target = models.backuptarget.BackupTarget(**target_props)
request.dbsession.add(new_target)
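`os.path.normpath` collapses redundant separators and drops a trailing slash, which keeps the later concatenation of `fullpath` with a relative path from producing double slashes. A quick illustration with a hypothetical target path:

```python
import os.path

print(os.path.normpath("/storage/wgs_target/"))     # /storage/wgs_target
print(os.path.normpath("/storage//wgs_target/."))   # /storage/wgs_target
print(os.path.normpath("/storage/wgs_target"))      # unchanged: /storage/wgs_target
```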
......