Recursive ls seems to works

This commit is contained in:
2019-09-28 23:14:16 +01:00
parent 115b235256
commit e8ffe8cb78
4 changed files with 84 additions and 186 deletions

83
main.py
View File

@@ -1,79 +1,55 @@
#!/usr/bin/env python #!/usr/bin/env python
import os
import sys
import json
import argparse import argparse
import time import json
import webbrowser
import logging
import queue import queue
import threading
import signal import signal
import threading
import time
import logging
from boxsdk import Client
from pathlib import Path
import src.setup # pylint: disable=unused-import from src.job import Job
from src.setup import setup_logger
from src.auth_helper import init_oauth
from src.const import SETTING_FILE
from src.worker import Worker from src.worker import Worker
from src.token_manager import TokenManager
from src.auth_helper import get_sign_in_url, get_token_from_code
from src.drive_helper import DriveHelper, get_user
from src.job import JobDirectory
def interactive_confirm():
inp = input("Confirm? (y/N): ")
if inp.lower() != 'y':
print('Exiting')
sys.exit(1)
def main(): def main():
with open('sync_settings.json') as f: setup_logger()
SETTINGS = json.load(f) with open(SETTING_FILE) as f:
logging.info('Loaded Settings: %s', SETTINGS) settings = json.load(f)
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("baseItemId", nargs='?', default='', help="base itemId (ABC12345!00001)") parser.add_argument("itemId", nargs='?', default=None, help="Item ID to download, use 0 for root")
parser.add_argument("remote", nargs='?', default='', help="remote path to sync") parser.add_argument("localDirectory", nargs='?', default='', help="Local path of the item")
parser.add_argument("local", nargs='?', default='', help="local path to sync")
parser.add_argument("-y", "--yes", help="skip confirmation dialogue", action="store_true") parser.add_argument("-y", "--yes", help="skip confirmation dialogue", action="store_true")
args = parser.parse_args() args = parser.parse_args()
client = Client(init_oauth())
q = queue.Queue() q = queue.Queue()
if args.baseItemId: if args.itemId is not None:
remote = args.remote.rstrip('/') local = Path('args.localDirectory')
local = os.path.expanduser(args.local.rstrip('/')) folder = client.folder(args.itemId).get(['name', 'id', 'size', 'modified_at', 'path_collection'])
print('baseItemId: [{0}]'.format(args.baseItemId)) q.put(Job(folder, local))
print('driveRoot: [{0}]'.format(args.driveRoot))
print('Syncing Remote: [{0}]'.format(remote))
print('With Local: [{0}]'.format(local))
q.put(JobDirectory(args.baseItemId, remote, local))
else: else:
print('Not implemented reading from settings yet, using test data')
local = Path('Temp')
folder = client.folder('0').get(['name', 'id', 'size', 'modified_at', 'path_collection'])
q.put(Job(folder, local))
'''
for job in SETTINGS.get('jobs', []): for job in SETTINGS.get('jobs', []):
q.put(JobDirectory(job['itemId'], job['remote'], job['local'])) q.put(JobDirectory(job['itemId'], job['remote'], job['local']))
print('Processing jobs in setting file') print('Processing jobs in setting file')
'''
if not (args.yes or SETTINGS.get('defaultYes', False)):
interactive_confirm()
try:
token_manager = TokenManager('token.json')
get_user(token_manager.get_token()) # Check token validity
except Exception:
logging.warning('Token not working, logging in')
sign_in_url, state = get_sign_in_url()
webbrowser.open(sign_in_url, new=2)
print('After logging in, please paste the entire callback URL (such as http://localhost:8000/......)')
callback_url = input('Paste here: ')
token = get_token_from_code(callback_url, state)
token_manager = TokenManager('token.json', token)
logging.info('Token successfully loaded')
threads = [] threads = []
drive_helper = DriveHelper(token_manager)
interrupt_flag = {'exit': False} interrupt_flag = {'exit': False}
worker_object = Worker(q, drive_helper, SETTINGS.get('blacklist', []), interrupt_flag) worker_object = Worker(q, client, settings.get('blacklist', []), interrupt_flag)
thread_count = SETTINGS.get('thread_count', 4) thread_count = settings.get('thread_count', 4)
logging.info('Launching %s threads', thread_count) logging.info('Launching %s threads', thread_count)
original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
for _ in range(thread_count): for _ in range(thread_count):
@@ -109,9 +85,6 @@ def main():
for f in sorted(worker_object.downloaded): for f in sorted(worker_object.downloaded):
print('-', f) print('-', f)
print(f'ls API call count: {drive_helper.ls_call_counts}')
print(f'dl API call count: {drive_helper.dl_call_counts}')
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@@ -1,41 +0,0 @@
from urllib.parse import quote
from requests_oauthlib import OAuth2Session
import requests
from src.job import JobFile, JobDirectory
graph_url = 'https://graph.microsoft.com/v1.0'
def get_user(token):
graph_client = OAuth2Session(token=token)
# Send GET to /me
user = graph_client.get('{0}/me'.format(graph_url))
# Return the JSON result
return user.json()
class DriveHelper:
def __init__(self, token_manager):
# /me/drive/sharedWithMe?select=name,parentReference,remoteItem
# driveId = parentReference.driveId
# driveRoot = remoteItem.id
self.token_manager = token_manager
self.ls_call_counts = 0
self.dl_call_counts = 0
def get_dir(self, job: JobDirectory):
graph_client = OAuth2Session(token=self.token_manager.get_token())
self.ls_call_counts += 1
return graph_client.get(graph_url + job.get_url()).json()['value']
def download_file(self, job: JobFile):
# remote = ':/' + remote + ':'
# url = '{0}/drives/{2}/items/{3}{1}/content'.format(graph_url, quote(remote), self.driveId, self.driveRoot)
self.dl_call_counts = 0
with requests.get(job.url, stream=True) as r:
r.raise_for_status()
with open(job.local, 'wb') as f:
for chunk in r.iter_content(chunk_size=1048576):
if chunk: # filter out keep-alive new chunks
f.write(chunk)

