Commit a4b3e221 authored by Oleg Borisenko's avatar Oleg Borisenko
Browse files

several issues with tape space fixed

parent 0f6a8989
......@@ -49,9 +49,6 @@ class Batch(Base):
# very dumb code; too tired to make in elegant
@staticmethod
def get_current(dbsession):
finished = dbsession.query(Batch).filter(Batch.status == BatchStatus.finished).one_or_none()
if finished:
return finished
paused = dbsession.query(Batch).filter(Batch.status == BatchStatus.paused).one_or_none()
if paused:
return paused
......
......@@ -23,21 +23,22 @@ def form_wgs_queue(target, files_to_backup, free_space):
wgs_queue = []
files_to_exclude_from_queue = 0
for f in files_to_backup:
if f.is_file:
if f.fsize + queue_size < free_space:
wgs_queue.append(f)
queue_size += f.fsize
else:
# now we are shrinking list to match conditions
# i use here extremely dumb heuristic: chance that we got exactly the last file for sequencing
# is about 1/2400 so i assume by default that we didn't and rollback to previous seq data folder
seq_folder = os.path.dirname(os.path.dirname(target.fullpath + wgs_queue[-1].relative_path))
for i in reversed(wgs_queue):
if seq_folder == os.path.dirname(os.path.dirname(target.fullpath + i.relative_path)):
queue_size -= i.fsize
files_to_exclude_from_queue += 1
else:
pass
if f.fsize + queue_size < free_space:
wgs_queue.append(f)
queue_size += f.fsize
elif wgs_queue:
# now we are shrinking list to match conditions
# i use here extremely dumb heuristic: chance that we got exactly the last file for sequencing
# is about 1/2400 so i assume by default that we didn't and rollback to previous seq data folder
seq_folder = os.path.dirname(os.path.dirname(target.fullpath + wgs_queue[-1].relative_path))
for i in reversed(wgs_queue):
if seq_folder == os.path.dirname(os.path.dirname(target.fullpath + i.relative_path)):
queue_size -= i.fsize
files_to_exclude_from_queue += 1
else:
pass
else:
return [], 0
return wgs_queue[:len(wgs_queue) - files_to_exclude_from_queue], queue_size
......@@ -46,10 +47,9 @@ def form_regular_queue(target, files_to_backup, free_space):
regular_queue = []
files_to_exclude_from_queue = 0
for f in files_to_backup:
if f.is_file:
if f.fsize + queue_size < free_space:
regular_queue.append(f)
queue_size += f.fsize
if f.fsize + queue_size < free_space:
regular_queue.append(f)
queue_size += f.fsize
return regular_queue, queue_size
......@@ -74,7 +74,8 @@ def form_backup_queue(dbsession, free_space):
log.info("Listing files")
files_to_backup = dbsession.query(models.FileToBackup). \
filter(and_(models.FileToBackup.tape_label == None,
models.FileToBackup.target_unique_label == target.unique_label)).all()
models.FileToBackup.target_unique_label == target.unique_label,
models.FileToBackup.is_file == True)).all()
log.info("Listed files")
if target.kind == models.DataKind.wgs:
current_queue, current_queue_size = form_wgs_queue(target, files_to_backup, space_left)
......@@ -93,12 +94,16 @@ def form_backup_queue(dbsession, free_space):
def copy_process(env, loop, current_job):
with env['request'].tm:
dbsession = env['request'].dbsession
manager = utils.tapemanager.TapeManager(dbsession)
current_batch = models.Batch.get_current(dbsession)
for file_to_copy in current_batch.files_to_backup[0:current_job-1]:
log.info("Copying %s", file_to_copy.relative_path)
target = dbsession.query(models.BackupTarget).filter(models.BackupTarget.unique_label == file_to_copy.target_unique_label).one()
src = target.fullpath + "/" + file_to_copy.relative_path
dest = "/srv_mount_dont_go_here/tapes/" + current_batch.tape_id + "/" + file_to_copy.relative_path
free_space = manager.df
if free_space < file_to_copy.fsize:
raise Exception("Batch doesn't fit the tape size; need to stop immediately and investigate!")
if red.get(src):
checksum = red.get(src)[: 32].decode()
copy_result = utils.secure_copy(src, dest)
......@@ -148,6 +153,9 @@ def control_copy_queue(dbsession):
else:
tape = current_batch.get_tape()
active_copy_queue = current_batch.files_to_backup # if none in state "running" form one; else wait
if len(active_copy_queue) == 0:
current_batch.status = models.BatchStatus.finished
return None
manager.insert_into_drive(tape.last_seen_slot)
manager.use_tape()
# check the tape is the same
......@@ -185,6 +193,8 @@ def main(argv=sys.argv):
with env['request'].tm:
dbsession = env['request'].dbsession
queue_len = control_copy_queue(dbsession)
if not queue_len:
continue
while queue_len > 0:
if queue_len - job_size <= 0:
current_job = queue_len
......
......@@ -33,6 +33,7 @@ class TapeManager:
self.tape_drive_device = '/dev/sg3'
self.mkltfs_binary = '/usr/local/bin/mkltfs '
# canonical mount "/usr/local/bin/ltfs /mnt/tape/ -o devname=/dev/sg3"
self.mountpoint = None
self.mountpoint_base = "/srv_mount_dont_go_here/tapes/"
self.mount_command = '/usr/local/bin/ltfs'
self.umount_command = 'umount'
......@@ -53,13 +54,23 @@ class TapeManager:
is_data_tape = True if label and slot[self.PRIMARY_VOLUME_TAG].decode('utf-8')[0:3] != "CLN" else False
return label, is_data_tape
def disk_usage(self, mountpoint):
total, used, free = shutil.disk_usage(mountpoint)
# NOTE: we reserve 200Mb for final index
free = free - 200 * 1024 * 1024
def disk_usage(self):
if self.mountcheck():
self.mountpoint = mountpoint = self.mountpoint_base + self.drives[0][self.PRIMARY_VOLUME_TAG]
else:
self.df = 0
return
total, used, free = shutil.disk_usage(self.mountpoint)
# NOTE: "free" is bullshit for tapes (really big difference). Correct value is total - used
# NOTE: we reserve 200Mb for final index + 300Gb since tape blocks itself on low space. Also there is
# a strange behaviour: sys reports 250Gb more space than really is.
# Other sources recommend 1.2Tb, but it seems experimentally that 300Gb should be ok:
# "LTO-8 tape stores up to 11.3 TB and are a lot faster at 250-300MB/s.
# see more on https://yoyotta.com/help/LTO_FAQ.html"
really_free = total - used - 200 * 1024 * 1024 - 300 * 1024 * 1024 * 1024
log.info("Tape %s mounted and it has %d capacity, %d used, %d free" % (
self.drives[0][self.PRIMARY_VOLUME_TAG], total, used, free))
self.df = free
self.drives[0][self.PRIMARY_VOLUME_TAG], total, used, really_free))
self.df = really_free
def mountcheck(self):
mountpoint = self.mountpoint_base + self.drives[0][self.PRIMARY_VOLUME_TAG]
......@@ -82,7 +93,8 @@ class TapeManager:
if mount_completed.returncode != 0:
log.error("Unable to mount tape %s to %s" % (self.drives[0][self.PRIMARY_VOLUME_TAG], mountpoint))
return False
self.disk_usage(mountpoint)
self.mountpoint = mountpoint
self.disk_usage()
return True
......@@ -97,12 +109,15 @@ class TapeManager:
mountpoint = self.mountpoint_base + self.drives[0][self.PRIMARY_VOLUME_TAG]
hash_filename = "%s_HASHES.md5.txt"%self.drives[0][self.PRIMARY_VOLUME_TAG]
tmp_hash_file = "/home/evogen/" + hash_filename
log.info("Starting to finalize the tape %s", self.drives[0][self.PRIMARY_VOLUME_TAG])
md5deep_completed = subprocess.run(["md5deep",
"-r",
"-W", tmp_hash_file,
mountpoint], capture_output=True, shell=False)
if md5deep_completed.returncode != 0:
log.info("Checksums complete")
if md5deep_completed.returncode == 0:
secure_copy(tmp_hash_file, mountpoint+"/"+hash_filename)
log.info("Finalized the tape %s", self.drives[0][self.PRIMARY_VOLUME_TAG])
else:
log.error("Couldn't compute hashes; %s", hash_filename)
tape = self.identify_tape()
......@@ -141,6 +156,7 @@ class TapeManager:
if umount_completed.returncode:
raise Exception("Failed to unmount => can not move tape. Try again when tape is not used.")
else:
self.mountpoint = None
log.info("Unmounted tape %s successfully", self.drives[0][self.PRIMARY_VOLUME_TAG])
self.df = 0
empty_slot = self.find_first_empty_magazine_slot()
......@@ -249,6 +265,7 @@ class TapeManager:
self.scan_transport(device, element_address_assignment)
# Identify known and new tapes
self.scan_tapes()
self.disk_usage()
self.dbsession.flush()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment