Updated: 2026-01-14
Update Note: Initial version. Still needs cleanup to remove extraneous trial-and-error code.
Uses Microsoft's MSAL for Python to take care of the auth flow.
The OneDrive implementation below uses REST calls; this ended up not being ideal. During big syncs (i.e., lots of folders and files), things would stop working (i.e., hang while calling the API). I wasn't sure about the root cause (e.g., home network, the API, etc.), but it was a pain. That's why most REST calls to the API are wrapped in a retry decorator - this needs another look.
{
"GRAPH_API_ENDPOINT": "https://graph.microsoft.com/v1.0/",
"config": {
"client_id": "xxxxx(see OneDrive Setup)",
"authority": "https://login.microsoftonline.com/consumers",
"scope": [
"Files.ReadWrite",
"User.Read"
]
},
"retries":2
}
The JSON above is the configuration for OneDrive.
Save it as "_onedrive_config.json" in the "onedrive" folder.
The "onedrive" folder is at the same level as the Python file.
import json
import subprocess, os, platform
import time
import datetime
import sys
from flask import request
import requests
from requests.exceptions import ConnectionError, Timeout, RequestException
import msal
import json
import webbrowser
import atexit
import msal
from msgraph import GraphServiceClient
import asyncio
import functools
import urllib.parse
import mimetypes
''' This is intended to be a standalone class to push/pull from OneDrive
This uses a config file located under a child folder (of where the script resides)
called "onedrive"
The config file is "_onedrive_config.json"
'''
class OneDrive:
def __init__(self, account):
    """Bind the instance to one account and load the OneDrive configuration.

    The configuration is read from ``onedrive/_onedrive_config.json`` in the
    directory that contains this script.

    Args:
        account: Account label. It is used in log output and in the name of
            the MSAL token-cache file.

    Raises:
        FileNotFoundError: The configuration file does not exist.
        json.JSONDecodeError: The configuration file is not valid JSON.
    """
    self._account = account
    # OneDrive folder id -> list of child summaries (filled while syncing folders).
    self.folder_items = {}
    # These are normally filled by get_local_children() and
    # get_local_children_onedrive_folders(); they are created here so the
    # other methods never hit an AttributeError when called first.
    self.listdir_items = []
    self.listdir_directories = {}
    self.onedriveids_dictionary_folders = {}
    script_directory = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(script_directory, "onedrive", "_onedrive_config.json")
    with open(config_path, encoding="utf-8") as _f:
        self._onedrive_config = json.load(_f)
    print("Config: ",self._onedrive_config)
