# Spaces status banner captured together with this file: "Runtime error".
import gradio as gr | |
from gradio_client import Client | |
import os | |
import logging | |
import requests | |
from huggingface_hub import login, whoami | |
import torch | |
from threading import Thread | |
from flask import Flask, send_from_directory | |
# Flask application; its only view serves the privacy-policy page.
app = Flask(__name__)


# NOTE(review): the view originally had no route decorator, so Flask never
# exposed it. The URL below is an assumption derived from the function and
# file name — confirm the intended path.
@app.route('/privacy_policy')
def privacy_policy():
    """Return ``privacy_policy.html`` via ``send_from_directory('.', ...)``."""
    return send_from_directory('.', 'privacy_policy.html')
def run_flask():
    """Run the Flask app's built-in server; intended as a thread target."""
    flask_port = 5000  # Change the port if necessary
    app.run(port=flask_port)
# Run the Flask server in a background daemon thread.
Thread(daemon=True, target=run_flask).start()

# Deployment settings are read from environment variables.
webhook_server = os.environ.get('webhook_server')
API_TOKEN = os.environ.get('ZeroGPU')
# BASE_URL = os.getenv('SpaceURL')  # disabled by the author; kept for reference

# Log in to the Hugging Face Hub with the configured token.
login(token=API_TOKEN, add_to_git_credential=True)

# Client for the image-generation Space. Alternatives noted by the author:
# "prithivMLmods/IMAGINEO-4K", "mukaist/DALLE-4K".
client_image = Client("measmonysuon/DALLE-4K")

# Fetch the account behind the token to confirm authentication.
user_info = whoami(token=API_TOKEN)
print("User Info:", user_info)
print(f"Is CUDA available: {torch.cuda.is_available()}")
# Supported output sizes, keyed by the "<width>x<height>" label shown in the
# resolution dropdown; each value is the (width, height) tuple.
resolutions = {
    f"{w}x{h}": (w, h)
    for w, h in ((896, 1152), (1024, 1024), (1216, 832))
}
# Default value for the ``style`` argument of generate_image.
DEFAULT_STYLE = "3840 x 2160"

# Module-wide logging at INFO level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def generate_image(prompt, resolution_key, style=DEFAULT_STYLE):
    """Request one image from the remote generation endpoint.

    Args:
        prompt: User-supplied description; a fixed photorealism suffix is
            appended before it is sent.
        resolution_key: Key into ``resolutions``. Unknown keys fall back to
            1024x1024.
        style: Value forwarded as the endpoint's ``style`` argument.

    Returns:
        The value returned by ``client_image.predict`` on success, or
        ``None`` when the call raises.
    """
    width, height = resolutions.get(resolution_key, (1024, 1024))
    full_prompt = f"{prompt}, Canon EOS R5, 4K, Photo-Realistic, appearing photorealistic with super fine details, high resolution, natural look, hyper realistic photography, cinematic lighting, --ar 64:37, --v 6.0, --style raw, --stylize 750"
    try:
        result = client_image.predict(
            prompt=full_prompt,
            negative_prompt="(deformed, distorted, disfigured:1.3), poorly drawn, bad anatomy, wrong anatomy, extra limb, missing limb, floating limbs, (mutated hands and fingers:1.4), disconnected limbs, mutation, mutated, ugly, disgusting, blurry, amputation",
            use_negative_prompt=True,
            style=style,
            seed=0,
            width=width,
            height=height,
            guidance_scale=5,
            randomize_seed=True,
            api_name="/run"
        )
    except Exception as e:
        # Boundary with a remote service: log the failure and report it to the
        # caller as None. (The original returned the undefined name ``Non``,
        # which raised NameError from inside this handler.)
        logger.error(f"Error generating image: {e}")
        return None
    logger.info("Image generation successful.")
    return result
def request_otp(user_chat_id):
    """Ask the webhook server to issue a one-time password for a user.

    Args:
        user_chat_id: Identifier sent as ``user_chat_id`` in the JSON body.

    Returns:
        ``{"success": <value>}`` (the server's ``success`` field, default
        ``False``) when the reply has no ``error`` key; otherwise
        ``{"success": False, "error": <message>}``.
    """
    try:
        response = requests.post(
            f"{webhook_server}/request_otp",
            json={"user_chat_id": user_chat_id},
            timeout=30,  # bounded so a stalled server cannot hang the caller
        )
        response.raise_for_status()
        logger.debug(f"Response status code: {response.status_code}")
        logger.debug(f"Response content: {response.text}")
        response_data = response.json()
    except requests.RequestException as e:
        logger.error(f"Request exception: {e}")
        return {"success": False, "error": "Request exception occurred"}
    except ValueError as e:
        logger.error(f"JSON decode error: {e}")
        return {"success": False, "error": "Invalid JSON response"}

    # Valid JSON that is not an object (list, null, ...) has no fields to read.
    if not isinstance(response_data, dict):
        logger.error(f"Unexpected JSON payload from /request_otp: {response_data!r}")
        return {"success": False, "error": "Invalid JSON response"}

    if 'error' in response_data:
        logger.error(f"Error response from /request_otp: {response_data['error']}")
        return {"success": False, "error": response_data.get("error", "Unknown error")}
    return {"success": response_data.get("success", False)}
