|
from authlib.integrations.flask_client import OAuth |
|
from authlib.common.security import generate_token |
|
import ffmpeg |
|
from flask import Flask, render_template, request, jsonify, url_for, redirect, session |
|
from functools import wraps |
|
import os |
|
import streamlink |
|
import threading |
|
import time |
|
from faster_whisper import WhisperModel |
|
import subprocess |
|
from datetime import datetime as dt |
|
from datetime import timedelta, timezone |
|
from apiclient import discovery |
|
from google.oauth2 import service_account |
|
import json |
|
|
|
# --- Runtime configuration, read from environment variables -----------------
print('here')

# JSON text of the key that is later written to client_secret.json and loaded
# as Google service-account credentials.
_raw_client_secret = os.environ.get("client_secret")
if _raw_client_secret is None:
    # json.loads(None) would fail with an opaque TypeError; name the missing
    # variable instead.
    raise RuntimeError('Environment variable "client_secret" is not set')
client_secret = json.loads(_raw_client_secret)
# Log the type only: the parsed value is a credential and must not reach stdout.
print(type(client_secret))

# Id of the Google Doc that receives the transcript.
gdoc_id = os.environ.get("gdoc_id")
print(gdoc_id)

# OAuth client credentials used for the Google sign-in flow.
GOOGLE_CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET")
GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID")

# NOTE(review): read here but not referenced anywhere else in the visible
# part of this file — confirm whether sign-in is meant to be restricted to it.
allowed_users = os.environ.get("allowed_users")

# --- Speech-to-text model ----------------------------------------------------
model_size = 'small'
beamsize = 2  # passed as beam_size to wmodel.transcribe()
wmodel = WhisperModel(model_size, device="cpu", compute_type="int8")
|
|
|
|
|
def _reset_dir(path):
    """Create directory *path* if it is missing, then delete every entry in it."""
    # exist_ok avoids the check-then-create race of `if not exists: makedirs`.
    os.makedirs(path, exist_ok=True)
    for entry in os.listdir(path):
        os.remove(os.path.join(path, entry))


# Start every run with an empty transcript/PID directory.
_reset_dir('transcription_files')

# The credentials loader below reads from a file, so persist the key first.
with open("client_secret.json", "w") as json_file:
    json.dump(client_secret, json_file, indent=4)

scopes = ["https://www.googleapis.com/auth/documents", "https://www.googleapis.com/auth/drive.file"]
credentials = service_account.Credentials.from_service_account_file('client_secret.json', scopes=scopes)
service = discovery.build('docs', 'v1', credentials=credentials)

# Offset in hours added to the current UTC time when stamping the transcript.
local_tz = 5.5
# Local copy of the transcript; start_process also treats its existence as
# "a recording is in progress".
local_transcript = 'transcription_files/tr.txt'
# Holds the PID of the most recently started ffmpeg process.
pid_file = 'transcription_files/pid.txt'

# Start every run with an empty audio-chunk directory.
_reset_dir('mp3')
|
|
|
app = Flask(__name__, static_url_path='/static')

# Flask signs the session cookie with this key. A key generated at start-up
# changes on every restart, which invalidates all existing logins, so a fixed
# key may be supplied through FLASK_SECRET_KEY; without it the previous
# behaviour (fresh random key per process) is kept.
app.secret_key = os.environ.get("FLASK_SECRET_KEY") or os.urandom(12)

oauth = OAuth(app)

# --- Shared recording state (mutated by the route handlers and save_audio) ---
stream_process = None          # threading.Thread running save_audio, or None
recording = False              # loop flag polled by save_audio
mp3_extraction_process = None  # handle of the latest ffmpeg process, or None
|
|
|
def update_gdoc(text, gdoc_id):
    """Append *text*, prefixed with a space, to the end of a Google Doc.

    The document is fetched to find the end index of its last paragraph and
    the text is inserted one position before that index.

    This is best-effort: any failure while reading or updating the document
    is printed and swallowed, so a Docs API error does not propagate to the
    caller.

    Args:
        text: Text to append.
        gdoc_id: Id of the target Google Doc.
    """
    print('Updating Google Doc', gdoc_id)

    try:
        # The read is inside the try as well: previously a failed get(), or a
        # document without any paragraph, raised out of this function.
        doc = service.documents().get(documentId=gdoc_id).execute()
        paragraph_ends = [p['endIndex'] for p in doc['body']['content'] if 'paragraph' in p]
        endindex = paragraph_ends[-1]

        body = {'requests': [{'insertText': {'location': {'index': endindex-1,}, 'text': ' ' + text}}]}
        result = service.documents().batchUpdate(documentId=gdoc_id, body=body).execute()
        print(result)

    except Exception as e:
        print(e)
|
|
|
def process_complete_callback(retcode, **kwargs):
    """Print whether an FFmpeg run finished cleanly.

    Args:
        retcode: Exit status of the process; 0 is reported as success and
            any other value as an error.
        **kwargs: Accepted and ignored.
    """
    succeeded = retcode == 0
    message = (
        "FFmpeg process completed successfully!"
        if succeeded
        else "FFmpeg process encountered an error."
    )
    print(message)