''' Retry handler
Requests occasionally fails for no obvious reason (e.g. "max retries exceeded").
The underlying exception turned out to be "ConnectionError",
so the REST calls are wrapped in this retry decorator.
'''
def dec_run_w_retry(func):
    """Decorator that retries a method when it raises or returns ``False``.

    The wrapped callable must receive the instance as its first positional
    argument. The number of attempts is read from
    ``self._onedrive_config['retries']`` (at least one attempt is always
    made). The pause between attempts comes from the optional
    ``retry_delay`` config key (seconds, default 5).

    Only ``Exception`` subclasses are retried, so ``KeyboardInterrupt`` and
    ``SystemExit`` propagate immediately.

    Raises:
        ValueError: Every attempt failed. The last exception, if any, is
            attached as ``__cause__``.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        self = args[0]
        settings = self._onedrive_config
        retries = settings['retries']
        delay = settings.get('retry_delay', 5)
        attempt = 0
        while True:
            attempt += 1
            last_error = None
            start_time = time.perf_counter_ns()
            try:
                value = func(*args, **kwargs)
            except Exception as error_object:
                last_error = error_object
                print(
                    f"{self._account} dec_run_w_retry [{func.__name__}] attempt {attempt} failed: "
                    f"{type(error_object).__name__}: {error_object}"
                )
                value = False
            run_time = time.perf_counter_ns() - start_time
            # Only the literal False is the failure signal; 0, "" or {} are
            # legitimate return values and must not trigger a retry.
            if value is not False:
                break
            # ">=" (not "==") so that retries <= 0 cannot loop forever.
            if attempt >= retries:
                raise ValueError(f"max retries in dec_run_w_retry for {func.__name__}") from last_error
            time.sleep(delay)
        print(f"{self._account} dec_run_w_retry [{func.__name__}]: {str(run_time/1000000000)}")
        return value
    return wrapper
@dec_run_w_retry
def getAccessToken(self):
    """Return an access token for this instance's account.

    A serialized MSAL token cache is kept per account in
    ``onedrive/my_msal_cache_<account>.bin`` next to this script. If the
    cache holds an account, a silent token acquisition is tried first.
    Otherwise the device-code flow is started, its message is printed and
    the verification page is opened in the default browser.

    Returns:
        dict with the keys ``journal`` (list of progress messages),
        ``access_token`` (empty string when no token was obtained) and
        ``outcome`` (``"OK"`` or ``"Not OK"``).

    Raises:
        ValueError: The device-code flow could not be started.
    """
    config = self._onedrive_config['config']
    # The instance is always created with an account, so "me" is that account.
    me = self._account
    returnObject = {'journal': [f"me is {me}"], 'access_token': ""}
    if me=="":
        returnObject['journal'].append(f"there is no me. returning")
        returnObject['outcome'] = "Not OK"
        return returnObject

    script_directory = os.path.dirname(os.path.abspath(__file__))
    cache_filename = f"{script_directory}/onedrive/my_msal_cache_{me}.bin"

    # Load the persisted token cache when there is one; a broken cache file is
    # not fatal, we simply start with an empty cache.
    access_token_cache = msal.SerializableTokenCache()
    if os.path.exists(cache_filename):
        returnObject['journal'].append("Attempting to load cache from file")
        try:
            with open(cache_filename, "r") as f:
                access_token_cache.deserialize(f.read())
        except Exception as e:
            returnObject['journal'].append("error loading cache - starting with empty cache")
            print(f"Error loading cache: {e}. Starting with an empty cache.")

    app = msal.PublicClientApplication(
        config["client_id"],
        authority=config["authority"],
        token_cache=access_token_cache,
    )

    result = None
    accounts = app.get_accounts()
    if accounts:
        # Use the first cached account to acquire a token silently.
        result = app.acquire_token_silent(config["scope"], account=accounts[0])
    if not result:
        # Nothing usable in the cache: fall back to the device-code flow.
        flow = app.initiate_device_flow(scopes=config["scope"])
        if "user_code" not in flow:
            # A failed initiation carries error details instead of a message.
            raise ValueError(f"Failed to create device flow: {json.dumps(flow, indent=4)}")
        print(flow["message"])
        # TODO if we're here in an ajax call, we really should have the callback
        # retrigger the device authentication flow
        webbrowser.open(flow["verification_uri"])
        result = app.acquire_token_by_device_flow(flow)
        returnObject['journal'].append(flow['message'])

    if result and "access_token" in result:
        returnObject['access_token'] = result["access_token"]
        returnObject['outcome'] = "OK"
        if access_token_cache.has_state_changed:
            # Context manager so the cache file is flushed and closed.
            with open(cache_filename, "w") as cache_file:
                cache_file.write(access_token_cache.serialize())
    else:
        returnObject['journal'].append("Error authenticating with onedrive")
        error = result.get("error") if result else None
        returnObject['outcome'] = "Not OK"
        if error == "invalid_client":
            print("Invalid client ID. Please check your Azure AD application configuration")
        else:
            print(error)
    return returnObject
@dec_run_w_retry
def getItem(self,item_id):
    """Fetch the metadata of one drive item by its id.

    Args:
        item_id: OneDrive item id, inserted into ``me/drive/items/{item_id}``.

    Returns:
        dict with the single key ``response`` holding the decoded JSON body
        (which may itself describe an error).
    """
    endpoint = self._onedrive_config['GRAPH_API_ENDPOINT']
    # Without a timeout, requests can wait forever on a stalled connection.
    # Optional "timeout" config key, in seconds.
    timeout = self._onedrive_config.get('timeout', 60)
    access_token = self.getAccessToken()
    headers = {'Authorization': 'Bearer ' + access_token['access_token']}
    response = requests.get(
        endpoint + f'me/drive/items/{item_id}',
        headers=headers,
        timeout=timeout,
    )
    return {'response': response.json()}
@dec_run_w_retry
def getItemByPath(self,path):
    """Look up a drive item by its path relative to the drive root.

    Args:
        path: Path below the root, e.g. ``LinuxBackups/Data``. It is
            URL-quoted before being placed in the request URL.

    Returns:
        dict with ``status`` (``"error"`` when the JSON body contains an
        ``error`` key, otherwise ``"OK"``) and ``response`` (the decoded
        body). An error is not necessarily a problem: it is also how a path
        that does not exist in OneDrive yet is reported.
    """
    endpoint = self._onedrive_config['GRAPH_API_ENDPOINT']
    # Without a timeout, requests can wait forever on a stalled connection.
    timeout = self._onedrive_config.get('timeout', 60)
    access_token = self.getAccessToken()
    headers = {'Authorization': 'Bearer ' + access_token['access_token']}
    # eg, call = GRAPH_API_ENDPOINT + 'me/drive/root:/LinuxBackups/_Python Flask File Manager/Data'
    call = endpoint + f'me/drive/root:/{urllib.parse.quote(path)}'
    call_response = requests.get(call, headers=headers, timeout=timeout).json()
    status = "error" if 'error' in call_response else "OK"
    return {'status': status, 'response': call_response}
@dec_run_w_retry
def getRootChildren(self):
    """List the children of the drive root, following result paging.

    Returns:
        dict with the single key ``response``: the decoded JSON body of the
        first page, with the ``value`` lists of all following pages appended.
        If any request reports an error, that error body is returned as
        ``response`` instead.
    """
    endpoint = self._onedrive_config['GRAPH_API_ENDPOINT']
    # Without a timeout, requests can wait forever on a stalled connection.
    timeout = self._onedrive_config.get('timeout', 60)
    access_token = self.getAccessToken()
    headers = {'Authorization': 'Bearer ' + access_token['access_token']}
    listing = requests.get(
        endpoint + f'me/drive/root/children',
        headers=headers,
        timeout=timeout,
    ).json()
    # Large collections are returned page by page; each page names the next
    # one in "@odata.nextLink".
    next_link = listing.get('@odata.nextLink') if 'value' in listing else None
    while next_link:
        page = requests.get(next_link, headers=headers, timeout=timeout).json()
        if 'error' in page:
            return {'response': page}
        listing['value'].extend(page.get('value', []))
        next_link = page.get('@odata.nextLink')
    listing.pop('@odata.nextLink', None)
    return {'response': listing}
@dec_run_w_retry
def getChildrenByFolderID(self, folder_id):
    """List all children of a folder, following result paging.

    Args:
        folder_id: OneDrive id of the folder.

    Returns:
        dict with ``status`` and ``response``. On success ``status`` is
        ``"OK"`` and ``response`` is the first page's JSON body whose
        ``value`` list has been extended with the items of every following
        page. If any request reports an error, ``status`` is ``"error"`` and
        ``response`` is that error body.
    """
    endpoint = self._onedrive_config['GRAPH_API_ENDPOINT']
    # Without a timeout, requests can wait forever on a stalled connection.
    timeout = self._onedrive_config.get('timeout', 60)
    access_token = self.getAccessToken()
    headers = {'Authorization': 'Bearer ' + access_token['access_token']}
    url = endpoint + f'me/drive/items/{folder_id}/children'
    listing = None
    while url:
        page = requests.get(url, headers=headers, timeout=timeout).json()
        if 'error' in page:
            return {'status': "error", 'response': page}
        if listing is None:
            listing = page
        else:
            listing.setdefault('value', []).extend(page.get('value', []))
        # Without following "@odata.nextLink" only the first page of a large
        # folder would be seen, and existing files would look missing.
        url = page.get('@odata.nextLink')
    listing.pop('@odata.nextLink', None)
    return {'status': "OK", 'response': listing}
@dec_run_w_retry
def createFolder(self,parent_folder_id,folder_name):
    """Create a folder below an existing OneDrive folder.

    The request is sent with ``@microsoft.graph.conflictBehavior`` set to
    ``"rename"``.

    Args:
        parent_folder_id: OneDrive id of the parent folder.
        folder_name: Name of the folder to create.

    Returns:
        dict with ``status`` (``"OK"`` or ``"error"``) and ``response`` (the
        decoded JSON body, also on error so callers can inspect it).
    """
    endpoint = self._onedrive_config['GRAPH_API_ENDPOINT']
    # Without a timeout, requests can wait forever on a stalled connection.
    timeout = self._onedrive_config.get('timeout', 60)
    access_token = self.getAccessToken()
    headers = {'Authorization': 'Bearer ' + access_token['access_token']}
    json_payload = {
        "name": folder_name,
        "folder": { },
        "@microsoft.graph.conflictBehavior": "rename",
    }
    call_response = requests.post(
        endpoint + f'me/drive/items/{parent_folder_id}/children',
        headers=headers,
        json=json_payload,
        timeout=timeout,
    ).json()
    if 'error' in call_response:
        print(call_response['error'])
        return {'status': "error", 'response': call_response}
    return {'status': "OK", 'response': call_response}
@dec_run_w_retry
def downloadLargeFile(self,onedrive_id,output_directory,item_name, item_size ):
    """Download one OneDrive file into a local directory.

    The item metadata is fetched first. The file is saved under the name
    reported by OneDrive. Data is streamed into ``<name>.part`` and moved into
    place only after the download finished, so a failed download never leaves
    a truncated file under the final name.

    Args:
        onedrive_id: OneDrive id of the file.
        output_directory: Existing local directory that receives the file.
        item_name: Not used for the file name; kept for interface
            compatibility.
        item_size: Only used in progress output.

    Returns:
        True when the file was written.

    Raises:
        ValueError: The item metadata could not be read. The response body is
            written to ``log/error <onedrive_id>.json`` next to this script.
        requests.HTTPError: The download URL answered with an error status.
    """
    endpoint = self._onedrive_config['GRAPH_API_ENDPOINT']
    # Without a timeout, requests can wait forever on a stalled connection.
    timeout = self._onedrive_config.get('timeout', 60)
    access_token = self.getAccessToken()
    headers = {'Authorization': 'Bearer ' + access_token['access_token']}

    response = requests.get(
        endpoint + f'me/drive/items/{onedrive_id}',
        headers=headers,
        timeout=timeout,
    )
    response_json = response.json()
    if (
        response.status_code != 200
        or "name" not in response_json
        or '@microsoft.graph.downloadUrl' not in response_json
    ):
        script_directory = os.path.dirname(os.path.abspath(__file__))
        log_directory = os.path.join(script_directory, "log")
        # Create the log folder on demand so that writing the diagnostics
        # cannot itself fail and hide the real problem.
        os.makedirs(log_directory, exist_ok=True)
        with open(os.path.join(log_directory, f"error {onedrive_id}.json"), "wt") as _f:
            _f.write(json.dumps(response_json))
        raise ValueError(
            f"downloadLargeFile: unable to read item {onedrive_id} (HTTP {response.status_code})"
        )

    name = response_json['name']
    download_url = response_json['@microsoft.graph.downloadUrl']
    target_path = output_directory+"/"+name
    partial_path = target_path + ".part"
    chunk_counter=0
    try:
        with requests.get(download_url, stream=True, verify=True, timeout=timeout) as r:
            # Do not store an HTTP error page as if it were the file content.
            r.raise_for_status()
            with open(partial_path, 'wb') as fd:
                # we pick 8 MB chunk size
                for chunk in r.iter_content(chunk_size=8*1024*1024):
                    chunk_counter+=1
                    if chunk_counter%10==0:
                        print("chunk%10",chunk_counter, item_size)
                    fd.write(chunk)
        os.replace(partial_path, target_path)
    finally:
        # After a successful replace the partial file is gone; anything still
        # here is the leftover of a failed download.
        if os.path.exists(partial_path):
            os.remove(partial_path)
    return True
@dec_run_w_retry
def createLargeFile(self,parent_item_id,file_name,local_path):
    """Upload a local file as a new item below a OneDrive folder.

    Files smaller than 4 MiB (including empty files) are sent with a single
    PUT to ``.../content``. Larger files go through an upload session and
    are sent in fragments.

    Args:
        parent_item_id: OneDrive id of the destination folder.
        file_name: Name of the file in OneDrive (URL-quoted internally).
        local_path: Path of the local file to upload.

    Returns:
        dict with ``status`` (``"OK"``) and ``response`` (decoded JSON body of
        the last request).

    Raises:
        ValueError: OneDrive reported an error, no upload URL was returned,
            or the local file ended before ``file_size`` bytes were read.
        requests.HTTPError: A fragment upload answered with an error status.
    """
    endpoint = self._onedrive_config['GRAPH_API_ENDPOINT']
    # Without a timeout, requests can wait forever on a stalled connection.
    timeout = self._onedrive_config.get('timeout', 60)
    returnObject = {}
    file_size = os.path.getsize(local_path)
    access_token = self.getAccessToken()
    headers = {'Authorization': 'Bearer ' + access_token['access_token']}

    # Quote the name before it is used in ANY url (also the zero-byte case),
    # otherwise names with spaces, '#' or '?' produce a broken request.
    mime_type, encoding = mimetypes.guess_type(file_name)
    quoted_name = urllib.parse.quote(file_name)
    item_url = endpoint + f'me/drive/items/{parent_item_id}:/{quoted_name}:'

    # if file size = 0, we run into issues
    if file_size==0:
        # PUT /me/drive/items/{parent-id}:/{filename}:/content
        headers['Content-Type']="text/plain"
        headers['Content-Length'] = "0"
        response = requests.put(item_url + '/content', headers=headers, data=None, timeout=timeout)
        returnObject['status'] = "OK"
        returnObject['response'] = response.json()
        if "error" in returnObject['response']:
            raise ValueError(f"createLargeFile {returnObject['response']}")
        return returnObject

    if mime_type is None:
        mime_type = "application/octet-stream"
    headers['Content-Type'] = mime_type

    if file_size<4*1024*1024:
        # PUT /me/drive/items/{parent-id}:/{filename}:/content
        with open(local_path, 'rb') as f:
            payload = f.read()
        response = requests.put(item_url + '/content', headers=headers, data=payload, timeout=timeout)
        returnObject['status'] = "OK"
        returnObject['response'] = response.json()
        if "error" in returnObject['response']:
            raise ValueError(f"createLargeFile {returnObject['response']}")
        return returnObject

    # Large file: open an upload session and send the file in fragments.
    response = requests.post(item_url + '/createUploadSession', headers=headers, timeout=timeout)
    session = response.json()
    if 'uploadUrl' not in session:
        raise ValueError(f"createLargeFile {session}")
    upload_url = session['uploadUrl']
    # Fragment sizes must be a multiple of 320 KiB; 32 * 320 KiB = 10 MiB.
    chunk_size = 32*320*1024
    responsecounter=0
    with open(local_path, 'rb') as f:
        start_byte = 0
        while start_byte < file_size:
            chunk = f.read(min(chunk_size, file_size - start_byte))
            if not chunk:
                raise ValueError(f"createLargeFile {local_path} ended at byte {start_byte} of {file_size}")
            end_byte = start_byte + len(chunk)
            # The upload URL is pre-authorised, so no Authorization header here.
            chunk_headers = {
                'Content-Length': str(len(chunk)),
                'Content-Range': f'bytes {start_byte}-{end_byte - 1}/{file_size}'
            }
            response = requests.put(upload_url, headers=chunk_headers, data=chunk, timeout=timeout)
            responsecounter+=1
            if responsecounter%10==0:
                print("chunk%10",responsecounter, file_size)
            response.raise_for_status() # Raise an exception for bad status codes
            start_byte = end_byte
    returnObject['status'] = "OK"
    returnObject['response'] = response.json()
    if "error" in returnObject['response']:
        raise ValueError(f"createLargeFile {returnObject['response']}")
    return returnObject
@dec_run_w_retry
def updateLargeFile(self,item_id,file_name,local_path):
    """Replace the content of an existing OneDrive file with a local file.

    Files smaller than 8 MiB (including empty files) are sent with a single
    PUT to ``.../content``. Larger files go through an upload session and
    are sent in fragments.

    Args:
        item_id: OneDrive id of the file to overwrite.
        file_name: Not used; kept for interface compatibility.
        local_path: Path of the local file to upload.

    Returns:
        dict with ``status`` (``"OK"``) and ``response`` (decoded JSON body of
        the last request).

    Raises:
        ValueError: OneDrive reported an error, no upload URL was returned,
            or the local file ended before ``file_size`` bytes were read.
        requests.HTTPError: A fragment upload answered with an error status.
    """
    endpoint = self._onedrive_config['GRAPH_API_ENDPOINT']
    # Without a timeout, requests can wait forever on a stalled connection.
    timeout = self._onedrive_config.get('timeout', 60)
    returnObject = {}
    file_size = os.path.getsize(local_path)
    access_token = self.getAccessToken()
    headers = {'Authorization': 'Bearer ' + access_token['access_token']}
    content_url = endpoint + f'me/drive/items/{item_id}/content'

    if file_size<8*1024*1024:
        if file_size==0:
            # if file size = 0, we run into issues unless the length is explicit
            headers['Content-Type']="text/plain"
            headers['Content-Length'] = "0"
            payload = None
        else:
            with open(local_path, 'rb') as f:
                payload = f.read()
        # PUT /me/drive/items/{item-id}/content
        response = requests.put(content_url, headers=headers, data=payload, timeout=timeout)
        returnObject['status'] = "OK"
        returnObject['response'] = response.json()
        if "error" in returnObject['response']:
            raise ValueError(f"updateLargeFile {returnObject['response']}")
        return returnObject

    # Large file: open an upload session and send the file in fragments.
    response = requests.post(
        endpoint + f'me/drive/items/{item_id}/createUploadSession',
        headers=headers,
        timeout=timeout,
    )
    session = response.json()
    if 'uploadUrl' not in session:
        raise ValueError(f"updateLargeFile {session}")
    upload_url = session['uploadUrl']
    # Fragment sizes must be a multiple of 320 KiB; 32 * 320 KiB = 10 MiB.
    chunk_size = 32*320*1024
    responsecounter=0
    with open(local_path, 'rb') as f:
        start_byte = 0
        while start_byte < file_size:
            chunk = f.read(min(chunk_size, file_size - start_byte))
            if not chunk:
                raise ValueError(f"updateLargeFile {local_path} ended at byte {start_byte} of {file_size}")
            end_byte = start_byte + len(chunk)
            # The upload URL is pre-authorised, so no Authorization header here.
            chunk_headers = {
                'Content-Length': str(len(chunk)),
                'Content-Range': f'bytes {start_byte}-{end_byte - 1}/{file_size}'
            }
            response = requests.put(upload_url, headers=chunk_headers, data=chunk, timeout=timeout)
            responsecounter+=1
            if responsecounter%10==0:
                print("chunk%10",responsecounter, file_size)
            response.raise_for_status() # Raise an exception for bad status codes
            start_byte = end_byte
    returnObject['status'] = "OK"
    returnObject['response'] = response.json()
    if "error" in returnObject['response']:
        raise ValueError(f"updateLargeFile {returnObject['response']}")
    return returnObject
@dec_run_w_retry
def getRoot(self):
    """Fetch the drive's root item.

    Returns:
        dict with ``call`` (the request URL), ``status`` (``"OK"`` or
        ``"error"``) and ``response`` (the decoded JSON body, also on error
        so callers can inspect it).
    """
    endpoint = self._onedrive_config['GRAPH_API_ENDPOINT']
    # Without a timeout, requests can wait forever on a stalled connection.
    timeout = self._onedrive_config.get('timeout', 60)
    access_token = self.getAccessToken()
    headers = {'Authorization': 'Bearer ' + access_token['access_token']}
    call = endpoint + f'me/drive/root'
    call_response = requests.get(call, headers=headers, timeout=timeout).json()
    returnObject = {'call': call}
    if 'error' in call_response:
        print(call_response['error'])
        returnObject['status']="error"
    else:
        returnObject['status']="OK"
    returnObject['response'] = call_response
    return returnObject
# Note that behavior of the OneDrive get_create_folderByPath is different than GoogleDrive
@dec_run_w_retry
def get_create_folderByPath(self,path,parent_folder_id):
    """Return the id of the OneDrive folder at ``path``, creating it if needed.

    Args:
        path: Folder path relative to the drive root.
        parent_folder_id: OneDrive id of the parent folder. It is only used
            when the folder has to be created.

    Returns:
        ``{"status": "OK", "folder_id": <id>}``.

    Raises:
        ValueError: ``path`` has no final component, or the folder could not
            be created.
    """
    result = self.getItemByPath(path=path)
    if result['status'] == "OK":
        folder_id = result['response']['id']
        print(f"{self._account} get_create_folderByPath - get onedrive folder: {path}")
        return {"status":"OK","folder_id":folder_id}

    # Take the last path component. Removing the dirname with str.replace()
    # would delete EVERY occurrence of it (e.g. "data/mydata" -> "my").
    folder_name = os.path.basename(path.rstrip("/"))
    if not folder_name:
        raise ValueError(f"get_create_folderByPath: no folder name in path {path!r}")
    create_result = self.createFolder(parent_folder_id=parent_folder_id,folder_name=folder_name)
    if create_result.get('status') != "OK" or 'response' not in create_result:
        raise ValueError(f"get_create_folderByPath: unable to create {path}: {create_result}")
    folder_id = create_result['response']['id']
    print(f"{self._account} get_create_folderByPath - create onedrive folder: {path}")
    return {"status":"OK","folder_id":folder_id}
@dec_run_w_retry
def update_create_fileByPath(self,onedrive_path,parent_folder_id, file_name, local_path,local_size):
    """Create, update or skip one file in OneDrive based on the cached folder listing.

    The decision uses ``self.folder_items[parent_folder_id]`` instead of an
    API call per file:

    * no entry with this name (or no cached listing at all): create the file;
    * entry with the same size: skip, since same size is treated as same file;
    * entry with a different size: upload the new content.

    Args:
        onedrive_path: Not used; kept for interface compatibility.
        parent_folder_id: OneDrive id of the folder that holds the file.
        file_name: Name of the file inside that folder.
        local_path: Path of the local file.
        local_size: Size of the local file in bytes.

    Returns:
        ``{"status": "OK", "parent_folder_id": ..., "file_id": ...}``.
    """
    # A missing listing is treated like an empty folder. Previously this case
    # fell into the update branch with no match and crashed on None['id'].
    cached_items = self.folder_items.get(parent_folder_id, [])
    existing = next((entry for entry in cached_items if entry["name"]==file_name), None)

    if existing is None:
        print(f"{self._account} update_create_fileByPath - create in onedrive: {local_path}")
        created = self.createLargeFile(parent_item_id=parent_folder_id, file_name=file_name, local_path=local_path)
        file_id = created['response']['id']
    else:
        file_id = existing['id']
        # if the files have the exact same size, chances are they are the same - this is good enough
        if existing['size'] == local_size:
            print(f"{self._account} update_create_fileByPath - skipped onedrive for: {local_path}")
        else:
            print(f"{self._account} update_create_fileByPath - update in onedrive: {local_path}")
            self.updateLargeFile(item_id=file_id,file_name=file_name, local_path=local_path )
    return {"status":"OK","parent_folder_id":parent_folder_id, "file_id":file_id}
@staticmethod
def driveExists(path):
    """Tell whether the first three components of an absolute path exist.

    For ``/a/b/c/d`` the check is made on ``/a/b/c/``.

    Args:
        path: ``/``-separated absolute path.

    Returns:
        True when the three-component prefix exists on disk. False when it
        does not, or when ``path`` has fewer than three components. A path
        that is too short used to terminate the interpreter via
        ``sys.exit()``; a predicate should report failure instead.
    """
    pieces = path.split('/')
    if len(pieces)<4:
        return False
    return os.path.exists("/" + "/".join(pieces[1:4]) + "/")
def check_make_directory_local_only(self,directory,**kwargs):
    """Make sure a local directory exists, creating missing parents as needed.

    Args:
        directory: Directory path. An empty string is ignored.
        **kwargs: Accepted for compatibility (e.g. ``log``); not used.
    """
    if not directory:
        return
    # makedirs handles relative paths correctly. The previous piece-by-piece
    # loop always dropped the first path component, so "a/b" was created as
    # "/b". exist_ok also removes the exists-then-mkdir race.
    os.makedirs(directory, exist_ok=True)
def reset_global_variable(self):
    """Reset the local listing state: an empty item list and an empty directory map."""
    self.listdir_items, self.listdir_directories = [], {}
def clean_string_for_onedrive(self,stringtoclean):
    """Remove the characters that OneDrive does not accept in item names.

    The characters ``\\ / : < > " * ? |`` are deleted. Earlier versions
    stripped ``:`` twice and left ``" * ? |`` in place.

    Args:
        stringtoclean: File or folder name.

    Returns:
        The name with those characters removed. It may be empty.
    """
    # One pass with a deletion table instead of a chain of replace() calls.
    return stringtoclean.translate(str.maketrans("", "", '\\/:<>"*?|'))
# Get the files
def get_local_children(self,directory,include_directories, **kwargs):
    """Walk a local directory and record its files and folders for a later push.

    Side effect: an entry whose name contains characters removed by
    ``clean_string_for_onedrive`` is RENAMED on disk to the cleaned name. If
    the cleaned name is empty or already taken, the entry is skipped instead
    so that no existing file is overwritten.

    Folders named ``.venv`` and ``__pycache__`` are excluded. Entries that are
    neither a regular file nor a directory (e.g. broken symlinks) are skipped.

    Args:
        directory: ``/``-separated path of the directory to scan.
        include_directories: ``0`` scans only ``directory`` itself; any other
            value also descends into sub-directories.
        **kwargs: Accepted for compatibility (e.g. ``log``); not used.

    Returns:
        dict with ``listdir_items`` (list of dicts with the keys ``name``,
        ``onedrive_id``, ``fullPath``, ``directory``, ``isFile`` and ``size``;
        ``size`` is ``-1`` for directories) and ``listdir_directories`` (dict
        whose keys are the parent of ``directory``, ``directory`` itself and
        every sub-directory scheduled for scanning; all values are ``""``).
        Both are also stored on the instance.
    """
    self.listdir_items = []
    self.listdir_directories = {}

    # Register the parent of the start directory first; the folder-sync step
    # uses the first key to create the chain of ancestors in OneDrive. A
    # start directory without a parent component gets no (empty) parent key.
    ancestors = directory.split("/")[1:-1]
    if ancestors:
        self.listdir_directories["/" + "/".join(ancestors)] = ""

    recurse = include_directories != 0
    excluded_directories = (".venv", "__pycache__")
    directories_to_process = [directory]
    self.listdir_directories[directory] = ""
    limit = 1000000  # safety cap on the number of directories scanned
    counter = 0
    while directories_to_process:
        counter +=1
        if counter > limit:
            break
        current_directory = directories_to_process.pop()
        for file in os.listdir(current_directory):
            clean_name = self.clean_string_for_onedrive(file)
            full_path = f"{current_directory}/{clean_name}"
            if file != clean_name:
                if not clean_name or os.path.lexists(full_path):
                    # Renaming would produce an empty name or silently replace
                    # another entry; leave the file alone and skip it.
                    print(f"get_local_children - cannot rename {current_directory}/{file}, skipping")
                    continue
                os.rename(f"{current_directory}/{file}", full_path)
            item = {
                'name': clean_name,
                'onedrive_id': "",
                'fullPath': full_path,
                'directory': f"{current_directory}",
            }
            if os.path.isfile(full_path):
                item['isFile'] = 1
                item['size'] = os.stat(full_path).st_size
                self.listdir_items.append(item)
            elif os.path.isdir(full_path):
                item['isFile'] = 0
                item['size'] = -1
                # Exclusion list - we don't need to back up these folders
                if clean_name in excluded_directories:
                    continue
                if recurse:
                    directories_to_process.append(full_path)
                    self.listdir_directories[full_path] = ""
                self.listdir_items.append(item)
    return {"listdir_items":self.listdir_items,"listdir_directories":self.listdir_directories,}
''' We get the onedrive folder IDs
'''
def get_local_children_onedrive_folders (self,local_directory,onedrive_reference_directory, **kwargs):
    """Mirror the recorded local folder structure below a OneDrive reference folder.

    Works on ``self.listdir_directories`` (as filled by
    ``get_local_children``) in three steps:

    1. make sure the reference folder exists (it is created below the drive
       root when missing);
    2. for the first recorded directory, get or create every path component
       below the reference folder;
    3. for every further directory, get or create it below its already known
       parent.

    Each value of ``self.listdir_directories`` is replaced by the OneDrive id
    of that folder. The children of each folder are cached in
    ``self.folder_items`` so the file step does not need one API call per
    file. The path-to-id map is stored in
    ``self.onedriveids_dictionary_folders``.

    Args:
        local_directory: Not used; kept for interface compatibility.
        onedrive_reference_directory: OneDrive folder (relative to the drive
            root) below which the local tree is mirrored.
        **kwargs: Accepted for compatibility (e.g. ``log``); not used.

    Returns:
        ``{"queries": []}``.

    Raises:
        ValueError: The drive root could not be read, the reference folder
            could not be created, or a folder's children could not be listed.
        KeyError: A directory's parent was not processed before it.
    """
    returnObject = {'queries': []}

    # Check 1: the reference directory.
    result = self.getItemByPath(path=onedrive_reference_directory)
    if result['status'] != "OK":
        # It does not exist yet: create it directly below the drive root.
        result = self.getRoot()
        # Check the status BEFORE touching the response body.
        if result['status'] != "OK":
            raise ValueError(f"Unable to find root in method get_local_children_onedrive_folders", result)
        root_id = result['response']['id']
        result = self.createFolder( parent_folder_id=root_id,folder_name=onedrive_reference_directory)
        if result['status'] != "OK":
            raise ValueError(f"Unable to create onedrive reference directory {onedrive_reference_directory} in get_local_children_onedrive_folders", result)
    onedrive_reference_directory_id = result['response']['id']

    onedriveids_dictionary = {}
    # Iterate over a snapshot of the keys because the values are rewritten.
    for index, local_path in enumerate(list(self.listdir_directories)):
        currentpath = ""
        if index==0:
            # Check 2: make sure every ancestor of the first directory exists.
            folder_id = onedrive_reference_directory_id
            newpath = onedrive_reference_directory+"/"
            for piece in local_path.split("/")[1:]:
                newpath += (piece+"/")
                currentpath = newpath[:-1]
                result = self.get_create_folderByPath(path=currentpath,parent_folder_id = folder_id)
                folder_id = result['folder_id']
                onedriveids_dictionary[currentpath] = folder_id
        else:
            # Check 3: the parent has been processed already, so it is known.
            currentpath = onedrive_reference_directory+""+local_path
            ancestorpath = os.path.dirname(currentpath)
            result = self.get_create_folderByPath(
                path=currentpath,
                parent_folder_id = onedriveids_dictionary[ancestorpath],
            )
            folder_id = result['folder_id']
            onedriveids_dictionary[currentpath] = folder_id
        self.listdir_directories[local_path] = folder_id
        print(f"{self._account} get_local_children_onedrive_folders - folder: {currentpath}")

        # Performance: list each folder once here, so that the file phase can
        # decide create/update/skip without an API call per file.
        if folder_id not in self.folder_items:
            raw = self.getChildrenByFolderID(folder_id)
            if raw.get('status') != "OK":
                raise ValueError(f"Unable to list children of {currentpath} in get_local_children_onedrive_folders", raw)
            children = []
            for val in raw['response']['value']:
                child = {
                    'name': val['name'],
                    'isFile': 1 if "file" in val else 0,
                    'size': val['size'],
                }
                if "parentReference" in val:
                    child['parent'] = val['parentReference']['id']
                child['id'] = val['id']
                children.append(child)
            self.folder_items[folder_id] = children
    self.onedriveids_dictionary_folders = onedriveids_dictionary
    return returnObject
def get_local_children_onedrive_files (self,local_directory,onedrive_reference_directory, **kwargs):
    """Push every recorded local file to its OneDrive folder.

    Prerequisite: ``self.listdir_items`` and ``self.listdir_directories``
    have been filled, the latter with OneDrive folder ids. For each file
    entry, ``update_create_fileByPath`` decides whether to create, update or
    skip it.

    Args:
        local_directory: Not used; kept for interface compatibility.
        onedrive_reference_directory: OneDrive folder that prefixes the local
            full path to form the OneDrive path.
        **kwargs: Accepted for compatibility (e.g. ``log``); not used.

    Returns:
        ``{"queries": []}``.
    """
    returnObject = {'queries': []}
    for listdir_item in self.listdir_items:
        if listdir_item['isFile']==0:
            continue
        newpath = onedrive_reference_directory+""+listdir_item['fullPath']
        parent_folder_id = self.listdir_directories[listdir_item['directory']]
        self.update_create_fileByPath(
            newpath,
            parent_folder_id,
            listdir_item['name'],
            listdir_item['fullPath'],
            listdir_item['size'],
        )
    return returnObject
# BUSINESS PROCESS
# Pushes a local folder to onedrive target directory
def push_local_to_onedrive(self,local_source,onedrive_reference_directory,include_directories ):
    """Push a local folder to OneDrive below the given reference directory.

    Runs the three phases in order: scan the local tree, mirror the folders,
    then push the files.
    """
    self.get_local_children(local_source, include_directories)
    for phase in (
        self.get_local_children_onedrive_folders,
        self.get_local_children_onedrive_files,
    ):
        phase(local_source, onedrive_reference_directory)
# Pushes a onedrive folder to a local target directory - gen 1
def push_onedrive_to_local (self,onedrive_start_directory, local_target_start_directory, include_directories,**kwargs):
    """Download a OneDrive folder tree into a local target directory.

    Folders are processed breadth-first, starting at
    ``onedrive_start_directory``. The OneDrive path is reproduced below
    ``local_target_start_directory``. A file that already exists locally with
    the same size is skipped. A local file with a different size is renamed
    to ``<name>.<unix timestamp>.bk`` before the new copy is downloaded.
    Folders that cannot be read are reported and skipped.

    Args:
        onedrive_start_directory: OneDrive path (relative to the drive root)
            to start from.
        local_target_start_directory: Local directory that receives the tree.
        include_directories: ``0`` downloads only the files directly inside
            the start directory; any other value descends into sub-folders.
        **kwargs: Accepted for compatibility (e.g. ``log``); not used.
    """
    # Local import: the queue type is only needed by this method.
    from collections import deque

    total_counter = 0
    # make sure the local target directories exist
    self.check_make_directory_local_only(local_target_start_directory)
    self.check_make_directory_local_only(f"{local_target_start_directory}/{onedrive_start_directory}")

    # deque gives O(1) pops from the left; list.pop(0) shifts the whole list.
    directories_to_process = deque([onedrive_start_directory])
    while directories_to_process:
        current_path = directories_to_process.popleft()
        print(f"{self._account} processing {current_path}")
        result = self.getItemByPath(current_path)
        if result['status'] !="OK":
            print(f"{self._account} get_Onedrive_Children - error getting {current_path}")
            continue
        # now that we have an ID, we can get the children
        result = self.getChildrenByFolderID(result['response']['id'])
        if result['status'] !="OK":
            print(f"{self._account} get_Onedrive_Children - error getting children of {current_path}")
            continue

        local_directory = local_target_start_directory+"/"+current_path
        result_counter = 0
        for item in result['response']['value']:
            total_counter +=1
            if "folder" in item:
                if include_directories==0:
                    continue
                directories_to_process.append(f"{current_path}/{item['name']}")
                self.check_make_directory_local_only(local_directory+"/"+item['name'])
            if "file" in item:
                local_path = local_directory+"/"+item['name']
                if os.path.exists(local_path):
                    # same size: treated as already downloaded
                    if os.stat(local_path).st_size==item['size']:
                        continue
                    # keep the differing local copy as a timestamped backup
                    timestamp = str(int(datetime.datetime.now().timestamp()))
                    os.rename(local_path,local_path+"."+timestamp+".bk")
                start_time = time.perf_counter_ns()
                print (total_counter, result_counter,f"getting result ",local_directory,item['name'])
                # The size-based branches were identical, so there is one call.
                self.downloadLargeFile(item['id'],local_directory,item['name'],item['size'])
                end_time = time.perf_counter_ns()
                print(">> duration: ",(end_time - start_time)/1000000000)
                result_counter+=1
# TODO - nothing done yet - just copy/paste
# Syncing OneDrive to local means removing any OneDrive files that don't have a corresponding local file
def onedrive_orphan_purge (self,onedrive_start_directory, local_target_start_directory, include_directories,**kwargs):
    ''' Placeholder for the orphan purge - the purge itself is NOT implemented yet.

    Right now this is a copy of the OneDrive -> local download walk: nothing is
    purged or deleted on either side. Starting at onedrive_start_directory, the
    OneDrive folder tree is walked breadth-first and, for every file found:
      - the file is skipped when a local file with the same byte size exists
      - otherwise an existing local copy is renamed to
        "<name>.<unix timestamp>.bk" and the OneDrive version is fetched
        through self.downloadLargeFile
    Folders that cannot be resolved or listed are reported with print() and skipped.

    Parameters:
      onedrive_start_directory: OneDrive path where the walk starts
      local_target_start_directory: local root folder; the OneDrive path is
          mirrored underneath it
      include_directories: 0 = only the start directory is processed;
          any other value = sub-folders are created locally and walked too
      **kwargs: accepted so existing callers keep working; the "log" key was
          read before but never had any effect, so it is ignored

    Returns: None
    '''
    # Function-scope import so the file-level import block does not need to change
    from collections import deque

    # make sure the local_target_start_directory exists
    self.check_make_directory_local_only(local_target_start_directory)
    self.check_make_directory_local_only(f"{local_target_start_directory}/{onedrive_start_directory}")

    total_counter = 0  # items seen across all folders (only used in the progress print)
    # FIFO queue of OneDrive paths still to visit -> breadth-first order
    directories_to_process = deque([onedrive_start_directory])
    while directories_to_process:
        current_path = directories_to_process.popleft()
        print(f"{self._account} processing {current_path}")
        result = self.getItemByPath(current_path)
        if result['status'] != "OK":
            print(f"{self._account} get_Onedrive_Children - error getting {current_path}")
            continue
        # now that we have an ID, we can get the children
        folder_id = result['response']['id']
        result = self.getChildrenByFolderID(folder_id)
        if result['status'] != "OK":
            print(f"{self._account} get_Onedrive_Children - error getting children of {current_path}")
            continue
        # NOTE(review): only the items in result['response']['value'] are handled here;
        # whether getChildrenByFolderID follows paging links is not visible from this method - confirm
        # Paths are joined with "/" on purpose so the strings handed to the helpers stay unchanged
        local_directory = f"{local_target_start_directory}/{current_path}"
        result_counter = 0  # per-folder counter (only used in the progress print)
        for item in result['response']['value']:
            total_counter += 1
            if "folder" in item:
                if include_directories == 0:
                    continue
                # queue the sub-folder and mirror it locally
                directories_to_process.append(f"{current_path}/{item['name']}")
                self.check_make_directory_local_only(f"{local_directory}/{item['name']}")
            if "file" in item:
                local_path = f"{local_directory}/{item['name']}"
                if os.path.exists(local_path):
                    # same size is treated as "already downloaded"
                    if os.stat(local_path).st_size == item['size']:
                        continue
                    # size differs: keep the old copy as a timestamped backup
                    timestamp = str(int(datetime.datetime.now().timestamp()))
                    os.rename(local_path, f"{local_path}.{timestamp}.bk")
                start_time = time.perf_counter_ns()
                print (total_counter, result_counter,f"getting result ",local_directory,item['name'])
                # downloadLargeFile is used for every size; the former "< 4 MB" branch
                # called exactly the same method in both arms
                self.downloadLargeFile(item['id'], local_directory, item['name'], item['size'])
                end_time = time.perf_counter_ns()
                print(">> duration: ",(end_time - start_time)/1000000000)
            result_counter += 1
# Script entry point: currently a no-op placeholder when the file is run directly
if __name__ == "__main__":
    pass
# Note: This is a Google Site. Feedback here.