def verify_otp(user_chat_id, otp):
    """Check a one-time password against the webhook server.

    Args:
        user_chat_id: Identifier sent as ``user_chat_id`` in the JSON body.
        otp: Code sent as ``otp`` in the JSON body.

    Returns:
        The ``valid`` field of the server's JSON reply (``False`` when
        absent), or ``False`` on any request or decoding failure.
    """
    try:
        response = requests.post(
            f"{webhook_server}/verify_otp",
            json={"user_chat_id": user_chat_id, "otp": otp},
            timeout=30,  # bounded so a stalled server cannot hang the caller
        )
        response.raise_for_status()
        payload = response.json()
    except requests.RequestException as e:
        logger.error(f"Request exception: {e}")
        return False
    except ValueError as e:
        logger.error(f"JSON decode error: {e}")
        return False

    # A non-object JSON reply cannot carry a "valid" field; treat as invalid.
    if not isinstance(payload, dict):
        logger.error(f"Unexpected JSON payload from /verify_otp: {payload!r}")
        return False
    return payload.get("valid", False)
def retrieve_user_points(user_chat_id, otp):
    """Verify the OTP and, if valid, fetch the user's point balance.

    Args:
        user_chat_id: Identifier passed to ``verify_otp`` and sent as the
            ``user_chat_id`` query parameter of ``/get_points``.
        otp: One-time password to verify first.

    Returns:
        A ``(points, message)`` tuple; ``points`` is ``None`` whenever the
        OTP is rejected or the balance cannot be retrieved.
    """
    if not verify_otp(user_chat_id, otp):
        return None, "Invalid or expired OTP. Please request a new OTP."

    failure = (None, "Error retrieving points. Please try again later.")
    try:
        response = requests.get(
            f"{webhook_server}/get_points",
            params={"user_chat_id": user_chat_id},
            timeout=30,  # bounded so a stalled server cannot hang the caller
        )
        if response.status_code != 200:
            return failure
        data = response.json()
    except requests.RequestException as e:
        # Previously unhandled: a network failure propagated to the UI.
        logger.error(f"Request exception: {e}")
        return failure
    except ValueError as e:
        logger.error(f"JSON decode error: {e}")
        return failure

    if not isinstance(data, dict):
        logger.error(f"Unexpected JSON payload from /get_points: {data!r}")
        return failure

    points = data.get("points", 0)
    # Message corrected: it previously read "OTA verification" and "pormpt".
    return points, "OTP verification and points retrieved successfully. Now, enter prompt and generate image"
def update_user_points(user_chat_id, points):
    """Send a point change for a user to the webhook server.

    Failures are logged and not raised.

    Args:
        user_chat_id: Identifier sent as ``user_chat_id`` in the JSON body.
        points: Value sent as ``points`` (callers in this file pass ``-5``).
    """
    webhook_url = f"{webhook_server}/update_user_points"
    payload = {'user_chat_id': user_chat_id, 'points': points}
    try:
        # The timeout keeps a stalled webhook server from blocking the caller
        # indefinitely; requests.Timeout is a RequestException, handled below.
        response = requests.post(webhook_url, json=payload, timeout=30)
        response.raise_for_status()
        logger.info(f"User with ID {user_chat_id} had their points updated by {points}. Webhook response: {response.status_code}")
    except requests.RequestException as e:
        logger.error(f"Error updating user points via webhook: {e}")
def get_user_points(user_chat_id):
    """Fetch a user's point balance from the webhook server.

    Args:
        user_chat_id: Identifier sent as the ``user_chat_id`` query parameter.

    Returns:
        The ``points`` field of the reply (``0`` when absent), the string
        ``"User not found"`` when the reply contains an ``error`` key, or the
        string ``"Error retrieving points"`` on any request/decoding failure.
    """
    try:
        response = requests.get(
            f"{webhook_server}/get_points",
            params={'user_chat_id': user_chat_id},
            timeout=30,  # bounded so a stalled server cannot hang the caller
        )
        response.raise_for_status()
        data = response.json()
    except requests.RequestException as e:
        logger.error(f"Error retrieving user points: {e}")
        return "Error retrieving points"
    except ValueError as e:
        # Matches request_otp/verify_otp: a malformed body must not escape as
        # an exception on requests versions whose JSON error is a plain
        # ValueError.
        logger.error(f"Error retrieving user points: {e}")
        return "Error retrieving points"

    if not isinstance(data, dict):
        logger.error(f"Error retrieving user points: unexpected payload {data!r}")
        return "Error retrieving points"
    if 'error' in data:
        return "User not found"
    return data.get('points', 0)