View File

@@ -1,50 +1,20 @@
import copy from pathlib import PurePosixPath, Path
from urllib.parse import quote
from boxsdk.object.item import Item
def get_remote_path(box_item: Item):
p = PurePosixPath('')
for entry in box_item.path_collection['entries']:
if entry.id == '0':
continue
p = p / entry.name
p = p / box_item.name
return str(p)
class Job: class Job:
def __init__(self, base_item_id, remote, local): def __init__(self, box_item: Item, local_path: Path):
self.base_item_id = base_item_id self.box_item = box_item
self.remote = remote self.local_path = local_path
self.local = local self.remote_path = get_remote_path(box_item)
@staticmethod
def dir_concat(d, f):
d = d.rstrip('/')
return f if d == '' else d + '/' + f
class JobDirectory(Job):
def __init__(self, base_item_id, remote, local, item_id=None):
super().__init__(base_item_id, remote, local)
self.item_id = item_id
def process_child(self, child):
new_copy = copy.deepcopy(self)
new_copy.remote = self.dir_concat(self.remote, child['name'])
new_copy.local = self.dir_concat(self.local, child['name'])
if 'folder' in child:
new_copy.item_id = child['id']
else:
new_copy.__class__ = JobFile
new_copy.file_size = child['size']
new_copy.url = child['@microsoft.graph.downloadUrl']
return new_copy
def get_url(self):
if self.item_id is None:
item_id = '{}{}'.format(self.base_item_id, quote(':/' + self.remote + ':' if self.remote else ''))
else:
item_id = self.item_id
return '/drives/{0}/items/{1}/children' \
'?select=name,folder,size,id,createdDateTime,@microsoft.graph.downloadUrl' \
'&top=1000' \
.format(self.base_item_id.split('!')[0], item_id)
class JobFile(Job):
def __init__(self, base_item_id, remote, local, file_size, url=None):
super().__init__(base_item_id, remote, local)
self.file_size = file_size
self.url = url

View File

@@ -1,8 +1,10 @@
import logging import logging
import os.path import os.path
from fnmatch import fnmatch from fnmatch import fnmatch
from boxsdk import Client
from boxsdk.object.folder import Folder
from src.job import JobDirectory, JobFile from .job import Job
def dir_get_parent(d): def dir_get_parent(d):
@@ -12,74 +14,68 @@ def dir_get_parent(d):
return '' return ''
def human_readable_bytes(B): def human_readable_bytes(b):
"""Return the given bytes as a human friendly KB, MB, GB, or TB string""" """Return the given bytes as a human friendly KB, MB, GB, or TB string"""
B = float(B) b = float(b)
KB = float(1024) kb = float(1024)
MB = float(KB ** 2) # 1,048,576 mb = float(kb ** 2) # 1,048,576
GB = float(KB ** 3) # 1,073,741,824 gb = float(kb ** 3) # 1,073,741,824
TB = float(KB ** 4) # 1,099,511,627,776 tb = float(kb ** 4) # 1,099,511,627,776
if B < KB: if b < kb:
return '{0} {1}'.format(B, 'Bytes' if 0 == B > 1 else 'Byte') return '{0} {1}'.format(b, 'Bytes' if 0 == b > 1 else 'Byte')
if KB <= B < MB: if kb <= b < mb:
return '{0:.2f} KB'.format(B / KB) return '{0:.2f} KB'.format(b / kb)
if MB <= B < GB: if mb <= b < gb:
return '{0:.2f} MB'.format(B / MB) return '{0:.2f} MB'.format(b / mb)
if GB <= B < TB: if gb <= b < tb:
return '{0:.2f} GB'.format(B / GB) return '{0:.2f} GB'.format(b / gb)
return '{0:.2f} TB'.format(B / TB) return '{0:.2f} TB'.format(b / tb)
class Worker: class Worker:
def __init__(self, queue, drive_helper, blacklist, interrupt_flag): def __init__(self, queue, client: Client, blacklist, interrupt_flag):
self.queue = queue self.queue = queue
self.drive = drive_helper self.client = client
self.blacklist = blacklist or [] self.blacklist = blacklist or []
self.errors = []
self.downloaded = set()
self.interrupt_flag = interrupt_flag self.interrupt_flag = interrupt_flag
self.downloaded = set()
self.errors = []
def work(self): def work(self):
logging.warning('A worker thread launched') logging.warning('A worker thread launched')
while not self.interrupt_flag['exit']: while not self.interrupt_flag['exit']:
item = self.queue.get() job = self.queue.get()
if item is None: if (job is None) or (any(fnmatch(job.remote_path, x) for x in self.blacklist)):
# logging.warning('A worker thread exited') logging.info('A worker thread exited')
break break
elif isinstance(item, JobDirectory): elif isinstance(job.box_item, Folder):
self.do_folder(item) self.do_folder(job)
else: else:
self.do_file(item) self.do_file(job)
self.queue.task_done() self.queue.task_done()
def do_folder(self, job: JobDirectory): def do_folder(self, job: Job):
if any(fnmatch(job.remote, x) for x in self.blacklist):
logging.info('Skipping folder [%s]', job.remote)
return None
try: try:
logging.info('Fetching folder [%s]', job.remote) logging.info('Fetching folder [%s]', job.remote_path)
children = self.drive.get_dir(job) children = job.box_item.get_items(limit=5000, fields=['name', 'id', 'size', 'modified_at', 'path_collection'])
except Exception as e: except Exception as e:
logging.error('Fail to ls [%s]: %s: %s', job.remote, type(e).__name__, e) logging.error('Fail to ls [%s]: %s: %s', job.remote_path, type(e).__name__, e)
self.errors.append(job.remote) self.errors.append(job.remote_path)
return None return None
try: try:
for child in children: for child in children:
self.queue.put(job.process_child(child)) self.queue.put(Job(child, job.local_path / child.name))
except Exception as e: except Exception as e:
logging.error( logging.error(
'Fail to process directory [%s]: %s', job.remote, e) 'Fail to process directory [%s]: %s', job.remote_path, e)
self.errors.append(job.remote) self.errors.append(job.remote_path)
return None return None
def do_file(self, job: JobFile): def do_file(self, job: Job):
if any(fnmatch(job.remote, x) for x in self.blacklist): print(f'Supposed to download file {job.remote_path} to {job.local_path}')
logging.info('Skipping file [%s]', job.remote)
return None return None
try: try:
if os.path.isfile(job.local): if os.path.isfile(job.local):
local_size = os.path.getsize(job.local) local_size = os.path.getsize(job.local)