from flask import Flask, request, render_template_string, send_from_directory, jsonify from flask import render_template import sqlite3 import os import uuid import json import base64 import unittest import requests own_url = os.getenv('own_url') key_d = os.getenv('gc_api') test_url = os.getenv('gc_url') import json from datetime import datetime import whatsapp_api_webhook_server_python.webhooksHandler as handler app = Flask(__name__, template_folder="./") app.config['DEBUG'] = True UPLOAD_FOLDER = 'static' # Создание директории, если она не существует if not os.path.exists(UPLOAD_FOLDER): os.makedirs(UPLOAD_FOLDER) # Создание базы данных и таблицы def init_db(): try: conn = sqlite3.connect('data.db') cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS contacts ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, phone TEXT NOT NULL, email TEXT NOT NULL ) ''') conn.commit() conn.close() except Exception as e: print(f"Error initializing database: {e}") # Вызов функции для инициализации базы данных init_db() @app.route('/settings', methods=['GET']) def settings(): return render_template('settings.html') @app.route('/online', methods=['GET']) def onli(): return render_template('online.html') @app.route('/ver', methods=['GET']) def veref(): return render_template('ver.html') @app.route('/se_mes', methods=['GET']) def se_mes(): return render_template('se_mes.html') @app.route('/se_mes_im', methods=['GET']) def se_mes_im(): return render_template('se_mes_im.html') @app.route('/se_mes_im_ran', methods=['GET']) def se_mes_im_ran(): return render_template('se_mes_im_ran.html') @app.route('/se_mes_im2', methods=['GET']) def se_mes_im2(): return render_template('se_mes_im2.html') @app.route('/se_mes_f', methods=['GET']) def se_mes_f(): return render_template('se_mes_f.html') @app.route('/up_gr', methods=['GET']) def up_gr(): return render_template('up_gr.html') @app.route('/up_user_gp', methods=['GET']) def up_user_gp(): return render_template('up_user_gp.html') @app.route('/del_user_gp', methods=['GET']) def del_user_gp(): return render_template('del_user_gp.html') @app.route('/up_ad', methods=['GET']) def up_ad(): return render_template('up_ad.html') @app.route('/del_ad', methods=['GET']) def del_ad(): return render_template('del_ad.html') @app.route('/se_opr', methods=['GET']) def se_opr(): return render_template('se_opr.html') @app.route('/online', methods=['GET']) def online(): return render_template('online.html') @app.route('/upload', methods=['POST']) def upload_file(): if 'file' not in request.files: return "No file part", 400 file = request.files['file'] if file.filename == '': return "No selected file", 400 # Генерация уникального имени файла unique_filename = str(uuid.uuid4()) + os.path.splitext(file.filename)[1] save_path = os.path.join(UPLOAD_FOLDER, unique_filename) file.save(save_path) # Возвращаем полный URL загруженного файла с протоколом https full_url = request.url_root.replace('http://', 'https://') + 'uploads/' + unique_filename return f"File uploaded successfully and saved to {full_url}", 200 @app.route('/uploads/', methods=['GET']) def uploaded_file(filename): return send_from_directory(UPLOAD_FOLDER, filename) @app.route('/up_fa', methods=['GET']) def up_fa(): return render_template('up_fa.html') # Маршрут для обработки GET-запроса из gc @app.route('/add_contact', methods=['GET']) def add_contact(): try: name = request.args.get('name') phone = request.args.get('phone') email = request.args.get('email') if not name or not phone or not email: return "Parameters 'name', 'phone', and 'email' are required.", 400 conn = sqlite3.connect('data.db') cursor = conn.cursor() cursor.execute('INSERT INTO contacts (name, phone, email) VALUES (?, ?, ?)', (name, phone, email)) conn.commit() conn.close() return f"Contact added: {name} - {phone} - {email}", 200 except Exception as e: print(f"Error adding contact: {e}") return "Internal Server Error", 500 # Маршрут для отображения таблицы контактов из gc @app.route('/contacts') def show_contacts(): try: conn = sqlite3.connect('data.db') cursor = conn.cursor() cursor.execute('SELECT name, phone, email FROM contacts') contacts = cursor.fetchall() conn.close() # HTML-шаблон для отображения таблицы html = ''' Contacts

Contacts

{% for contact in contacts %} {% endfor %}
Name Phone Email
{{ contact[0] }} {{ contact[1] }} {{ contact[2] }}
''' return render_template_string(html, contacts=contacts) except Exception as e: print(f"Error showing contacts: {e}") return "Internal Server Error", 500 # Переменные с данными action_d = "add" params_d = "" name_d = "" email_d = "" phone_d = "" pr1_d = "" pr2_d = "" pr3_d = "" @app.route('/gc_db', methods=['GET']) def gc_db(): # Чтение параметров из GET-запроса name_d = request.args.get('name', '') email_d = request.args.get('email', '') phone_d = request.args.get('phone', '') pr1_d = request.args.get('pr1', '') pr2_d = request.args.get('pr2', '') pr3_d = request.args.get('pr3', '') # Формирование JSON json_data = { "user": { "email": email_d, "phone": phone_d, "first_name": name_d, "addfields": { "pr1": pr1_d, "pr2": pr2_d, "pr3": pr3_d } }, "system": { "refresh_if_exists": 1 }, "session": { "utm_source": "", "utm_medium": "", "utm_content": "", "utm_campaign": "", "utm_group": "", "gcpc": "", "gcao": "", "referer": "" } } # Конвертация JSON в Base64 json_str = json.dumps(json_data) params_d = base64.b64encode(json_str.encode('utf-8')).decode('utf-8') # Данные для отправки в теле запроса data = { 'key': key_d, 'action': action_d, 'params': params_d } # Отправка POST-запроса с данными в формате "form-data" response = requests.post(test_url, data=data) # Возвращаем ответ от тестового адреса return { 'status_code': response.status_code, 'response_body': response.text } if __name__ == '__main__': app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))