def notify_webhook(user_chat_id):
    """POST an update notification for a user to ``/notify_update``.

    Failures are logged and not raised.

    Args:
        user_chat_id: Identifier sent as ``user_chat_id`` in the JSON body.
    """
    webhook_url = f"{webhook_server}/notify_update"
    payload = {'user_chat_id': user_chat_id}
    try:
        # The timeout keeps a stalled webhook server from blocking the caller
        # indefinitely; requests.Timeout is a RequestException, handled below.
        response = requests.post(webhook_url, json=payload, timeout=30)
        response.raise_for_status()
        logger.info(f"Webhook notification sent successfully. Response: {response.status_code}")
    except requests.RequestException as e:
        logger.error(f"Error sending webhook notification: {e}")
def notify_webhook_with_file(user_chat_id, full_file_url):
    """POST a generated image's URL for a user to ``/notify_image``.

    Failures are logged and not raised.

    Args:
        user_chat_id: Identifier sent as ``user_chat_id`` in the JSON body.
        full_file_url: Value sent under the ``file_path`` key.
    """
    webhook_url = f"{webhook_server}/notify_image"
    payload = {'user_chat_id': user_chat_id, 'file_path': full_file_url}
    try:
        # The timeout keeps a stalled webhook server from blocking the caller
        # indefinitely; requests.Timeout is a RequestException, handled below.
        response = requests.post(webhook_url, json=payload, timeout=30)
        response.raise_for_status()
        logger.info(f"Webhook notification sent successfully for user {user_chat_id}. Response: {response.status_code}")
    except requests.RequestException as e:
        logger.error(f"Error sending webhook notification with file URL: {e}")
# Earlier version of gradio_interface, disabled by the author and kept for
# reference only. It passed the local file path to the webhook and returned
# it directly instead of building a full URL.
#
# def gradio_interface(prompt, resolution_key, user_chat_id):
#     if not user_chat_id.strip():
#         return None, "User Chat ID cannot be blank. Please provide a valid User Chat ID."
#     result = generate_image(prompt, resolution_key)
#     if result and result[0]:
#         file_path = result[0][0].get('image')
#         if file_path and os.path.exists(file_path):
#             update_user_points(user_chat_id, -5)
#             notify_webhook_with_file(user_chat_id, file_path)  # Notify webhook with file path
#             return file_path, "The image was generated successfully."
#         return None, "The image file is not available. Please try again later."
#     return None, "There was an error processing your photo. Please try again later."
def gradio_interface(prompt, resolution_key, user_chat_id):
    """Generate an image, charge the user 5 points and notify the webhook.

    Args:
        prompt: Text passed to ``generate_image``.
        resolution_key: Resolution label passed to ``generate_image``.
        user_chat_id: User identifier; must not be blank.

    Returns:
        ``(full_file_url, message)`` on success, otherwise ``(None, message)``
        where the message explains the failure.
    """
    # Also covers None, which previously crashed on ``.strip()``.
    if not user_chat_id or not str(user_chat_id).strip():
        return None, "User Chat ID cannot be blank. Please provide a valid User Chat ID or register with @supportBot to get UID."

    processing_error = (None, "There was an error processing your photo. Please try again later.")
    result = generate_image(prompt, resolution_key)
    if not result or not result[0]:
        return processing_error

    try:
        file_path = result[0][0].get('image')
    except (AttributeError, IndexError, KeyError, TypeError) as e:
        # The endpoint's result did not have the expected nesting; report it
        # as a processing error instead of raising into the UI.
        logger.error(f"Unexpected image result structure: {e}")
        return processing_error

    if file_path and os.path.exists(file_path):
        # Combine base URL with file path to create full URL
        base_url = "https://measmonysuon-imagen.hf.space/file="
        full_file_url = base_url + file_path
        update_user_points(user_chat_id, -5)
        notify_webhook_with_file(user_chat_id, full_file_url)  # Notify webhook with full file URL
        return full_file_url, "Check your telegram, The image was generated successfully."
    return None, "The image file is not available. Please try again later."