|
|
|
def transcribe_audio(latest_file, time_counter):
    """Transcribe one audio file and return its text with the next counter.

    The segment texts produced by the Whisper model are concatenated,
    newlines become spaces and double spaces are collapsed. When
    *time_counter* is a multiple of 5, a ``Time HH:MM:SS`` line (current UTC
    time shifted by ``local_tz`` hours) is inserted after the first sentence.

    Args:
        latest_file: Path of the audio file to transcribe.
        time_counter: Number of chunks transcribed so far.

    Returns:
        A ``(transcribed_text, time_counter + 1)`` tuple.
    """
    print('transcribing ', latest_file)
    segments, _info = wmodel.transcribe(f"{latest_file}", beam_size=beamsize)

    text = ''.join(segment.text for segment in segments)
    # Collapse the double spaces left behind by the newline replacement
    # (the previous replace(' ', ' ') changed nothing).
    transcribed = text.replace('\n', ' ').replace('  ', ' ')

    if time_counter % 5 == 0:
        stamp = (dt.now(timezone.utc) + timedelta(hours=local_tz)).strftime('%H:%M:%S')
        first_sentence, separator, remainder = transcribed.partition('. ')
        if separator:
            # Splitting on '. ' consumes the period; put it back so the first
            # sentence stays terminated.
            first_sentence += '.'
        transcribed = first_sentence + '\nTime ' + stamp + '\n' + remainder

    return transcribed, time_counter + 1
|
|
|
def save_audio(youtube_url):
    """Record a stream in 30-second MP3 chunks and transcribe finished chunks.

    Runs until the module-level ``recording`` flag becomes false. Each
    iteration starts an ffmpeg process that writes the next chunk to
    ``mp3/audio_<epoch>.mp3`` and records its PID in ``pid_file``. Once at
    least two chunks are on disk, the oldest one that is not the chunk just
    started is transcribed, deleted, appended to the Google Doc and appended
    to ``local_transcript``. Iterations are paced to roughly 30 seconds.

    Args:
        youtube_url: URL passed to ``streamlink.streams``.

    Returns:
        ``None`` when the loop ends normally. On any exception ``recording``
        is set to ``False`` and the exception text is returned.
    """
    global recording, mp3_extraction_process
    try:
        streams = streamlink.streams(youtube_url)
        # A missing "144p" variant raises KeyError, handled by the except below.
        stream_url = streams["144p"].url
        time_counter = 0

        while recording:
            saved_mp3 = f"mp3/audio_{int(time.time())}.mp3"
            # Audio track only, 30 seconds per chunk, started asynchronously.
            mp3_extraction_process = (
                ffmpeg
                .input(stream_url, t=30)
                .audio
                .output(saved_mp3)
                .overwrite_output()
                .global_args('-loglevel', 'panic')
                .run_async()
            )
            print('pid', mp3_extraction_process.pid)

            with open(pid_file, 'w') as f:
                f.write(str(mp3_extraction_process.pid))

            mp3files = [f for f in os.listdir('mp3') if f.endswith('.mp3')]
            if len(mp3files) < 2:
                print('Sleeping for 30s as only one mp3 file in folder')
                time.sleep(30)
                continue

            starttime = time.time()
            current_name = os.path.basename(saved_mp3)
            # os.listdir() returns names in arbitrary order, so choose the
            # oldest chunk explicitly. The names embed an epoch timestamp of
            # constant width, so the smallest name is the oldest chunk.
            file_to_transcribe = min(name for name in mp3files if name != current_name)
            print('Working on ', file_to_transcribe)
            transcribed, time_counter = transcribe_audio(f'mp3/{file_to_transcribe}', time_counter)
            os.remove(f'mp3/{file_to_transcribe}')

            update_gdoc(transcribed, gdoc_id)
            with open(local_transcript, 'a', encoding='utf-8', errors='ignore') as f:
                f.write(transcribed)

            elapsed_time = time.time() - starttime
            print('Time to transcribe:', elapsed_time, 'seconds')
            if elapsed_time < 30:
                print(f'Sleeping for {30-elapsed_time} as there are more than one mp3 files in folder')
                time.sleep(30-elapsed_time)

    except Exception as e:
        # Stop the loop for good; the error is only printed and returned.
        recording = False
        print('exception', str(e))
        return str(e)
|
|
|
@app.route("/start_process", methods=["POST"])
def start_process():
    """Start recording the stream whose URL is posted in the ``url`` form field.

    Responses:
        200 "Recording started." once the background thread is running.
        400 when the transcript file already exists, the URL is missing, or
        the ``recording`` flag is already set.

    NOTE(review): unlike ``/home``, this route performs no login check —
    confirm whether that is intended.
    """
    global recording, stream_process

    if os.path.isfile(local_transcript):
        return jsonify({"message": "Recording is already in progress."}), 400

    youtube_url = request.form.get("url")
    if not youtube_url:
        return jsonify({"message": "Please provide a valid YouTube URL."}), 400

    if recording:
        return jsonify({"message": "A recording is already in progress."}), 400

    # Create the empty transcript only after the request has been accepted.
    # Creating it before validation left the file behind on a rejected
    # request, after which every start reported "already in progress" while
    # stop reported that nothing was recording.
    with open(local_transcript, 'a', encoding='utf-8', errors='ignore') as f:
        f.write('')

    print('In start process')
    recording = True
    stream_process = threading.Thread(target=save_audio, args=(youtube_url,))
    stream_process.start()

    return jsonify({"message": "Recording started."}), 200
