"""Convenience class for using the Google Drive API

- wraps the Google Drive REST API (v3)
- requires: pip install google-api-python-client
Author: Terence Lim
License: MIT
"""
import os.path
import pickle
import posixpath
import time

from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload
#to_url = "https://drive.google.com/uc?export=download&id=".format
# If modifying scopes, delete the file token.pkl.
_SCOPES = ['https://www.googleapis.com/auth/drive']
_FOLDER = 'application/vnd.google-apps.folder'   # mimeType of folders
_HOME = 'root'    # fileId of root folder


class GDrive:
    """Base class provides basic interface to essential google drive api's

    Remote path names are always '/'-separated and are resolved relative to
    a remote current working directory kept in ``pwd``.

    Attributes
    ----------
    service : googleapiclient resource
        Drive v3 service object returned by ``build()``
    pwd : str
        remote current working directory, initially '/'

    Examples
    --------
    # init() prompts for googleapi authorization, which is stored in a
    # tokenfile in current folder so that not prompted in subsequent executions
    g = GDrive()
    _ = g.ls()           # display list of folders and files (and types)

    # NOTE: in all methods, set silent=True to turn off raising exceptions
    #       and return None on error (letting caller handle errors gracefully)
    items = g.ls(silent=True)     # returns files+folders and their fields
    g.cd()                        # returns remote current working directory
    g.mkdir('/new_folder')        # create new folder with absolute path name
    g.cd('new_folder')            # change remote cwd
    g.mkdir('another_folder')     # create new folder in remote cwd
    g.put('local.txt', 'another_folder/newfile.txt')  # upload, relative path
    g.put('local.txt', '/new_folder')  # upload, absolute path, infer filename
    g.put('local.txt')                 # upload to remote cwd, infer filename
    g.get('/new_folder/local.txt', 'localfile.txt')  # download, absolute path
    g.get('another_folder/newfile.txt')  # download, relative, infer filename
    g.rm('/new_folder/local.txt')        # remove, absolute path
    g.rm('another_folder/newfile.txt')   # remove, relative path
    g.rmdir('another_folder')     # remove folder from remote cwd
    g.rmdir('/new_folder')        # remove folder with absolute path name

    Notes
    -----
    See https://developers.google.com/drive/api/v3/quickstart/python
    to turn on the Drive API. In resulting dialog click DOWNLOAD CLIENT
    CONFIGURATION and save the file credentials.json to your working directory.

    Initially, the class will attempt to open browser to prompt for
    authorization (information is stored in tokenfile, so subsequent executions
    will not prompt). If this fails, copy the URL from the console and manually
    open it in your browser.

    Requirements
    ------------
    pip install --upgrade google-api-python-client google-auth-httplib2
        google-auth-oauthlib
    """

    # to_url = "https://drive.google.com/uc?export=download&id={}".format

    def __init__(self, tokenfile='token.pkl', credentials='credentials.json'):
        """Prompt for authorization, then save for subsequent executions

        Parameters
        ----------
        tokenfile : str, default is 'token.pkl'
            locally stores authorization information for subsequent executions
        credentials : str, default is 'credentials.json'
            client secrets file passed to the OAuth flow when no usable
            token is available
        """
        creds = None
        # The token file stores the user's access and refresh tokens, and is
        # created automatically when the authorization flow completes for the
        # first time. See https://developers.google.com/drive/api/v3/
        if os.path.exists(tokenfile):
            # NOTE(security): unpickling can execute arbitrary code; only
            # point tokenfile at a file this class wrote itself.
            with open(tokenfile, 'rb') as token:
                creds = pickle.load(token)

        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    credentials, _SCOPES)
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open(tokenfile, 'wb') as token:
                pickle.dump(creds, token)

        self.service = build('drive', 'v3', credentials=creds)
        self.pwd = '/'     # initial remote current working directory

    def _fullpath(self, path):
        """Helper method to clean and complete a remote path str

        Returns an absolute, normalized '/'-separated path. posixpath is used
        (not os.path) so the result does not depend on the local OS or on the
        local working directory.
        """
        if path == '.':
            path = self.pwd
        if path == '..':
            path = posixpath.join(self.pwd, path)
        while len(path) > 1 and path.endswith('/'):    # no trailing '/'
            path = path[:-1]
        if not path.startswith('/'):   # prepend cwd if not absolute path name
            path = posixpath.join(self.pwd, path)
        while path.startswith('//'):   # normpath keeps a leading '//' as-is
            path = path[1:]
        return posixpath.normpath(path) if path else '/'

    def fetchall(self, q=f"'{_HOME}' in parents", filters=None, fields=None,
                 silent=True):
        """Helper method to fetch list of all files

        Parameters
        ----------
        q : str, default "'root' in parents" (select files in top-level folder)
            Search query string with which to call service.files().list()
        filters : dict, default is {'ownedByMe': True, 'trashed': False}
            Files metadata properties to filter on, see
            https://developers.google.com/drive/api/v3/reference/files
        fields : list of str, default is ['id','name','parents','mimeType']
            Specific fields of files to return, see
            https://developers.google.com/drive/api/v3/fields-parameter
        silent : bool, default True
            accepted for signature consistency; not used by this method

        Returns
        -------
        result : list of dicts
            dicts of specified fields for list of files that pass filter

        Notes
        -----
        See https://developers.google.com/drive/api/v3/search-files
        """
        if filters is None:
            filters = {'ownedByMe': True, 'trashed': False}
        if fields is None:
            fields = ['id', 'name', 'parents', 'mimeType']
        # the filter keys must be requested too, or they cannot be compared
        wanted = ",".join(sorted(set(fields) | set(filters)))

        results = []
        page_token = None
        while True:  # max pageSize 1000, so loop until no nextPageToken
            r = self.service.files().list(
                q=q, pageSize=1000, pageToken=page_token,
                spaces='drive',        # not photos nor appDataFolder
                fields=f'nextPageToken, files({wanted})'
            ).execute(num_retries=9)
            results += r.get('files', [])
            page_token = r.get('nextPageToken')
            if page_token is None:
                break
        # .get(): an item whose response lacks a filtered field is excluded
        # instead of raising KeyError
        return [f for f in results
                if all(f.get(k) == v for k, v in filters.items())]

    def fetchone(self, path, field=None):
        """Fetch metadata for one file given its path name

        Parameters
        ----------
        path : str
            remote path name
        field : str in {'id', 'name', 'mimeType', 'parents'}, default is None
            specific metadata field to return. None to return all fields

        Returns
        -------
        match : value, or dict of values, or None
            value or dict of metadata values of matched file. None if not found
        """
        path = self._fullpath(path)
        match = {'id': _HOME, 'name': '/', 'mimeType': _FOLDER, 'parents': []}
        # walk down from the root, matching one path component per level
        for name in (part for part in path.split('/') if part):
            items = self.fetchall(f"'{match['id']}' in parents",
                                  fields=list(match))
            matches = [item for item in items if item['name'] == name]
            if not matches:   # this part of path name has no match
                return None
            match = matches[0]   # first hit wins if names are duplicated
        return match if field is None else match[field]

    def ls(self, path='.', silent=False):
        """Return list of files in remote folder

        If path names a file rather than a folder, a one-item list with that
        file's metadata is returned. Unless silent, the listing is printed;
        if path is not found, raises Exception (or returns None if silent).
        """
        path = self._fullpath(path)
        item = self.fetchone(path)
        if item is None:
            if not silent:               # raise Exception if verbose
                raise Exception(f"Cannot ls {path}")
            return None                  # else silently return None
        if item['mimeType'] != _FOLDER:
            if not silent:
                print(f"{path} {item['mimeType']}")
            return [item]

        files = self.fetchall(f"'{item['id']}' in parents")
        if not silent:
            folders = [f for f in files if f['mimeType'] == _FOLDER]
            others = [f for f in files if f['mimeType'] != _FOLDER]
            for f in folders:
                print(posixpath.join(path, f['name']) + '/')
            width = max((len(f['name']) for f in others), default=0)
            for f in others:
                print("{:{width}} {}".format(
                    f['name'], f['mimeType'], width=width))
            print(f" '{path}' has {len(folders)} folders, {len(others)} files")
        return files

    def dir(self, path='.', silent=False):
        """Return list of files in remote folder (alias of ls)"""
        return self.ls(path=path, silent=silent)

    def cd(self, path=None, silent=False):
        """Change remote working directory

        With no path, just returns the current remote working directory.
        If path is not an existing folder, raises Exception (or returns None
        if silent) and leaves the working directory unchanged.
        """
        if path is not None:
            path = self._fullpath(path)
            if self.fetchone(path, field='mimeType') != _FOLDER:
                if not silent:          # raise Exception if verbose
                    raise Exception('Cannot cd ' + path)
                return None             # else silently return
            self.pwd = path
        return self.pwd

    def mkdir(self, path, silent=False):
        """Create remote folder

        Fails (raises Exception, or returns None if silent) when the path
        already exists or its parent folder does not. Returns None.
        """
        path = self._fullpath(path)
        fileId = self.fetchone(path, field='id')    # check if already exists
        folder, base = posixpath.split(path)   # create new in parent folder
        folderId = self.fetchone(folder, field='id')
        if folderId is None or fileId is not None:
            if not silent:
                raise Exception(f"Cannot mkdir {path}")
            return None
        body = {'name': base, "parents": [folderId], 'mimeType': _FOLDER}
        self.service.files().create(body=body).execute()

    def md(self, path, silent=False):
        """Create remote folder (alias of mkdir)"""
        return self.mkdir(path=path, silent=silent)

    def rmdir(self, path, silent=False):
        """Remove remote folder and all its contents, bypassing trash bin

        If path is not found, raises Exception (or returns None if silent).
        """
        path = self._fullpath(path)
        fileId = self.fetchone(path, field='id')
        if fileId is None:
            if not silent:               # raise Exception if verbose
                raise Exception(f"Cannot remove {path}")
            return None                  # else silently return None
        self.service.files().delete(fileId=fileId).execute()
        if not silent:
            print(f"{path} '{fileId}' deleted")
        # if cwd was the deleted folder, or lived inside it, cd to the
        # deleted folder's parent so that pwd never dangles
        if self.pwd == path or self.pwd.startswith(path.rstrip('/') + '/'):
            self.pwd = posixpath.dirname(path)

    def rm(self, path, silent=False):
        """Remove remote file, bypassing trash bin

        Refuses to remove a folder: raises Exception, or returns None without
        deleting anything if silent. Use rmdir for folders.
        """
        if self.fetchone(path, field='mimeType') == _FOLDER:
            if not silent:
                raise Exception(f"Use rmdir to rm {path} and all its contents")
            return None      # never fall through and delete a folder
        return self.rmdir(path, silent=silent)

    def delete(self, path, silent=False):
        """Remove remote file, bypassing trash bin (alias of rm)"""
        return self.rm(path=path, silent=silent)

    def get(self, path, filename='', silent=False):
        """Download a remote file to local filename

        Parameters
        ----------
        path : str
            relative or absolute path name of remote file
        filename : str, default is ''
            Output filename. If blank: infer basename, save in curr local dir
        silent : bool, default False
            if False, print file statistics if successful and raise exception
            if not. If True, print nothing and return None on error

        Returns
        -------
        total_file_size : int
            or None if unsuccessful and silent

        Notes
        -----
        see https://developers.google.com/drive/api/v3/manage-downloads
        """
        path = self._fullpath(path)
        fileId = self.fetchone(path, field='id')
        if fileId is None:
            # check before opening the output file, so a missing remote file
            # cannot create or truncate a local one
            if not silent:
                raise Exception(f"Cannot get {path}")
            return None
        tic = time.time()
        media = self.service.files().get_media(fileId=fileId)
        if not filename:
            filename = posixpath.basename(path)
        with open(filename, 'wb') as f:
            downloader = MediaIoBaseDownload(f, media)
            done = False
            while not done:
                try:
                    status, done = downloader.next_chunk()
                except Exception as e:
                    if not silent:      # raise Exception if verbose
                        raise Exception(str(e)) from e
                    return None         # else silently return
        if not silent:
            print("Retrieved to {}, len={}, in {:.0f} secs".format(
                filename, status.total_size, time.time()-tic))
        return status.total_size

    def put(self, filename, path='', silent=False):
        """Upload a local filename to remote folder file

        Parameters
        ----------
        filename : str
            Input local filename
        path : str, default is ''
            Absolute or relative output filename or folder (basename inferred
            from local filename). If blank: upload to current remote folder
        silent : bool, default False
            if False, print file statistics if successful

        Returns
        -------
        total_file_size : int
            or None if unsuccessful and silent flag is set to True

        Notes
        -----
        see https://developers.google.com/drive/api/v3/manage-uploads
        """
        if not path:    # if path not specified, put in current remote folder
            path = self.pwd
        path = self._fullpath(path)
        if self.fetchone(path, field='mimeType') == _FOLDER:
            base = os.path.basename(filename)   # infer from local filename
            folder = path                       # output path is a folder
        else:
            folder, base = posixpath.split(path)
        target = posixpath.join(folder, base)
        media_body = MediaFileUpload(filename, resumable=True)
        fileId = self.fetchone(target, field='id')
        tic = time.time()
        if fileId:    # remote file already exists, so use update method
            self.service.files().update(
                fileId=fileId, media_body=media_body).execute()
        else:
            folderId = self.fetchone(folder, field='id')
            if folderId is None:
                if not silent:          # raise Exception if verbose
                    raise Exception('Cannot put ' + path)
                return None             # else silently return
            self.service.files().create(   # creates new file in folder
                body={'name': base, "parents": [folderId]},
                media_body=media_body).execute()
        if not silent:
            print("{} to {}, len={}, in {:.0f} secs".format(
                "Updated" if fileId else "Saved",
                target, media_body.size(), time.time()-tic))
        return media_body.size()

    def chmod(self, path, action='list', role='reader', email=None,
              silent=False):
        """List, delete or create sharing permissions for a remote file

        Parameters
        ----------
        path : str
            name of remote file to change permissions of
        action : str in {'list' (default), 'create', 'delete'}
            permissions action to execute; only the first letter is examined,
            and anything other than 'c' or 'd' lists
        role : str in {'reader' (default), 'writer', 'owner', 'commenter'}
            role to permit; only the first letter is examined
        email : str, default is None
            if given, the permission is created for that user; otherwise for
            anyone
        silent : bool, default False
            if True, print nothing and return None on error

        Returns
        -------
        result : dict or list of dict
            for 'create', the response of files().get requesting the
            'webContentLink' field; for 'list' and 'delete', the list of
            permissions that existed when the method was called
        """
        path = self._fullpath(path)
        fileId = self.fetchone(path, field='id')
        if fileId is None:
            if not silent:
                raise Exception(f"Cannot chmod {path}")
            return None
        items = self.service.permissions().list(
            fileId=fileId).execute().get('permissions', [])

        verb = action[:1].lower()    # slicing tolerates an empty string
        if verb == 'c':              # action is create
            roles = ['reader', 'writer', 'owner', 'commenter']
            role = {r[0]: r for r in roles}.get(role[:1].lower())
            if role is None:
                if not silent:
                    raise Exception(f'chmod invalid role not in {roles}')
                return None
            # NOTE(review): 'value' looks like a Drive v2 field name; confirm
            # whether v3 needs anything besides 'emailAddress'
            body = {'role': role,
                    'type': 'anyone' if email is None else "user",
                    'value': email,
                    'emailAddress': email}
            self.service.permissions().create(
                fileId=fileId, body=body).execute()
            return self.service.files().get(
                fileId=fileId, fields='webContentLink').execute()

        if verb == 'd':              # action is delete
            for item in items:
                if item['role'] != 'owner':   # owner permission is kept
                    self.service.permissions().delete(
                        fileId=fileId, permissionId=item['id']).execute()
                    if not silent:
                        print(item)
        elif not silent:             # action is list
            for item in items:
                print(item)
            print(f"{len(items)} permissions exist in {path}")
        return items


if __name__ == '__main__':
    # Smoke test: authorize (prompting on first run), then print a listing
    # of the remote working directory.
    g = GDrive()
    silent = False
    _ = g.ls(silent=silent)