Commit 06c535b4 authored by Oleg Borisenko's avatar Oleg Borisenko
Browse files

optimized queries a bit; logging; redis

parent 8499fe9b
......@@ -45,13 +45,7 @@ listen = 0.0.0.0:6543
###
[loggers]
keys = root, tapebackup, sqlalchemy, alembic
[handler_filelog]
class = handlers.TimedRotatingFileHandler
args = ('/var/log/tapebackup.log', 'midnight', 1, 30, 'utf-8')
level = INFO
formatter = generic
keys = root, tapebackup, tapebackup_eventloop, sqlalchemy, alembic
[handlers]
keys = console, filelog
......@@ -65,12 +59,12 @@ handlers = console, filelog
[logger_tapebackup]
level = DEBUG
handlers = console
handlers =
qualname = tapebackup
[logger_tapebackup_eventloop]
level = DEBUG
handlers = console
handlers =
qualname = tapebackup
[logger_sqlalchemy]
......@@ -89,7 +83,13 @@ qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
level = WARN
formatter = generic
[handler_filelog]
class = handlers.TimedRotatingFileHandler
args = ('/var/log/tapebackup.log', 'midnight', 1, 30, 'utf-8')
level = INFO
formatter = generic
[formatter_generic]
......
......@@ -21,6 +21,7 @@ pyramid-tm==2.4
PYSCSI==2.0.1
python-dateutil==2.8.1
python-editor==1.0.4
redis==3.5.3
repoze.lru==0.7
six==1.15.0
SQLAlchemy==1.4.11
......
......@@ -36,7 +36,7 @@ class Batch(Base):
files_to_backup = relationship("FileToBackup",
secondary=association_table,
back_populates="batches",
lazy='subquery')
lazy='select')
# files_to_backup = association_proxy("CopyQueue", "batch", creator=lambda bat: CopyQueue(batch=bat))
def change_status(self, status):
......
import argparse
import asyncio
import datetime
import logging
import os
import redis
import sys
import time
......@@ -12,6 +15,7 @@ from tapebackup import utils
log = logging.getLogger("tapebackup_eventloop")
red = redis.Redis()
def form_wgs_queue(target, files_to_backup, free_space):
......@@ -86,7 +90,7 @@ def form_backup_queue(dbsession, free_space):
return total_copy_queue
def copy_process(env):
def copy_process(env, loop):
with env['request'].tm:
dbsession = env['request'].dbsession
current_batch = models.Batch.get_current(dbsession)
......@@ -95,14 +99,20 @@ def copy_process(env):
target = dbsession.query(models.BackupTarget).filter(models.BackupTarget.unique_label == file_to_copy.target_unique_label).one()
src = target.fullpath + "/" + file_to_copy.relative_path
dest = "/srv_mount_dont_go_here/tapes/" + current_batch.tape_id + "/" + file_to_copy.relative_path
copy_result, checksum = utils.secure_copy(src, dest)
if red.get(src):
checksum = red.get(src)[: 32].decode()
copy_result = utils.secure_copy(src, dest)
else:
copy_result, checksum = utils.secure_copy2(src, dest, loop)
red.set(src, checksum)
if copy_result == 0:
# NOTE: very important to have this; without it no changes to files occur
dbsession.begin_nested()
file_to_copy.tape_label = current_batch.tape_id
file_to_copy.checksum = checksum
file_to_copy.copied_at_time = datetime.datetime.now()
current_batch.files_to_backup.remove(file_to_copy)
log.info("Copy successfully, now queue len is %d", len(current_batch.files_to_backup))
log.info("Copied successfully, now queue len is %d", len(current_batch.files_to_backup))
return True
else:
log.error("Failed to copy %s, return code was %d", file_to_copy.relative_path, copy_result)
......@@ -161,16 +171,24 @@ def main(argv=sys.argv):
args = parse_args(argv)
setup_logging(args.config_uri)
env = bootstrap(args.config_uri)
loop = asyncio.get_event_loop()
try:
while True:
queue_len = 0
iterations = 0
now = time.time()
with env['request'].tm:
dbsession = env['request'].dbsession
queue_len = control_copy_queue(dbsession)
while queue_len > 0:
if copy_process(env):
if copy_process(env, loop):
queue_len -= 1
iterations += 1
if not iterations % 100:
log.debug("100 iterations per %d seconds (lazy load)", time.time() - now)
now = time.time()
time.sleep(2)
except SQLAlchemyError as e:
......@@ -179,6 +197,8 @@ def main(argv=sys.argv):
except Exception as e:
log.error(e)
raise
finally:
loop.close()
# Script entry point: run the backup event loop when executed directly.
if __name__ == '__main__':
    main()