|
|
|
|
|
@app.route("/stop_process", methods=["POST"])
def stop_process():
    """Stop the current recording and remove its working files.

    Clears the ``recording`` flag, waits for the recorder thread to finish,
    terminates the last ffmpeg process, deletes everything in ``mp3/`` and
    the local transcript, then force-kills the PID stored in ``pid_file``
    and removes that file.

    Responses:
        200 "Recording stopped."
        400 when no recording is in progress.

    NOTE(review): unlike ``/home``, this route performs no login check —
    confirm whether that is intended.
    """
    global recording, stream_process, mp3_extraction_process

    if not recording:
        return jsonify({"message": "No recording is currently in progress."}), 400

    print('In stop process')
    recording = False

    if stream_process is not None:
        # save_audio only re-checks the flag between iterations, so this can
        # block until its current iteration finishes.
        stream_process.join()
        stream_process = None

    # Still None when the stop arrives before save_audio launched its first
    # ffmpeg process; calling terminate() unconditionally raised
    # AttributeError in that case.
    if mp3_extraction_process is not None:
        mp3_extraction_process.terminate()
        mp3_extraction_process = None

    for f in os.listdir('mp3/'):
        os.remove(os.path.join('mp3/', f))
    if os.path.isfile(local_transcript):
        os.remove(local_transcript)

    if os.path.isfile(pid_file):
        with open(pid_file, 'r') as f:
            pid = int(f.read())
        try:
            os.kill(pid, 9)
        except OSError:
            # os.kill failed (for example the process is already gone); fall
            # back to taskkill, which itself raises where it is unavailable.
            try:
                process = subprocess.Popen(["taskkill", "/F", "/PID", str(pid)], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                process.communicate()
                print("Process terminated successfully.")
            except Exception as e:
                print("Error:", e)
        os.remove(pid_file)

    return jsonify({"message": "Recording stopped."}), 200
|
|
|
@app.route('/google/')
def google():
    """Register the Google OAuth client and redirect the browser to Google.

    A fresh nonce is stored in the session under ``'nonce'`` and sent with
    the authorization request; the callback URL is the ``google_auth`` route.
    """
    discovery_url = 'https://accounts.google.com/.well-known/openid-configuration'
    registration = dict(
        name='google',
        client_id=GOOGLE_CLIENT_ID,
        client_secret=GOOGLE_CLIENT_SECRET,
        server_metadata_url=discovery_url,
        client_kwargs={"scope": "openid email profile"},
    )
    oauth.register(**registration)

    callback_url = url_for('google_auth', _external=True)
    nonce = generate_token()
    session['nonce'] = nonce
    return oauth.google.authorize_redirect(callback_url, nonce=nonce)
|
|
|
@app.route('/google/auth/')
def google_auth():
    """OAuth callback: obtain the token, store the user, go to ``/home``.

    The ID token is parsed against the nonce saved in the session and the
    result is stored in the session under ``'user'``.
    """
    access_token = oauth.google.authorize_access_token()
    expected_nonce = session['nonce']
    profile = oauth.google.parse_id_token(access_token, nonce=expected_nonce)
    session['user'] = profile
    print('USER', profile)

    return redirect('/home')
|
|
|
def is_not_logged_in():
    """Return True unless the session holds both a ``user`` and a ``nonce``."""
    required_keys = ('user', 'nonce')
    return any(session.get(key) is None for key in required_keys)
|
|
|
|
|
def login_required(f):
    """Decorator that redirects to ``/login`` unless the session is logged in.

    When ``is_not_logged_in()`` is false, the wrapped view *f* is called with
    the original arguments and its result is returned.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if not is_not_logged_in():
            return f(*args, **kwargs)
        return redirect('/login')

    return guarded_view
|
|
|
@app.route("/home")
@login_required
def home():
    """Render ``home.html``; wrapped by ``login_required``."""
    page = render_template("home.html")
    return page
|
|
|
@app.route("/", methods=["GET"])
@app.route("/login", methods=["GET"])
def login():
    """Render ``login.html``, or redirect to ``/home`` when already logged in."""
    if is_not_logged_in():
        return render_template("login.html")
    return redirect("/home")
|
|
|
if __name__ == "__main__":
    # The server listens on every interface (0.0.0.0). Flask's debug mode
    # turns on the interactive debugger, which can execute arbitrary code, so
    # it is opt-in through FLASK_DEBUG instead of being always on.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true")
    app.run(host="0.0.0.0", debug=debug_enabled, port=8081)