From e8ffe8cb7860d306c8873627f14dc0ead2afa641 Mon Sep 17 00:00:00 2001 From: Brian Date: Sat, 28 Sep 2019 23:14:16 +0100 Subject: [PATCH] Recursive ls seems to works --- main.py | 83 +++++++++++++++------------------------------ src/drive_helper.py | 41 ---------------------- src/job.py | 64 ++++++++++------------------------ src/worker.py | 82 +++++++++++++++++++++----------------------- 4 files changed, 84 insertions(+), 186 deletions(-) delete mode 100644 src/drive_helper.py diff --git a/main.py b/main.py index f8c5963..3b8833e 100644 --- a/main.py +++ b/main.py @@ -1,79 +1,55 @@ #!/usr/bin/env python -import os -import sys -import json import argparse -import time -import webbrowser -import logging +import json import queue -import threading import signal +import threading +import time +import logging +from boxsdk import Client +from pathlib import Path -import src.setup # pylint: disable=unused-import +from src.job import Job +from src.setup import setup_logger +from src.auth_helper import init_oauth +from src.const import SETTING_FILE from src.worker import Worker -from src.token_manager import TokenManager -from src.auth_helper import get_sign_in_url, get_token_from_code -from src.drive_helper import DriveHelper, get_user -from src.job import JobDirectory - - -def interactive_confirm(): - inp = input("Confirm? 
(y/N): ")
-    if inp.lower() != 'y':
-        print('Exiting')
-        sys.exit(1)
 
 
 def main():
-    with open('sync_settings.json') as f:
-        SETTINGS = json.load(f)
-    logging.info('Loaded Settings: %s', SETTINGS)
+    setup_logger()
+    with open(SETTING_FILE) as f:
+        settings = json.load(f)
 
     parser = argparse.ArgumentParser()
-    parser.add_argument("baseItemId", nargs='?', default='', help="base itemId (ABC12345!00001)")
-    parser.add_argument("remote", nargs='?', default='', help="remote path to sync")
-    parser.add_argument("local", nargs='?', default='', help="local path to sync")
+    parser.add_argument("itemId", nargs='?', default=None, help="Item ID to download, use 0 for root")
+    parser.add_argument("localDirectory", nargs='?', default='', help="Local path of the item")
     parser.add_argument("-y", "--yes", help="skip confirmation dialogue", action="store_true")
     args = parser.parse_args()
 
+    client = Client(init_oauth())
     q = queue.Queue()
-    if args.baseItemId:
-        remote = args.remote.rstrip('/')
-        local = os.path.expanduser(args.local.rstrip('/'))
-        print('baseItemId: [{0}]'.format(args.baseItemId))
-        print('driveRoot: [{0}]'.format(args.driveRoot))
-        print('Syncing Remote: [{0}]'.format(remote))
-        print('With Local: [{0}]'.format(local))
-        q.put(JobDirectory(args.baseItemId, remote, local))
+    if args.itemId is not None:
+        local = Path(args.localDirectory)
+        folder = client.folder(args.itemId).get(['name', 'id', 'size', 'modified_at', 'path_collection'])
+        q.put(Job(folder, local))
     else:
+        print('Not implemented reading from settings yet, using test data')
+        local = Path('Temp')
+        folder = client.folder('0').get(['name', 'id', 'size', 'modified_at', 'path_collection'])
+        q.put(Job(folder, local))
+        '''
         for job in SETTINGS.get('jobs', []):
             q.put(JobDirectory(job['itemId'], job['remote'], job['local']))
         print('Processing jobs in setting file')
-
-    if not (args.yes or SETTINGS.get('defaultYes', False)):
-        interactive_confirm()
-
-    try:
-        token_manager = TokenManager('token.json')
-        
get_user(token_manager.get_token()) # Check token validity - except Exception: - logging.warning('Token not working, logging in') - sign_in_url, state = get_sign_in_url() - webbrowser.open(sign_in_url, new=2) - print('After logging in, please paste the entire callback URL (such as http://localhost:8000/......)') - callback_url = input('Paste here: ') - token = get_token_from_code(callback_url, state) - token_manager = TokenManager('token.json', token) - logging.info('Token successfully loaded') + ''' threads = [] - drive_helper = DriveHelper(token_manager) interrupt_flag = {'exit': False} - worker_object = Worker(q, drive_helper, SETTINGS.get('blacklist', []), interrupt_flag) + worker_object = Worker(q, client, settings.get('blacklist', []), interrupt_flag) - thread_count = SETTINGS.get('thread_count', 4) + thread_count = settings.get('thread_count', 4) logging.info('Launching %s threads', thread_count) original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) for _ in range(thread_count): @@ -109,9 +85,6 @@ def main(): for f in sorted(worker_object.downloaded): print('-', f) - print(f'ls API call count: {drive_helper.ls_call_counts}') - print(f'dl API call count: {drive_helper.dl_call_counts}') - if __name__ == '__main__': main() diff --git a/src/drive_helper.py b/src/drive_helper.py deleted file mode 100644 index a5d0a30..0000000 --- a/src/drive_helper.py +++ /dev/null @@ -1,41 +0,0 @@ -from urllib.parse import quote -from requests_oauthlib import OAuth2Session -import requests - -from src.job import JobFile, JobDirectory - -graph_url = 'https://graph.microsoft.com/v1.0' - - -def get_user(token): - graph_client = OAuth2Session(token=token) - # Send GET to /me - user = graph_client.get('{0}/me'.format(graph_url)) - # Return the JSON result - return user.json() - - -class DriveHelper: - def __init__(self, token_manager): - # /me/drive/sharedWithMe?select=name,parentReference,remoteItem - # driveId = parentReference.driveId - # driveRoot = remoteItem.id 
- self.token_manager = token_manager - self.ls_call_counts = 0 - self.dl_call_counts = 0 - - def get_dir(self, job: JobDirectory): - graph_client = OAuth2Session(token=self.token_manager.get_token()) - self.ls_call_counts += 1 - return graph_client.get(graph_url + job.get_url()).json()['value'] - - def download_file(self, job: JobFile): - # remote = ':/' + remote + ':' - # url = '{0}/drives/{2}/items/{3}{1}/content'.format(graph_url, quote(remote), self.driveId, self.driveRoot) - self.dl_call_counts = 0 - with requests.get(job.url, stream=True) as r: - r.raise_for_status() - with open(job.local, 'wb') as f: - for chunk in r.iter_content(chunk_size=1048576): - if chunk: # filter out keep-alive new chunks - f.write(chunk) diff --git a/src/job.py b/src/job.py index e9ae268..2c80dbe 100644 --- a/src/job.py +++ b/src/job.py @@ -1,50 +1,20 @@ -import copy -from urllib.parse import quote +from pathlib import PurePosixPath, Path + +from boxsdk.object.item import Item + + +def get_remote_path(box_item: Item): + p = PurePosixPath('') + for entry in box_item.path_collection['entries']: + if entry.id == '0': + continue + p = p / entry.name + p = p / box_item.name + return str(p) class Job: - def __init__(self, base_item_id, remote, local): - self.base_item_id = base_item_id - self.remote = remote - self.local = local - - @staticmethod - def dir_concat(d, f): - d = d.rstrip('/') - return f if d == '' else d + '/' + f - - -class JobDirectory(Job): - def __init__(self, base_item_id, remote, local, item_id=None): - super().__init__(base_item_id, remote, local) - self.item_id = item_id - - def process_child(self, child): - new_copy = copy.deepcopy(self) - new_copy.remote = self.dir_concat(self.remote, child['name']) - new_copy.local = self.dir_concat(self.local, child['name']) - if 'folder' in child: - new_copy.item_id = child['id'] - else: - new_copy.__class__ = JobFile - new_copy.file_size = child['size'] - new_copy.url = child['@microsoft.graph.downloadUrl'] - return new_copy - 
- def get_url(self): - if self.item_id is None: - item_id = '{}{}'.format(self.base_item_id, quote(':/' + self.remote + ':' if self.remote else '')) - else: - item_id = self.item_id - - return '/drives/{0}/items/{1}/children' \ - '?select=name,folder,size,id,createdDateTime,@microsoft.graph.downloadUrl' \ - '&top=1000' \ - .format(self.base_item_id.split('!')[0], item_id) - - -class JobFile(Job): - def __init__(self, base_item_id, remote, local, file_size, url=None): - super().__init__(base_item_id, remote, local) - self.file_size = file_size - self.url = url + def __init__(self, box_item: Item, local_path: Path): + self.box_item = box_item + self.local_path = local_path + self.remote_path = get_remote_path(box_item) diff --git a/src/worker.py b/src/worker.py index fd542a7..d3626b5 100644 --- a/src/worker.py +++ b/src/worker.py @@ -1,8 +1,10 @@ import logging import os.path from fnmatch import fnmatch +from boxsdk import Client +from boxsdk.object.folder import Folder -from src.job import JobDirectory, JobFile +from .job import Job def dir_get_parent(d): @@ -12,74 +14,68 @@ def dir_get_parent(d): return '' -def human_readable_bytes(B): +def human_readable_bytes(b): """Return the given bytes as a human friendly KB, MB, GB, or TB string""" - B = float(B) - KB = float(1024) - MB = float(KB ** 2) # 1,048,576 - GB = float(KB ** 3) # 1,073,741,824 - TB = float(KB ** 4) # 1,099,511,627,776 - if B < KB: - return '{0} {1}'.format(B, 'Bytes' if 0 == B > 1 else 'Byte') - if KB <= B < MB: - return '{0:.2f} KB'.format(B / KB) - if MB <= B < GB: - return '{0:.2f} MB'.format(B / MB) - if GB <= B < TB: - return '{0:.2f} GB'.format(B / GB) - return '{0:.2f} TB'.format(B / TB) + b = float(b) + kb = float(1024) + mb = float(kb ** 2) # 1,048,576 + gb = float(kb ** 3) # 1,073,741,824 + tb = float(kb ** 4) # 1,099,511,627,776 + if b < kb: + return '{0} {1}'.format(b, 'Bytes' if 0 == b > 1 else 'Byte') + if kb <= b < mb: + return '{0:.2f} KB'.format(b / kb) + if mb <= b < gb: + return 
'{0:.2f} MB'.format(b / mb)
+    if gb <= b < tb:
+        return '{0:.2f} GB'.format(b / gb)
+    return '{0:.2f} TB'.format(b / tb)
 
 
 class Worker:
-    def __init__(self, queue, drive_helper, blacklist, interrupt_flag):
+    def __init__(self, queue, client: Client, blacklist, interrupt_flag):
         self.queue = queue
-        self.drive = drive_helper
+        self.client = client
         self.blacklist = blacklist or []
-        self.errors = []
-        self.downloaded = set()
         self.interrupt_flag = interrupt_flag
+        self.downloaded = set()
+        self.errors = []
 
     def work(self):
         logging.warning('A worker thread launched')
         while not self.interrupt_flag['exit']:
-            item = self.queue.get()
+            job = self.queue.get()
 
-            if item is None:
-                # logging.warning('A worker thread exited')
+            if job is None:
+                logging.info('A worker thread exited')
                 break
+            if any(fnmatch(job.remote_path, x) for x in self.blacklist):
+                logging.info('Skipping blacklisted item [%s]', job.remote_path)
-            elif isinstance(item, JobDirectory):
-                self.do_folder(item)
+            elif isinstance(job.box_item, Folder):
+                self.do_folder(job)
             else:
-                self.do_file(item)
+                self.do_file(job)
 
             self.queue.task_done()
 
-    def do_folder(self, job: JobDirectory):
-        if any(fnmatch(job.remote, x) for x in self.blacklist):
-            logging.info('Skipping folder [%s]', job.remote)
-            return None
-
+    def do_folder(self, job: Job):
         try:
-            logging.info('Fetching folder [%s]', job.remote)
-            children = self.drive.get_dir(job)
+            logging.info('Fetching folder [%s]', job.remote_path)
+            children = job.box_item.get_items(limit=5000, fields=['name', 'id', 'size', 'modified_at', 'path_collection'])
         except Exception as e:
-            logging.error('Fail to ls [%s]: %s: %s', job.remote, type(e).__name__, e)
-            self.errors.append(job.remote)
+            logging.error('Fail to ls [%s]: %s: %s', job.remote_path, type(e).__name__, e)
+            self.errors.append(job.remote_path)
             return None
 
         try:
             for child in children:
-                self.queue.put(job.process_child(child))
+                self.queue.put(Job(child, job.local_path / child.name))
         except Exception as e:
             logging.error(
-                'Fail to process directory [%s]: %s', job.remote, e)
-            
self.errors.append(job.remote) - return None - - def do_file(self, job: JobFile): - if any(fnmatch(job.remote, x) for x in self.blacklist): - logging.info('Skipping file [%s]', job.remote) + 'Fail to process directory [%s]: %s', job.remote_path, e) + self.errors.append(job.remote_path) return None + def do_file(self, job: Job): + print(f'Supposed to download file {job.remote_path} to {job.local_path}') + return None try: if os.path.isfile(job.local): local_size = os.path.getsize(job.local)