\ No newline at end of file
from .tapemanager import TapeManager
from .copy_controller import secure_copy
\ No newline at end of file
from .copy_controller import secure_copy, secure_copy2
\ No newline at end of file
import asyncio
import logging
import os
import shlex
import subprocess
......@@ -6,15 +7,32 @@ log = logging.getLogger(__name__)
def secure_copy(src, dst):
    """Copy *src* to *dst* with rsync, logging the source's md5 checksum.

    NOTE(review): this text is a merged diff view — it contains two return
    statements; only the first one executes. The "before" version returned
    (returncode, checksum), the "after" version apparently returns just the
    rsync return code. Confirm against the repository's current revision.
    """
    # Ensure the destination directory exists (-p makes this idempotent).
    subprocess.run(["mkdir", "-p", "-v", os.path.dirname(dst)], capture_output=True, shell=False)
    md5sum_result = subprocess.run(["md5sum", src], capture_output=True, shell=False)
    # md5sum prints "<32 hex chars>  <filename>"; keep only the digest.
    checksum = md5sum_result.stdout[:32].decode()
    if md5sum_result.stderr:
        log.error(md5sum_result.stderr.decode())
    else:
        log.info(checksum)
    copy_result = subprocess.run(["rsync", "-a", src, dst], capture_output=True, shell=False)
    if copy_result.stderr:
        log.error(copy_result.stderr.decode())
    else:
        log.info(copy_result.stdout.decode())
    return copy_result.returncode, checksum
    # NOTE(review): unreachable — leftover "after" side of the diff hunk.
    return copy_result.returncode
async def cmd(command):
    """Run *command* through the shell asynchronously and capture its output.

    Parameters:
        command: shell command line to execute.

    Returns:
        (exit_code, stdout) — exit_code is the process return code,
        stdout is the raw bytes the process wrote to its stdout.
    """
    proc = await asyncio.create_subprocess_shell(
        command, stdin=None, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
    # Lazy %-style logging args: formatting is skipped unless DEBUG is enabled.
    log.debug("Proc created: %s", command)
    stdout, stderr = await proc.communicate()
    exit_code = proc.returncode
    log.debug("Finish: %s", command)
    if stderr:
        # Anything written to stderr is reported as an error.
        log.error(stderr.decode())
    else:
        log.info(stdout.decode())
    return exit_code, stdout
def secure_copy2(src, dst, loop):
    """Copy *src* to *dst* while checksumming the source, running the
    md5sum and rsync subprocesses concurrently on *loop*.

    Parameters:
        src: source file path.
        dst: destination file path (parent directories are created first).
        loop: asyncio event loop used to drive both subprocesses.

    Returns:
        (rsync_return_code, md5_hex_checksum) tuple.
    """
    # Ensure the destination directory exists (-p makes this idempotent).
    subprocess.run(["mkdir", "-p", "-v", os.path.dirname(dst)], capture_output=True, shell=False)
    # shlex.quote protects against paths containing spaces or shell
    # metacharacters; the previous unquoted interpolation broke on them.
    md5sum_proc = cmd("md5sum %s" % shlex.quote(src))
    rsync_proc = cmd("rsync -a %s %s" % (shlex.quote(src), shlex.quote(dst)))
    # Run both commands concurrently; gather preserves result order.
    results = loop.run_until_complete(asyncio.gather(md5sum_proc, rsync_proc))
    # md5sum output: "<32 hex chars>  <filename>"; keep only the digest.
    checksum = results[0][1][:32].decode()
    copy_ret_code = results[1][0]
    return copy_ret_code, checksum
\ No newline at end of file
......@@ -297,6 +297,7 @@ class TapeManager:
return self.drive_state
def identify_tape(self):
self.scan()
if self.get_drive_state() is DriveState.empty:
raise Exception("Tape drive is empty")
elif self.get_drive_state() is DriveState.cleaning_tape_inside:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment