diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a86648e --- /dev/null +++ b/.gitignore @@ -0,0 +1,119 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +token.json +sync_settings.json diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..5c98b42 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,2 @@ +# Default ignored files +/workspace.xml \ No newline at end of file diff --git a/.idea/box-dl.iml b/.idea/box-dl.iml new file mode 100644 index 0000000..8557075 --- /dev/null +++ b/.idea/box-dl.iml @@ -0,0 +1,11 @@ + + + + + + 
+ + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..a8b5c69 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..47e4ef0 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/lint.bat b/lint.bat new file mode 100644 index 0000000..daf9c4d --- /dev/null +++ b/lint.bat @@ -0,0 +1 @@ +autopep8 --recursive --aggressive --aggressive --in-place --exclude env . diff --git a/main.py b/main.py new file mode 100644 index 0000000..f8c5963 --- /dev/null +++ b/main.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python +import os +import sys +import json +import argparse +import time +import webbrowser +import logging +import queue +import threading +import signal + +import src.setup # pylint: disable=unused-import +from src.worker import Worker +from src.token_manager import TokenManager +from src.auth_helper import get_sign_in_url, get_token_from_code +from src.drive_helper import DriveHelper, get_user +from src.job import JobDirectory + + +def interactive_confirm(): + inp = input("Confirm? 
(y/N): ") +    if inp.lower() != 'y': +        print('Exiting') +        sys.exit(1) + + +def main(): +    with open('sync_settings.json') as f: +        SETTINGS = json.load(f) +    logging.info('Loaded Settings: %s', SETTINGS) + +    parser = argparse.ArgumentParser() +    parser.add_argument("baseItemId", nargs='?', default='', help="base itemId (ABC12345!00001)") +    parser.add_argument("remote", nargs='?', default='', help="remote path to sync") +    parser.add_argument("local", nargs='?', default='', help="local path to sync") +    parser.add_argument("-y", "--yes", help="skip confirmation dialogue", action="store_true") +    args = parser.parse_args() + +    q = queue.Queue() + +    if args.baseItemId: +        remote = args.remote.rstrip('/') +        local = os.path.expanduser(args.local.rstrip('/')) +        print('baseItemId: [{0}]'.format(args.baseItemId)) +        print('driveRoot: [{0}]'.format(args.baseItemId)) +        print('Syncing Remote: [{0}]'.format(remote)) +        print('With Local: [{0}]'.format(local)) +        q.put(JobDirectory(args.baseItemId, remote, local)) +    else: +        for job in SETTINGS.get('jobs', []): +            q.put(JobDirectory(job['itemId'], job['remote'], job['local'])) +        print('Processing jobs in setting file') + +    if not (args.yes or SETTINGS.get('defaultYes', False)): +        interactive_confirm() + +    try: +        token_manager = TokenManager('token.json') +        get_user(token_manager.get_token())  # Check token validity +    except Exception: +        logging.warning('Token not working, logging in') +        sign_in_url, state = get_sign_in_url() +        webbrowser.open(sign_in_url, new=2) +        print('After logging in, please paste the entire callback URL (such as http://localhost:8000/......)') +        callback_url = input('Paste here: ') +        token = get_token_from_code(callback_url, state) +        token_manager = TokenManager('token.json', token) +    logging.info('Token successfully loaded') + +    threads = [] +    drive_helper = DriveHelper(token_manager) +    interrupt_flag = {'exit': False} +    worker_object = Worker(q, drive_helper, SETTINGS.get('blacklist', []), interrupt_flag) + +    
thread_count = SETTINGS.get('thread_count', 4) + logging.info('Launching %s threads', thread_count) + original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) + for _ in range(thread_count): + t = threading.Thread(target=worker_object.work) + t.start() + threads.append(t) + signal.signal(signal.SIGINT, original_sigint_handler) + + try: + # block until all tasks are done + while not q.empty(): + time.sleep(1) # Interruptable + q.join() + # stop workers + for _ in range(thread_count): + q.put(None) + except KeyboardInterrupt: + print('Keyboard Interrupt, waiting for current operations to finish') + interrupt_flag['exit'] = True + for t in threads: + t.join() + + # Print all failed stuff + worker_object.errors.sort() + if worker_object.errors: + print("Encountered error on the following files/dir:") + for f in worker_object.errors: + print('-', f) + + # Print all Downloaded stuff + if worker_object.downloaded: + print("Downloaded files in the following dir:") + for f in sorted(worker_object.downloaded): + print('-', f) + + print(f'ls API call count: {drive_helper.ls_call_counts}') + print(f'dl API call count: {drive_helper.dl_call_counts}') + + +if __name__ == '__main__': + main() diff --git a/pylintrc b/pylintrc new file mode 100644 index 0000000..c5f6227 --- /dev/null +++ b/pylintrc @@ -0,0 +1,8 @@ +[MESSAGES CONTROL] +disable= + missing-docstring, + invalid-name + +[FORMAT] +expected-line-ending-format=LF +max-line-length=120 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..145bfab --- /dev/null +++ b/requirements.txt @@ -0,0 +1,19 @@ +astroid==2.2.5 +autopep8==1.4.4 +certifi==2019.6.16 +chardet==3.0.4 +colorama==0.4.1 +idna==2.8 +isort==4.3.20 +lazy-object-proxy==1.4.1 +mccabe==0.6.1 +oauthlib==3.0.1 +pycodestyle==2.5.0 +pylint==2.3.1 +PyYAML==5.1.1 +requests==2.22.0 +requests-oauthlib==1.2.0 +six==1.12.0 +typed-ast==1.4.0 +urllib3==1.25.3 +wrapt==1.11.2 diff --git a/src/auth_helper.py b/src/auth_helper.py new file 
mode 100644 index 0000000..8b47dba --- /dev/null +++ b/src/auth_helper.py @@ -0,0 +1,47 @@ +import os +import yaml +from requests_oauthlib import OAuth2Session + +# This is necessary for testing with non-HTTPS localhost +# Remove this if deploying to production +os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1' + +# This is necessary because Azure does not guarantee +# to return scopes in the same case and order as requested +os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '1' +os.environ['OAUTHLIB_IGNORE_SCOPE_CHANGE'] = '1' + +# Load the oauth_settings.yml file +stream = open('oauth_settings.yml', 'r') +settings = yaml.load(stream, Loader=yaml.BaseLoader) +authorize_url = '{0}{1}'.format(settings['authority'], settings['authorize_endpoint']) +token_url = '{0}{1}'.format(settings['authority'], settings['token_endpoint']) + +# Method to generate a sign-in url + + +def get_sign_in_url(): + # Initialize the OAuth client + aad_auth = OAuth2Session(settings['app_id'], + scope=settings['scopes'], + redirect_uri=settings['redirect']) + + sign_in_url, state = aad_auth.authorization_url(authorize_url, prompt='login') + + return sign_in_url, state + +# Method to exchange auth code for access token + + +def get_token_from_code(callback_url, expected_state): + # Initialize the OAuth client + aad_auth = OAuth2Session(settings['app_id'], + state=expected_state, + scope=settings['scopes'], + redirect_uri=settings['redirect']) + + token = aad_auth.fetch_token(token_url, + client_secret=settings['app_secret'], + authorization_response=callback_url) + + return token diff --git a/src/drive_helper.py b/src/drive_helper.py new file mode 100644 index 0000000..a5d0a30 --- /dev/null +++ b/src/drive_helper.py @@ -0,0 +1,41 @@ +from urllib.parse import quote +from requests_oauthlib import OAuth2Session +import requests + +from src.job import JobFile, JobDirectory + +graph_url = 'https://graph.microsoft.com/v1.0' + + +def get_user(token): + graph_client = OAuth2Session(token=token) + # Send GET to 
/me +    user = graph_client.get('{0}/me'.format(graph_url)) +    # Return the JSON result +    return user.json() + + +class DriveHelper: +    def __init__(self, token_manager): +        # /me/drive/sharedWithMe?select=name,parentReference,remoteItem +        # driveId = parentReference.driveId +        # driveRoot = remoteItem.id +        self.token_manager = token_manager +        self.ls_call_counts = 0 +        self.dl_call_counts = 0 + +    def get_dir(self, job: JobDirectory): +        graph_client = OAuth2Session(token=self.token_manager.get_token()) +        self.ls_call_counts += 1 +        return graph_client.get(graph_url + job.get_url()).json()['value'] + +    def download_file(self, job: JobFile): +        # remote = ':/' + remote + ':' +        # url = '{0}/drives/{2}/items/{3}{1}/content'.format(graph_url, quote(remote), self.driveId, self.driveRoot) +        self.dl_call_counts += 1 +        with requests.get(job.url, stream=True) as r: +            r.raise_for_status() +            with open(job.local, 'wb') as f: +                for chunk in r.iter_content(chunk_size=1048576): +                    if chunk:  # filter out keep-alive new chunks +                        f.write(chunk) diff --git a/src/job.py b/src/job.py new file mode 100644 index 0000000..e9ae268 --- /dev/null +++ b/src/job.py @@ -0,0 +1,50 @@ +import copy +from urllib.parse import quote + + +class Job: +    def __init__(self, base_item_id, remote, local): +        self.base_item_id = base_item_id +        self.remote = remote +        self.local = local + +    @staticmethod +    def dir_concat(d, f): +        d = d.rstrip('/') +        return f if d == '' else d + '/' + f + + +class JobDirectory(Job): +    def __init__(self, base_item_id, remote, local, item_id=None): +        super().__init__(base_item_id, remote, local) +        self.item_id = item_id + +    def process_child(self, child): +        new_copy = copy.deepcopy(self) +        new_copy.remote = self.dir_concat(self.remote, child['name']) +        new_copy.local = self.dir_concat(self.local, child['name']) +        if 'folder' in child: +            new_copy.item_id = child['id'] +        else: +            new_copy.__class__ = JobFile +            new_copy.file_size = child['size'] +            new_copy.url = 
child['@microsoft.graph.downloadUrl'] + return new_copy + + def get_url(self): + if self.item_id is None: + item_id = '{}{}'.format(self.base_item_id, quote(':/' + self.remote + ':' if self.remote else '')) + else: + item_id = self.item_id + + return '/drives/{0}/items/{1}/children' \ + '?select=name,folder,size,id,createdDateTime,@microsoft.graph.downloadUrl' \ + '&top=1000' \ + .format(self.base_item_id.split('!')[0], item_id) + + +class JobFile(Job): + def __init__(self, base_item_id, remote, local, file_size, url=None): + super().__init__(base_item_id, remote, local) + self.file_size = file_size + self.url = url diff --git a/src/setup.py b/src/setup.py new file mode 100644 index 0000000..3e0a367 --- /dev/null +++ b/src/setup.py @@ -0,0 +1,11 @@ +import logging +import sys + +root = logging.getLogger() +root.setLevel(logging.WARNING) + +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.DEBUG) +formatter = logging.Formatter('%(levelname)s - %(message)s') +handler.setFormatter(formatter) +root.addHandler(handler) diff --git a/src/token_manager.py b/src/token_manager.py new file mode 100644 index 0000000..901a1be --- /dev/null +++ b/src/token_manager.py @@ -0,0 +1,53 @@ +import json +import time +import threading +import logging +from requests_oauthlib import OAuth2Session + +from .auth_helper import settings, token_url + + +class TokenManager: + + def __init__(self, filename, token=None): + self.lock = threading.Lock() + self.filename = filename + + if token is not None: + self.__save_token(token) + else: + self.__load_token() + + def __save_token(self, new_token): + self.token = new_token + with open(self.filename, 'w') as outfile: + json.dump(new_token, outfile, indent=4, sort_keys=True) + + def __load_token(self): + with open(self.filename) as f: + self.token = json.load(f) + + def get_token(self): + with self.lock: + now = time.time() + # Subtract 5 minutes from expiration to account for clock skew + expire_time = self.token['expires_at'] - 
300 + +            if now >= expire_time:  # Refresh the token +                logging.warning('Refreshing OAuth2 Token') +                aad_auth = OAuth2Session(settings['app_id'], +                                         token=self.token, +                                         scope=settings['scopes'], +                                         redirect_uri=settings['redirect']) + +                refresh_params = { +                    'client_id': settings['app_id'], +                    'client_secret': settings['app_secret'], +                } +                new_token = aad_auth.refresh_token(token_url, **refresh_params) + +                self.__save_token(new_token) +                return new_token + +            else: +                return self.token diff --git a/src/worker.py b/src/worker.py new file mode 100644 index 0000000..fd542a7 --- /dev/null +++ b/src/worker.py @@ -0,0 +1,104 @@ +import logging +import os.path +from fnmatch import fnmatch + +from src.job import JobDirectory, JobFile + + +def dir_get_parent(d): +    slash = d.rstrip('/').rfind('/') +    if slash > 0: +        return d[0:slash] +    return '' + + +def human_readable_bytes(B): +    """Return the given bytes as a human friendly KB, MB, GB, or TB string""" +    B = float(B) +    KB = float(1024) +    MB = float(KB ** 2)  # 1,048,576 +    GB = float(KB ** 3)  # 1,073,741,824 +    TB = float(KB ** 4)  # 1,099,511,627,776 +    if B < KB: +        return '{0} {1}'.format(B, 'Byte' if B == 1 else 'Bytes') +    if KB <= B < MB: +        return '{0:.2f} KB'.format(B / KB) +    if MB <= B < GB: +        return '{0:.2f} MB'.format(B / MB) +    if GB <= B < TB: +        return '{0:.2f} GB'.format(B / GB) +    return '{0:.2f} TB'.format(B / TB) + + +class Worker: +    def __init__(self, queue, drive_helper, blacklist, interrupt_flag): +        self.queue = queue +        self.drive = drive_helper +        self.blacklist = blacklist or [] +        self.errors = [] +        self.downloaded = set() +        self.interrupt_flag = interrupt_flag + +    def work(self): +        logging.warning('A worker thread launched') +        while not self.interrupt_flag['exit']: +            item = self.queue.get() + +            if item is None: +                # logging.warning('A worker thread exited') +                break +            elif isinstance(item, JobDirectory): +                self.do_folder(item) +            else: +                self.do_file(item) +            self.queue.task_done() + +    def do_folder(self, job: 
JobDirectory): + if any(fnmatch(job.remote, x) for x in self.blacklist): + logging.info('Skipping folder [%s]', job.remote) + return None + + try: + logging.info('Fetching folder [%s]', job.remote) + children = self.drive.get_dir(job) + except Exception as e: + logging.error('Fail to ls [%s]: %s: %s', job.remote, type(e).__name__, e) + self.errors.append(job.remote) + return None + + try: + for child in children: + self.queue.put(job.process_child(child)) + except Exception as e: + logging.error( + 'Fail to process directory [%s]: %s', job.remote, e) + self.errors.append(job.remote) + return None + + def do_file(self, job: JobFile): + if any(fnmatch(job.remote, x) for x in self.blacklist): + logging.info('Skipping file [%s]', job.remote) + return None + + try: + if os.path.isfile(job.local): + local_size = os.path.getsize(job.local) + if local_size == job.file_size: + logging.info('Skipping file [%s], already exists', job.remote) + return + logging.warning( + 'Downloading file [%s], due to different size (%s | %s)', + job.remote, + human_readable_bytes(job.file_size), + human_readable_bytes(local_size)) + else: + logging.warning('Downloading file [%s]', job.remote) + + parent_dir = dir_get_parent(job.local) + if parent_dir != '': + os.makedirs(parent_dir, exist_ok=True) + self.drive.download_file(job) + self.downloaded.add(dir_get_parent(job.remote)) + except Exception as e: + logging.error('Fail to Download [%s]: %s: %s', job.remote, type(e).__name__, e) + self.errors.append(job.remote)