import streamlit as st
import google.generativeai as genai
import os
from groq import Groq, GroqError
import whisper
from gtts import gTTS
import tempfile
import logging
import numpy as np
from pydub import AudioSegment

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Securely configure the Google API key for generative AI (set this in the deployment environment)
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
genai.configure(api_key=GOOGLE_API_KEY)
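
# Editorial note: unlike the Groq key below, GOOGLE_API_KEY is not checked before use.
# A similar guard (sketch, mirroring the pattern used for the Groq key) could fail fast:
#
#     if not GOOGLE_API_KEY:
#         raise ValueError("GOOGLE_API_KEY is not set.")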

# Set up the Groq API key for audio processing
groq_api_key = os.getenv("GROQ_API_KEY")
if not groq_api_key:
    raise ValueError("GROQ_API_KEY is not set.")

try:
    groq_client = Groq(api_key=groq_api_key)
    logger.info("Groq API key is set and client is initialized.")
except GroqError as e:
    logger.error(f"Failed to initialize Groq client: {e}")
    raise
try:
    # Load the Whisper model for audio transcription
    whisper_model = whisper.load_model("base")
    logger.info("Whisper model loaded successfully.")
except Exception as e:
    logger.error(f"Failed to load Whisper model: {e}")
    raise
# Initialize the generative model for the heart health chatbot
model = genai.GenerativeModel(
    'gemini-1.5-flash',
    system_instruction=(
        "Persona: You are Dr. Assad Siddiqui, a heart specialist. Only provide information related to heart health, symptoms, and advice. "
        "Ask users about their heart-related symptoms and provide consultation and guidance based on their input. "
        "Always provide brief answers. If an inquiry is not related to heart health, politely say that you can only provide heart-related information. "
        "Respond in both Roman Urdu (Urdu written in the English alphabet) and English."
    )
)

# Function to get a response from the generative AI chatbot
def get_chatbot_response(user_input):
    response = model.generate_content(user_input)
    return response.text.strip()
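
# Optional sketch (not used by the UI below): google-generativeai also provides a chat
# session that keeps multi-turn context on the model side, instead of sending each
# question in isolation. This assumes the `model` object defined above; the function
# name is illustrative.
def get_chatbot_response_with_context(chat_session, user_input):
    """Send a message on an existing chat session created via model.start_chat()."""
    response = chat_session.send_message(user_input)
    return response.text.strip()

# Example usage (hypothetical):
#     chat_session = model.start_chat(history=[])
#     reply = get_chatbot_response_with_context(chat_session, "I get chest pain when climbing stairs")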

# Function to process audio using Whisper, Groq, and gTTS
def process_audio(audio_file):
    try:
        # The Streamlit uploader yields a file-like object, but Whisper expects a
        # path on disk, so write the bytes to a temporary file first.
        suffix = os.path.splitext(getattr(audio_file, "name", ""))[1] or ".wav"
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_in:
            tmp_in.write(audio_file.read())
            audio_path = tmp_in.name

        # Transcribe audio using Whisper
        result = whisper_model.transcribe(audio_path)
        user_text = result['text']
        logger.info(f"Transcription successful: {user_text}")
    except Exception as e:
        logger.error(f"Error in transcribing audio: {e}")
        return "Error in transcribing audio.", None

    try:
        # Generate a response using the Groq API
        chat_completion = groq_client.chat.completions.create(
            messages=[
                {"role": "user", "content": user_text}
            ],
            model="llama3-8b-8192",
        )
        response_text = chat_completion.choices[0].message.content
        logger.info(f"Received response from Groq API: {response_text}")
    except GroqError as e:
        logger.error(f"Error in generating response with Groq API: {e}")
        return "Error in generating response with Groq API.", None

    try:
        # Convert the response text to speech using gTTS
        tts = gTTS(text=response_text, lang='en')
        tts_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3')
        tts.save(tts_file.name)
        logger.info("Text-to-speech conversion successful.")
    except Exception as e:
        logger.error(f"Error in text-to-speech conversion: {e}")
        return "Error in text-to-speech conversion.", None

    return response_text, tts_file.name
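
# Editorial sketch (not wired in): the Groq call above answers whatever the audio asks,
# independent of the heart-health persona given to the Gemini model. If the audio path
# should be constrained the same way, a system message could be prepended to `messages`:
#
#     chat_completion = groq_client.chat.completions.create(
#         messages=[
#             {"role": "system", "content": "You are a heart specialist. Answer briefly and only about heart health."},
#             {"role": "user", "content": user_text},
#         ],
#         model="llama3-8b-8192",
#     )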

# Streamlit page configuration
st.set_page_config(
    page_title="Heart Health & Audio Chatbot",
    page_icon="👨‍⚕️",
    layout="centered",
    initial_sidebar_state="collapsed",
)

# Background image and custom CSS
st.markdown("""
    <style>
    .stApp {
        background-image: url('https://cdn.wallpapersafari.com/29/34/8Ak1Sf.png');
        background-size: cover;
        background-position: center;
        background-attachment: fixed;
    }
    .chat-bubble {
        background-color: #128C7E;
        color: white;
        padding: 10px;
        border-radius: 10px;
        max-width: 70%;
    }
    .user-bubble {
        background-color: #075E54;
        color: white;
        padding: 10px;
        border-radius: 10px;
        max-width: 70%;
        align-self: flex-end;
    }
    img.avatar {
        width: 50px;
        height: 50px;
        border-radius: 50%;
    }
    </style>
""", unsafe_allow_html=True)

# Custom header
def load_header():
    st.markdown("""
        <div style="padding:10px;text-align:center;color:white;">
            <h1>Heart Health & Audio Chatbot 🫀</h1>
            <p>Ask me anything about heart diseases or process audio queries!</p>
        </div>
    """, unsafe_allow_html=True)

# Initialize session state for chat history
if "history" not in st.session_state:
    st.session_state.history = []

# Avatar URLs
user_avatar_url = "https://img.freepik.com/free-photo/sad-cartoon-anatomical-heart_23-2149767987.jpg"
bot_avatar_url = "https://img.freepik.com/premium-photo/3d-render-man-doctor-avatar-round-sticker-with-cartoon-character-face-user-id-thumbnail-modern-69_1181551-3160.jpg"

# Function to display the chat history
def display_chat_history():
    for chat in st.session_state.history:
        if chat["role"] == "user":
            st.markdown(f"""
                <div style="display: flex; justify-content: flex-end; align-items: center; margin-bottom: 10px;">
                    <div class="user-bubble">
                        <p><b>You:</b> {chat['content']}</p>
                    </div>
                    <img src="{user_avatar_url}" class="avatar"/>
                </div>
            """, unsafe_allow_html=True)
        else:
            st.markdown(f"""
                <div style="display: flex; align-items: center; margin-bottom: 10px;">
                    <img src="{bot_avatar_url}" class="avatar"/>
                    <div class="chat-bubble">
                        <p><b>Bot:</b> {chat['content']}</p>
                    </div>
                </div>
            """, unsafe_allow_html=True)

# Main application layout
def main():
    load_header()

    with st.container():
        display_chat_history()

    # User input for text-based chat
    with st.form(key="user_input_form", clear_on_submit=True):
        user_input = st.text_input(
            label="Type your message...",
            placeholder="Ask about heart health...",
            max_chars=500
        )
        submit_button = st.form_submit_button(label="Send")

    if submit_button and user_input.strip():
        with st.spinner("Thinking..."):
            bot_response = get_chatbot_response(user_input)

        # Update the chat history, then rerun so the refreshed history is rendered once
        st.session_state.history.append({"role": "user", "content": user_input})
        st.session_state.history.append({"role": "bot", "content": bot_response})
        st.rerun()

    # File uploader for audio processing
    st.subheader("Upload audio for processing")
    audio_file = st.file_uploader("Choose an audio file", type=["mp3", "wav"])

    if audio_file is not None:
        with st.spinner("Processing audio..."):
            response_text, audio_response = process_audio(audio_file)
        st.text(f"Response: {response_text}")
        if audio_response:
            st.audio(audio_response)

# Run the app
if __name__ == "__main__":
    main()
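
# To run locally (assuming this script is saved as app.py and GOOGLE_API_KEY / GROQ_API_KEY
# are exported in the environment; openai-whisper also needs ffmpeg installed):
#     streamlit run app.py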