Updated: 2026-01-17
Update Note: Needs refactoring - the trial-and-error code should be removed. Fixed a defect in getFolderByPath - the comment still needs to be updated. Testing of the operational file push is in progress.
We use the Google API Python client library.
TODO - do a full drive download (to see whether using the client library mitigates the connectivity/API call issues).
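The imports below assume the Google client packages are installed; a typical setup (our assumption, not stated in the original notes) is: pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib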
import os.path
import json
import sys
import mimetypes
import io
import datetime
import time
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google.auth.exceptions import RefreshError
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaFileUpload
from googleapiclient.http import MediaIoBaseDownload
class GoogleDrive:
def __init__(self,account):
# each instance of this class will be associated with a google account
self._account = account
self.folders = {}
self.folder_items = {}
# initialize values
self.root_id = None
self.gdrive_ref_folder_id = None
self.gdrive_paths = {}
# If modifying these scopes, delete the cached token file (googledrivetoken_*.json).
self.SCOPES = ["https://www.googleapis.com/auth/drive"]
def getAccessToken(self):
me = self._account
creds = None
# Get the absolute path of the current script file
script_path = os.path.abspath(__file__)
# Get the directory containing the script
script_directory = os.path.dirname(script_path)
# Name of the cache file
# We store the token in the google/ subfolder since this class is for GoogleDrive
cache_filename = f"{script_directory}/google/googledrivetoken_{me}.json"
creds_filename = f"{script_directory}/google/googledriveclientcredentials.json"
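# googledriveclientcredentials.json is the OAuth client secrets JSON downloaded from the
# Google Cloud console (an installed/desktop-app client) - the format that
# InstalledAppFlow.from_client_secrets_file expects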
if os.path.exists(cache_filename):
creds = Credentials.from_authorized_user_file(cache_filename, self.SCOPES)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
try:
creds.refresh(Request())
except RefreshError:
print("The refresh token expired")
flow = InstalledAppFlow.from_client_secrets_file(
creds_filename, self.SCOPES
)
creds = flow.run_local_server(port=0)
else:
flow = InstalledAppFlow.from_client_secrets_file(
creds_filename, self.SCOPES
)
creds = flow.run_local_server(port=0)
# Save the credentials for the next run
# Note - should give some thought to encrypting this file
with open(cache_filename, "w") as token:
token.write(creds.to_json())
# we should always have creds returned
return creds
def createFolder(self, parent_id, folder_name):
...
account = self._account
creds = self.getAccessToken()
try:
service = build("drive", "v3", credentials=creds)
file_metadata = {
"name": f"{folder_name}",
"mimeType": "application/vnd.google-apps.folder",
"parents":[f"{parent_id}"]
}
# pylint: disable=maybe-no-member
file = service.files().create(body=file_metadata).execute()
return file
...
except HttpError as err:
print (f"Error {err}")
def createFile(self,local_path,parent_id):
...
creds = self.getAccessToken()
# Get the absolute path of the current script file
script_path = os.path.abspath(__file__)
# Get the directory containing the script
script_directory = os.path.dirname(script_path)
local_directory = os.path.dirname(local_path)
local_file_name = str(local_path).replace(local_directory+"/","")
try:
service = build("drive", "v3", credentials=creds)
file_metadata = {
"name": f"{local_file_name}",
"parents":[f"{parent_id}"]
}
mimetype = mimetypes.guess_type(f"{local_path}")
media_stuff = MediaFileUpload(filename=f"{local_path}", mimetype=mimetype[0], resumable=True)
# pylint: disable=maybe-no-member
file = (service.files().create(media_body = media_stuff, body=file_metadata, media_mime_type=mimetype[0]).execute())
# with open(f"{script_directory}/google_file.json","wt") as _f:
# _f.write(json.dumps(file))
...
return file
except HttpError as err:
print (f"Error {err}")
return None
def updateFile(self,local_path,file_id):
...
creds = self.getAccessToken()
# Get the absolute path of the current script file
script_path = os.path.abspath(__file__)
# Get the directory containing the script
script_directory = os.path.dirname(script_path)
local_directory = os.path.dirname(local_path)
local_file_name = str(local_path).replace(local_directory+"/","")
try:
service = build("drive", "v3", credentials=creds)
mimetype = mimetypes.guess_type(f"{local_path}")
media_stuff = MediaFileUpload(filename=f"{local_path}", mimetype=mimetype[0], resumable=True)
# pylint: disable=maybe-no-member
file = (service.files().update(fileId=file_id,media_body = media_stuff, media_mime_type=mimetype[0]).execute())
...
except HttpError as err:
print (f"Error {err}")
def download_file(self,file_id,localpath):
creds = self.getAccessToken()
# Get the absolute path of the current script file
script_path = os.path.abspath(__file__)
# Get the directory containing the script
script_directory = os.path.dirname(script_path)
try:
# create drive api client
service = build("drive", "v3", credentials=creds)
# pylint: disable=maybe-no-member
request = service.files().get_media(fileId=file_id)
# file = io.BytesIO()
file = io.FileIO(f"{localpath}", 'wb')
downloader = MediaIoBaseDownload(file, request)
done = False
while done is False:
status, done = downloader.next_chunk()
print(f"Download {int(status.progress() * 100)}%.")
except HttpError as error:
print(f"An error occurred: {error}")
file = None
...
def export_file(self,file_id,localpath,mime_type,file_extension):
creds = self.getAccessToken()
# Get the absolute path of the current script file
script_path = os.path.abspath(__file__)
# Get the directory containing the script
script_directory = os.path.dirname(script_path)
try:
# create drive api client
service = build("drive", "v3", credentials=creds)
# pylint: disable=maybe-no-member
request = service.files().export_media(fileId=file_id,mimeType=mime_type)
# file = io.BytesIO()
file = io.FileIO(f"{localpath}.{file_extension}", 'wb')
downloader = MediaIoBaseDownload(file, request)
done = False
while done is False:
status, done = downloader.next_chunk()
print(f"Download {int(status.progress() * 100)}%.")
except HttpError as error:
print(f"An error occurred: {error}")
file = None
...
def get_folders_of_parent(self, parent_id, parent_path):
"""Return the folders whose parent is parent_id.
parent_path is kept for logging/debugging only.
"""
creds = self.getAccessToken()
try:
# create drive api client
service = build("drive", "v3", credentials=creds)
files = []
page_token = None
while True:
# pylint: disable=maybe-no-member
response = (
service.files()
.list(
q=f"mimeType='application/vnd.google-apps.folder' and '{parent_id}' in parents",
spaces="drive",
fields="nextPageToken, files(id,name,parents,modifiedTime,size,mimeType)",
pageToken=page_token,
)
.execute()
)
for file in response.get("files", []):
# Process change
# print(f'get_folders_of_parent: {parent_path} - {file.get("name")}, {file.get("id")}, {file.get("mimeType")}')
name = file.get("name")
files.append(file)
...
# extend was from sample - we don't want to extend
# files.extend(response.get("files", []))
page_token = response.get("nextPageToken", None)
if page_token is None:
break
return files
except HttpError as error:
print(f"An error occurred: {error}")
files = None
return files
def get_files_of_parent(self, parent):
"""Return the non-trashed files and folders whose parent is parent.
"""
creds = self.getAccessToken()
try:
# create drive api client
service = build("drive", "v3", credentials=creds)
files = []
page_token = None
while True:
# pylint: disable=maybe-no-member
# fields="nextPageToken, files(id,name,parents,mimeType,modifiedTime,size)",
response = (
service.files()
.list(
q=f"'{parent}' in parents",
spaces="drive",
fields="nextPageToken, files(id,name,parents,mimeType,modifiedTime,size,trashed)",
pageToken=page_token,
)
.execute()
)
for file in response.get("files", []):
# Process change
# The API also returns trashed files - we don't want to pull those down,
# so we only keep items that are not trashed
if not file['trashed']:
files.append(file)
...
# extend was from sample - we don't want to extend
# files.extend(response.get("files", []))
page_token = response.get("nextPageToken", None)
if page_token is None:
break
# self.write_file("get_files_of_parent",files,True)
return files
except HttpError as error:
print(f"An error occurred: {error}")
files = None
return files
# HELPER METHODS
@staticmethod
def driveExists(path):
pieces = path.split('/')
# print(pieces)
if len(pieces)<4:
print("exiting")
sys.exit()
newpath="/"
for i in range(1,4):
newpath+=(pieces[i]+"/")
return (os.path.exists(newpath))
def check_make_directory_local_only(self,directory,**kwargs):
''' Checks the directory passed in and, if it does not exist, creates it.
We expect the first positional argument to be the directory.
'''
log=0
if "log" in kwargs:
log=1
pieces = directory.split("/")
# ignore the first piece since it's blank
initial = 1
directory_path = ""
for piece in pieces:
if initial:
initial = 0
continue
directory_path += ("/" + piece)
if not os.path.exists(directory_path):
...
os.mkdir(directory_path)
def clean_string_for_googledrive(self,stringtoclean):
...
stringtoclean = stringtoclean.replace("\\","")
stringtoclean = stringtoclean.replace("/","")
stringtoclean = stringtoclean.replace(":","")
stringtoclean = stringtoclean.replace("<","")
stringtoclean = stringtoclean.replace(">","")
return stringtoclean
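# illustrative example (made-up filename):
# clean_string_for_googledrive("budget: 2024/draft <v2>") -> "budget 2024draft v2"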
# this is a logging method
def write_file(self,filename,filecontents_raw, parsejson=True):
filecontents=""
if parsejson:
filecontents = json.dumps(filecontents_raw)
else:
filecontents = filecontents_raw
...
# Get the absolute path of the current script file
script_path = os.path.abspath(__file__)
# Get the directory containing the script
script_directory = os.path.dirname(script_path)
with open(f"{script_directory}/log/{filename}.json","wt") as _f:
_f.write(filecontents)
# IMPORTANT - This takes the LOCAL path; when storing in Google Drive,
# everything is placed under the GDRIVE_REF_FOLDER
def get_create_folderByPath(self,local_path,gdrive_ref_folder, create=True):
...
# If we don't have the gdrive_ref_folder_id, we get it
# The Google drive reference folder must always be a child of root - this is
# an implementation constraint for now
# a path looks like /home/XXXX/documents
# so when we split, we get an array ["","home","XXXX","documents"]
# on our google drive, the path will be /[gdrive_ref_folder]/home/XXXX/documents
pieces = str(local_path).split("/")
pieces[0] = gdrive_ref_folder
# make a gdrive path dict of ids...
# eg, {"root":"[root_id]",}
current_path = ""
gdrive_paths = {}
for i,piece in enumerate(pieces):
current_path += "/"+piece
if not current_path in self.gdrive_paths:
self.gdrive_paths[current_path] = ""
# we keep a local gdrive_paths dict because we will enumerate it below
gdrive_paths[current_path] = self.gdrive_paths[current_path]
# Update - we don't really need this; we can refer to root as root
# Get the root id
# if self.root_id==None:
# self.root_id = self.get_root_id()
# we iterate through the gdrive_path dict
parent_folder_id = "root"
parent_folder_name = "root"
for i,gdrive_path in enumerate(gdrive_paths):
if len(self.gdrive_paths[gdrive_path])>0:
parent_folder_id = self.gdrive_paths[gdrive_path]
continue
# Cache the result so that we don't make repeated API calls for the
# same parent folder - this should improve performance
if not parent_folder_id in self.folders:
self.folders[parent_folder_id] = self.get_folders_of_parent(parent_folder_id,gdrive_path)
print(f"{gdrive_path} folders from google api")
else:
print(f"{gdrive_path} folders from dict")
folders = self.folders[parent_folder_id]
# needle represents what we are looking for
needle = pieces[i]
result = None
try:
result = next(item for item in folders if item["name"]==needle)
except StopIteration:
# stop iteration is legit - this just means that the folder we're looking for
# does not exist - so we create the folder
if create:
result = self.createFolder(parent_folder_id, needle)
...
parent_folder_id = result['id']
self.gdrive_paths[gdrive_path] = parent_folder_id
if gdrive_path=="/"+gdrive_ref_folder+local_path:
break
...
return parent_folder_id
...
# IMPORTANT - This takes the absolute google path
def get_folderByPath(self,raw_google_path):
...
# If we don't have the gdrive_ref_folder_id, we get it
# The Google drive reference folder must always be a child of root - this is
# an implementation constraint for now
# a path looks like /home/XXXX/documents
# so when we split, we get an array ["","home","XXXX","documents"]
# on our google drive, the path will be /[gdrive_ref_folder]/home/XXXX/documents
# also, since this is a get-folder-by-path and we call get_folders_of_parent,
# we can stop our calls at the last child's parent
# note: we do this leading-slash check because we want our config files to be
# consistent. OneDrive has an item-by-path method but Google Drive does not, so
# in the config file we don't put a leading slash; for Google Drive we add it here.
if raw_google_path[0] !="/":
raw_google_path = "/" + raw_google_path
google_path = raw_google_path
# we append root because when we split, we have a blank first element
# so we make path also line up
pieces = str(google_path).split("/")
pieces[0] = "root"
# we want to append root
google_path = "/root"+google_path
# make a gdrive path dict of ids...
# eg, {"root":"[root_id]",}
current_path = ""
gdrive_paths = {}
for i,piece in enumerate(pieces):
current_path += "/"+piece
if not current_path in self.gdrive_paths:
self.gdrive_paths[current_path] = ""
# we keep a local gdrive_paths dict because we will enumerate it below
gdrive_paths[current_path] = self.gdrive_paths[current_path]
# Update - we don't really need this; we can refer to root as root
# Get the root id
# if self.root_id==None:
# self.root_id = self.get_root_id()
# we iterate through the gdrive_path dict
parent_folder_id = "root"
parent_folder_name = "root"
# we are dealing with paths, so we never need to list the children of the
# final folder itself; the last piece is handled explicitly inside the loop below
for i,gdrive_path in enumerate(gdrive_paths):
if len(self.gdrive_paths[gdrive_path])>0:
parent_folder_id = self.gdrive_paths[gdrive_path]
# first pass through is fine; on later passes we already have the id cached,
# so we move on to the next path segment. Note that looking ahead at
# pieces[i+1] could go out of range - the i == len(pieces)-1 branch below guards against that
continue
# Cache the result so that we don't make repeated API calls for the
# same parent folder - this should improve performance
if not parent_folder_id in self.folders:
self.folders[parent_folder_id] = self.get_folders_of_parent(parent_folder_id,gdrive_path)
print(f"{gdrive_path} folders from google api")
else:
print(f"{gdrive_path} folders from dict")
folders = self.folders[parent_folder_id]
# needle represents what we are looking for
# we look forward one piece; the i == len(pieces)-1 branch below avoids
# an index-out-of-range on pieces[i+1]
if i==(len(pieces)-1):
...
result = None
try:
result = next(item for item in folders if item["name"]==pieces[len(pieces)-1])
except StopIteration:
# in this context we would normally expect a result - this is a
# get method - but a bad path can legitimately match nothing
return None
parent_folder_id = result['id']
self.gdrive_paths[gdrive_path] = parent_folder_id
...
else:
# we are not yet at the last piece, so look ahead to the next one
needle = pieces[i+1]
result = None
try:
result = next(item for item in folders if item["name"]==needle)
except StopIteration:
# in this context we would normally expect a result - this is a
# get method - but a bad path can legitimately match nothing
return None
...
parent_folder_id = result['id']
self.gdrive_paths[gdrive_path+"/"+needle] = parent_folder_id
if i==0:
self.gdrive_paths["/root"] = result['parents'][0]
...
...
# no need to proceed if i is len pieces -2
# if i==len(pieces)-1:
# break
...
return parent_folder_id
...
def update_create_fileByPath(self,local_path,gdrive_ref_folder):
...
# local_path is the local path to the file, so we need to get the directory and file name
local_directory = os.path.dirname(local_path)
local_filename = str(local_path).replace(local_directory+"/","")
folder_id = self.get_create_folderByPath(local_directory,gdrive_ref_folder)
# Now we get the files under this folder
if not folder_id in self.folder_items:
self.folder_items[folder_id] = self.get_files_of_parent(folder_id)
print(f"{local_path} calling google api")
else:
print(f"{local_path} getting from dict")
folder_items = self.folder_items[folder_id]
try:
result = next(item for item in folder_items if item['name']==local_filename)
if "size" in result:
stats = os.stat(local_path)
if int(result['size']) != int(stats.st_size):
self.updateFile(local_path,result['id'])
print(f">> update file")
else:
print(f">> skipped - same size")
except StopIteration:
# if we are here, then we were unable to find the file name,
# which means that we need to create the file
self.createFile(local_path,folder_id)
print(f">> create file")
...
...
# Get the files
def get_local_children(self,directory,include_directories, **kwargs):
''' File System - get children'''
self.listdir_items = []
self.listdir_directories = {}
pieces = directory.split("/")
newpath="/"
end = (len(pieces)-1)
for i in range(1,end):
newpath+=(pieces[i]+"/")
# this ensures the parent directory of the starting directory is also registered in listdir_directories
self.listdir_directories[newpath[:-1]] = ""
log=0
if "log" in kwargs:
log=1
directory_only = 0
if include_directories==0:
directory_only = 1
directories_to_process = []
directories_to_process.append(directory)
self.listdir_directories[directory] = ""
limit = 1000000
counter = 0
while (len(directories_to_process)>0):
counter +=1
if counter > limit:
break
current_directory = directories_to_process.pop()
files = os.listdir(current_directory)
for file in files:
item={}
item['name']= self.clean_string_for_googledrive(file)
if file !=item['name']:
os.rename(f"{current_directory}/{file}",f"{current_directory}/{item['name']}")
item['onedrive_id'] = ""
item['fullPath'] = f"{current_directory}/{item['name']}"
item['directory'] = f"{current_directory}"
if os.path.isfile(f"{current_directory}/{item['name']}"):
item['isFile'] = 1
stats = os.stat(f"{current_directory}/{item['name']}")
item['size'] = stats.st_size
self.listdir_items.append(item)
else:
item['isFile'] = 0
item['size'] = -1
# Exclusion list - we don't need to back up these folders
if item['name']==".venv" or item['name']=="__pycache__" :
continue
if not directory_only:
directories_to_process.append(f"{current_directory}/{item['name']}")
self.listdir_directories[f"{current_directory}/{item['name']}"] = ""
self.listdir_items.append(item)
return {"listdir_items":self.listdir_items,"listdir_directories":self.listdir_directories,}
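# Illustrative shape of the return value (example paths and sizes are made up):
# {"listdir_items": [{"name": "notes.txt", "onedrive_id": "", "fullPath": "/home/user/docs/notes.txt",
#                     "directory": "/home/user/docs", "isFile": 1, "size": 2048}, ...],
#  "listdir_directories": {"/home/user": "", "/home/user/docs": "", ...}}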
# We get the Google Drive folder IDs for each local directory
def get_local_children_googledrive_folders (self,local_directory, googledrive_reference_directory, **kwargs):
...
# Since we put this into an object, and we force instantiation with an account,
# we know that me will always be self._account
# keeping the placeholders
account = self._account
returnObject = {}
returnObject['queries'] = []
index = 0
for item in self.listdir_directories:
...
self.listdir_directories[item] = self.get_create_folderByPath(item,googledrive_reference_directory)
index +=1
return returnObject
def get_local_children_googledrive_files (self,local_directory,googledrive_reference_directory, **kwargs):
# Since we put this into an object, and we force instantiation with an account,
# we know that me will always be self._account
# keeping the placeholders
account = self._account
returnObject = {}
returnObject['queries'] = []
account = self._account
for listdir_item in self.listdir_items:
...
if listdir_item['isFile']==0:
continue
newpath=googledrive_reference_directory+""+listdir_item['fullPath']
parent_folder_id = self.listdir_directories[listdir_item['directory']]
result = self.update_create_fileByPath(listdir_item['fullPath'],googledrive_reference_directory)
...
return returnObject
# BUSINESS PROCESS
# Pushes a local folder to a Google Drive target directory
def push_local_to_googledrive(self, local_source, googledrive_reference_directory, include_directories):
...
self.get_local_children(local_source, include_directories)
self.get_local_children_googledrive_folders(local_source, googledrive_reference_directory)
self.get_local_children_googledrive_files(local_source,googledrive_reference_directory)
# Pushes a googledrive folder to a local target directory - gen 1
def push_googledrive_to_local (self,googledrive_start_directory, local_target_start_directory, include_directories,**kwargs):
# Since we put this into an object, and we force instantiation with an account,
# we know that me will always be self._account
# keeping the placeholders
account = self._account
returnObject={}
total_counter = 0
# make sure the local_target_start_directory exists
self.check_make_directory_local_only(local_target_start_directory)
self.check_make_directory_local_only(f"{local_target_start_directory}/{googledrive_start_directory}")
directories_to_process = []
directories_to_process.append(googledrive_start_directory)
while len(directories_to_process)>0:
current_path = directories_to_process.pop(0)
print(f"{self._account} processing {current_path}")
result = self.get_folderByPath(current_path)
# Note - not sure if we need to do error handling here or not. Monitor.
if result is None:
print(f"{self._account} push_googledrive_to_local - error getting folder {current_path}")
continue
# now that we have an ID, we can get the children
folder_id = result
# this is an array of google files
result_list = self.get_files_of_parent(folder_id)
result_counter = 0
for item in result_list:
# handle folders and files separately
total_counter +=1
if item['mimeType']=='application/vnd.google-apps.folder':
# the item is a folder
if include_directories==0:
continue
googledrive_folder_path = f"{current_path}/{item['name']}"
googledrive_folder = {}
googledrive_folder['name'] = item['name']
googledrive_folder['id'] = item['id']
googledrive_folder['localpath'] = local_target_start_directory+"/"+current_path+"/"+item['name']
directories_to_process.append(googledrive_folder_path)
self.check_make_directory_local_only(googledrive_folder['localpath'])
...
else:
# the item is a file
# Google sheet 'application/vnd.google-apps.spreadsheet'
# Google docs 'application/vnd.google-apps.document'
# Google slides 'application/vnd.google-apps.presentation'
googledrive_file = {}
googledrive_file['name'] = item['name']
googledrive_file['id'] = item['id']
googledrive_file['localpath'] = local_target_start_directory+"/"+current_path+"/"+item['name']
googledrive_file['localdirectory'] = local_target_start_directory+"/"+current_path
# Google-native files (Docs/Sheets/Slides) may not report a size, so default to -1
googledrive_file['size'] = item.get('size', -1)
added_file_extension=""
# Original approach was to check the file size and only if different, save
# and make copy. These files are google files so we just overwrite because
# Google will always be the master.
if item['mimeType'] == 'application/vnd.google-apps.spreadsheet':
added_file_extension=".ods"
elif item['mimeType'] == 'application/vnd.google-apps.document':
added_file_extension=".odt"
elif item['mimeType'] == 'application/vnd.google-apps.presentation':
added_file_extension=".odp"
elif item['mimeType'] == 'application/vnd.google-apps.drawing':
added_file_extension=".png"
timestamp = str(int(datetime.datetime.now().timestamp()))
if os.path.exists(f"{googledrive_file['localpath']}"):
stat = os.stat(f"{googledrive_file['localpath']}")
local_size = stat.st_size
if int(local_size)==int(googledrive_file['size']):
print(f">> skipped {googledrive_file['localpath']}")
continue
if added_file_extension=="":
os.rename(googledrive_file['localpath'],googledrive_file['localpath']+"."+timestamp+".bk")
start_time = time.perf_counter_ns()
print (total_counter, result_counter,f"getting result ",googledrive_file['localdirectory'],item['name'])
print(f">> got {googledrive_file['localpath']}")
if item['mimeType'] == 'application/vnd.google-apps.spreadsheet':
...
self.export_file(googledrive_file['id'] , googledrive_file['localpath'], 'application/vnd.oasis.opendocument.spreadsheet','ods')
# application/vnd.oasis.opendocument.spreadsheet ods
elif item['mimeType'] == 'application/vnd.google-apps.document':
...
self.export_file(googledrive_file['id'] , googledrive_file['localpath'], 'application/vnd.oasis.opendocument.text','odt')
# application/vnd.oasis.opendocument.text odt
elif item['mimeType'] == 'application/vnd.google-apps.presentation':
...
self.export_file(googledrive_file['id'] , googledrive_file['localpath'], 'application/vnd.oasis.opendocument.presentation','odp')
elif item['mimeType'] == 'application/vnd.google-apps.drawing':
...
self.export_file(googledrive_file['id'] , googledrive_file['localpath'], 'image/png','png')
else:
self.download_file(googledrive_file['id'] , googledrive_file['localpath'] )
end_time = time.perf_counter_ns()
print(">> duration: ",(end_time - start_time)/1000000000)
...
result_counter+=1
...
...
# NOT USED SECTION
# this gets the root ID
def get_root_id(self):
creds = self.getAccessToken()
counter = 0
try:
# create drive api client
service = build("drive", "v3", credentials=creds)
# pylint: disable=maybe-no-member
response = service.files().get(fileId="root").execute()
if "id" in response:
self.root_id = response["id"]
return response["id"]
...
except HttpError as error:
print(f"An error occurred: {error}")
files = None
return files
# this is to see how stuff is returned
def getRootChildren(self, reference_folder, log):
# get creds
creds = self.getAccessToken()
service = build("drive", "v3", credentials=creds)
# Call the Drive v3 API
results = (
service.files()
.list(pageSize=1000, q="'root' in parents")
.execute()
)
items = results.get("files", [])
# Get the absolute path of the current script file
script_path = os.path.abspath(__file__)
# Get the directory containing the script
script_directory = os.path.dirname(script_path)
with open(f"{script_directory}/log/google_getrootchildren.json","wt") as _f:
_f.write(json.dumps(results))
...
return results
# this gets all folders in GoogleDrive
def get_all_folders(self):
creds = self.getAccessToken()
counter = 0
try:
# create drive api client
service = build("drive", "v3", credentials=creds)
files = {}
page_token = None
while True:
# pylint: disable=maybe-no-member
response = (
service.files()
.list(
q="mimeType='application/vnd.google-apps.folder'",
spaces="drive",
fields="nextPageToken, files(id,name,parents)",
pageToken=page_token,
)
.execute()
)
print (f"request [{counter}]")
for file in response.get("files", []):
# shared folders are also picked up by the query
# parents for shared folders do not exist - we only care about our folders.
if "parents" in file:
files[file['id']] = {"name":file['name'],"parent_id":file['parents'][0]}
print(f">> {file['name']}")
...
# files.extend(response.get("files", []))
page_token = response.get("nextPageToken", None)
counter +=1
if page_token is None:
break
except HttpError as error:
print(f"An error occurred: {error}")
files = None
return files
# to see how things worked re. folders
def build_all_folder_structure_from_file(self):
script_path = os.path.abspath(__file__)
script_directory = os.path.dirname(script_path)
folder_file = f"{script_directory}/log/google_folders.json"
if not os.path.exists(folder_file):
result = self.get_all_folders()
self.write_file("google_folders",result,True)
folder_dict = None
with open(folder_file,"rt") as _f:
folder_dict= json.loads(_f.read())
paths = {}
paths_file = f"{script_directory}/log/google_paths_file.json"
for folder in folder_dict:
path = folder_dict[folder]['name']
current_parent_id = folder_dict[folder]['parent_id']
loop = True
counter = 0
while loop:
counter +=1
if current_parent_id in folder_dict:
path = f"{folder_dict[current_parent_id]['name']}/{path}"
current_parent_id = folder_dict[current_parent_id]['parent_id']
else:
loop = False
paths[f"/{path}"]=folder
break
...
...
if counter>10:
...
with open (paths_file,"wt") as _f:
_f.writelines(json.dumps(paths))
...
if __name__ == "__main__":
...
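# Illustrative usage sketch - the account label and paths below are assumptions,
# not values from the original script:
# drive = GoogleDrive("example_account")
# # mirror a local folder up to Drive under the reference folder "backup"
# drive.push_local_to_googledrive("/home/example/documents", "backup", 1)
# # pull a Drive folder down into a local target directory, including subfolders
# drive.push_googledrive_to_local("documents", "/home/example/drive_copy", 1)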