AMGPT / google_api_deploy.py
achuthc1298's picture
Update google_api_deploy.py
ffb160e verified
raw
history blame contribute delete
No virus
2.76 kB
import os
import time
import pickle
import io
import json
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.http import MediaIoBaseUpload
# If modifying these SCOPES, delete the file token.pickle.
SCOPES = ['https://www.googleapis.com/auth/drive']
def authenticate_google_drive():
"""Authenticate and create the Google Drive service."""
creds = None
if os.path.exists('token.pickle'):
with open('token.pickle', 'rb') as token:
creds = pickle.load(token)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
'credentials.json', SCOPES)
creds = flow.run_console()
with open('token.pickle', 'wb') as token:
pickle.dump(creds, token)
service = build('drive', 'v3', credentials=creds)
return service
def read_google_doc(service, file_id):
"""Reads the content of a Google Doc as a string."""
try:
doc = service.files().export(fileId=file_id, mimeType='text/plain').execute()
return doc.decode('utf-8')
except Exception as e:
print(f"An error occurred: {e}")
return ""
def write_google_doc(service, file_id, content):
"""Writes content to a Google Doc."""
try:
media = MediaIoBaseUpload(io.BytesIO(content.encode()), mimetype='text/plain')
service.files().update(fileId=file_id, media_body=media).execute()
print("Content written successfully.")
except Exception as e:
print(f"An error occurred: {e}")
def write_google_doc_add(service, file_id, new_content):
"""Appends new content to a Google Doc."""
try:
# Read the current content
current_content = read_google_doc(service, file_id)
# Append the new content
updated_content = current_content + "\n" + new_content
# Write the updated content back to the document
media = MediaIoBaseUpload(io.BytesIO(updated_content.encode()), mimetype='text/plain')
service.files().update(fileId=file_id, media_body=media).execute()
print("Content appended successfully.")
except Exception as e:
print(f"An error occurred: {e}")
def monitor_google_doc(service, file_id):
"""Monitors the Google Doc for changes."""
last_content = read_google_doc(service, file_id)
while True:
current_content = read_google_doc(service, file_id)
if current_content != last_content:
return current_content
last_content = current_content
time.sleep(10)