def handle_generate_image(prompt, resolution_key, user_chat_id):
    """Generate-button handler: check the balance, then generate the image.

    Args:
        prompt: Text prompt from the UI.
        resolution_key: Resolution label from the dropdown.
        user_chat_id: User identifier from the UI.

    Returns:
        ``(image, message)`` where ``image`` is the first element returned by
        ``gradio_interface`` on success and ``None`` otherwise.
    """
    # Reject a missing UID before any webhook call; this also covers None.
    if not user_chat_id or not str(user_chat_id).strip():
        return None, "User Chat ID cannot be blank. Please provide a valid User Chat ID or register with @supportBot to get UID."

    points = get_user_points(user_chat_id)
    # get_user_points reports failures as strings.
    if isinstance(points, str):
        if points == "User not found":
            return None, "Please register with https://t.me/myexpsupportBot"
        return None, "Error retrieving points. Please try again later."

    try:
        points = int(points)
    except (TypeError, ValueError):
        # TypeError covers a null balance (``int(None)``), which previously
        # escaped as an unhandled exception.
        return None, "Error processing points. Please try again later."

    # gradio_interface deducts 5 points per image, so require at least that.
    if points < 5:
        return None, "Insufficient points. Please get more points before generating an image."

    result = gradio_interface(prompt, resolution_key, user_chat_id)
    if result[0]:
        return result[0], "The image was generated successfully."
    return None, "There was an error processing your photo. Please try again later."
def request_otp_and_update_ui(user_chat_id):
    """Request-OTP button handler; returns the message to show the user.

    Args:
        user_chat_id: User identifier from the UI; must not be blank.

    Returns:
        A status string: a validation error, a confirmation that the OTP was
        sent, or a generic failure message.
    """
    # Validate user_chat_id (also covers None, which crashed on ``.strip()``).
    if not user_chat_id or not str(user_chat_id).strip():
        return "User Chat ID cannot be blank. Please provide a valid User Chat ID or register with @supportBot to get UID."

    otp_response = request_otp(user_chat_id)
    if otp_response.get("success"):
        return "OTP has been sent to your Telegram"
    return "Error requesting OTP. Please try again later."
def create_gradio_interface():
    """Assemble the Gradio Blocks UI and wire its three buttons.

    NOTE(review): the grouping of components into rows was reconstructed
    because the original indentation was lost — confirm the layout.

    Returns:
        The ``gr.Blocks`` instance (not launched here).
    """
    header_html = """
    <h1>AI ImaGen DALL·E 4K</h1>
    <h3>Welcome! Please enter your UID and OTP to continue or Register with <a href="https://t.me/myexpsupportBot">@SupportBot</a> to get UID</h3>
    """
    # Style rule targeting the footer element by its generated class name.
    hide_footer_html = """
    <style>
    footer.svelte-1rjryqp {
    display: none !important;
    }
    </style>
    """

    with gr.Blocks() as interface:
        gr.HTML(header_html)

        # Credentials and balance.
        with gr.Row():
            uid_box = gr.Textbox(label="Your UID", placeholder="Enter your UID")
            otp_box = gr.Textbox(label="OTP", type="password", placeholder="Enter your OTP")
            balance_box = gr.Textbox(label="Your Balance", placeholder="Your Points", interactive=False)

        # OTP actions.
        with gr.Row():
            otp_request_btn = gr.Button("Request OTP")
            otp_verify_btn = gr.Button("Verify OTP")

        status_box = gr.Textbox(
            label="Message",
            placeholder="Results and notifications will be shown here",
            interactive=False,
        )
        prompt_box = gr.Textbox(label="Prompt", placeholder="Enter your prompt here...")
        resolution_choice = gr.Dropdown(
            choices=list(resolutions.keys()),
            label="Resolution",
            value="1024x1024",
        )
        generate_btn = gr.Button("Generate")
        image_view = gr.Image(label="Generated Image", type="pil")

        # Event wiring, registered in the same order as the original.
        generate_btn.click(
            fn=handle_generate_image,
            inputs=[prompt_box, resolution_choice, uid_box],
            outputs=[image_view, status_box],
        )
        otp_verify_btn.click(
            fn=retrieve_user_points,
            inputs=[uid_box, otp_box],
            outputs=[balance_box, status_box],
        )
        otp_request_btn.click(
            fn=request_otp_and_update_ui,
            inputs=[uid_box],
            outputs=[status_box],
        )

        gr.HTML(hide_footer_html)

    return interface
def launch_gradio():
    """Build the UI, configure its request queue, and launch it."""
    ui = create_gradio_interface()
    # Configure queue and API accessibility
    ui.queue(api_open=False, max_size=20)
    # Enable verbose error reporting
    ui.launch(show_error=True)


# Launch the UI only when this file is run as a script.
if __name__ == '__main__':
    launch_gradio()