import os
import shutil
import hmac
import hashlib
import base64
import subprocess
import time
from mysite.logger import logger
import async_timeout
import asyncio
import mysite.interpreter.interpreter_config
from models.ride import test_set_lide
from mysite.libs.github import github
import requests
import json
from mysite.logger import log_error
GENERATION_TIMEOUT_SEC=60
BASE_PATH = "/home/user/app/controllers/"
def set_environment_variables():
os.environ["OPENAI_API_BASE"] = "https://api.groq.com/openai/v1"
os.environ["OPENAI_API_KEY"] = os.getenv("api_key")
os.environ["MODEL_NAME"] = "llama3-8b-8192"
os.environ["LOCAL_MODEL"] = "true"
def convert_newlines_to_google_chat_format(text):
# 改行文字を
タグに置き換える
return text.replace('\\n', '\\\n')
def send_google_chat_card(webhook_url, title, subtitle, link_text, link_url):
headers = {
'Content-Type': 'application/json; charset=UTF-8'
}
subtitle = convert_newlines_to_google_chat_format(subtitle)
card_message = {
"cards": [
{
"header": {
"title": title,
},
"sections": [
{
"widgets": [
{
"textParagraph": {
"text": subtitle
}
},
{
"textParagraph": {
"text": "{}".format(link_text)
}
},
{
"buttons": [
{
"textButton": {
"text": "Open Link",
"onClick": {
"openLink": {
"url": link_url
}
}
}
}
]
},
{
"textParagraph": {
"text": "{}".format("ステップ2 テスト google apps script(その場で誰でもさくっと動くレベルがいい)")
}
},
{
"buttons": [
{
"textButton": {
"text": "Open Link",
"onClick": {
"openLink": {
"url": "https://script.google.com/d/1YJCQDrzGnrJiRcJcRjX9RuKR3gGrIj84p5kp4ZMiY-ls9UafjhDEUXIk/edit?usp=sharing"
}
}
}
}
]
},
{
"textParagraph": {
"text": "{}".format("ステップ3 テスト google colab(その場で誰でもさくっと動くレベルがいい)")
}
},
{
"buttons": [
{
"textButton": {
"text": "Open Link",
"onClick": {
"openLink": {
"url": "https://colab.research.google.com/drive/1LNR_Y9sk9DGYnWTxHP09s9x4JLe3GmqS?hl=ja"
}
}
}
}
]
},
{
"textParagraph": {
"text": "{}".format("ステップ4 workflow に組み込み(うまくいった 小さなものを組み合わせる)")
}
},
{
"buttons": [
{
"textButton": {
"text": "Open Link",
"onClick": {
"openLink": {
"url": "https://bpmboxesscom-46463613.hubspotpagebuilder.com/ja/"
}
}
}
}
]
},
{
"textParagraph": {
"text": "{}".format("ステップ5 カスタマーサービス内部で相談")
}
},
{
"buttons": [
{
"textButton": {
"text": "Open Link",
"onClick": {
"openLink": {
"url": "https://bpmboxesscom-46463613.hubspotpagebuilder.com/ja/"
}
}
}
}
]
},
{
"textParagraph": {
"text": "{}".format("ステップ6 過去のテンプレートを元に思ったものを作ってみる コピーペースト 繰り返し")
}
},
{
"buttons": [
{
"textButton": {
"text": "Open Link",
"onClick": {
"openLink": {
"url": "https://kenken999-php.hf.space/main_list.php"
}
}
}
}
]
},
{
"textParagraph": {
"text": "{}".format("リファスタチャットボット確認")
}
},
{
"buttons": [
{
"textButton": {
"text": "Open Link",
"onClick": {
"openLink": {
"url": "https://www.coze.com/store/bot/7372534558954176520?panel=1&bid=6cqv73j0k7g0i"
}
}
}
}
]
},
]
}
]
}
]
}
response = requests.post(webhook_url, headers=headers, data=json.dumps(card_message))
if response.status_code == 200:
print("Message posted successfully.")
else:
print(f"Failed to post message: {response.status_code}, {response.text}")
def validate_signature(body: str, signature: str, secret: str) -> bool:
if secret is None:
logger.error("Secret is None")
return False
hash = hmac.new(
secret.encode("utf-8"), body.encode("utf-8"), hashlib.sha256
).digest()
expected_signature = base64.b64encode(hash).decode("utf-8")
return hmac.compare_digest(expected_signature, signature)
#プロセスの実行
def no_process_file(prompt, foldername):
set_environment_variables()
try:
proc = subprocess.Popen(["mkdir", f"{BASE_PATH}{foldername}"])
except subprocess.CalledProcessError as e:
return f"Processed Content:\n{e.stdout}\n\nMake Command Error:\n{e.stderr}"
no_extension_path = f"{BASE_PATH}{foldername}/prompt"
time.sleep(1)
with open(no_extension_path, "a") as f:
f.write(prompt)
time.sleep(1)
try:
prompt_file_path = no_extension_path
with open(prompt_file_path, "a") as prompt_file:
prompt_file.write(prompt)
except Exception as e:
return f"Error writing prompt to file: {str(e)}"
time.sleep(1)
try:
proc = subprocess.Popen(
["make", "run", foldername],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
stdout, stderr = proc.communicate(input="n\ny\ny\n")
webhook_url = os.getenv("chat_url")
token = os.getenv("token")
#githubでのソース作成
#log_error("github でエラーが起きました")
try:
url = github(token,foldername)
except Exception as e:
log_error(e)
title = """ラインで作るオープンシステム
お客様の質問内容の
プログラムを作成しました"""
subtitle = prompt
link_text = "ステップ1 githubで プログラムを確認する githubだと相談者が沢山いるのでわからなければ聞いてみる "
link_url = url
send_google_chat_card(webhook_url, title, subtitle, link_text, link_url)
#insert data to db
#test_set_lide(prompt,url)
print(f"Processed Content:success ")
return f"Processed Content:success "#\n{stdout}\n\nMake Command Output:\n{stdout}\n\nMake Command Error:\n{stderr}"
except subprocess.CalledProcessError as e:
print(f"Processed Content:false ")
return f"Processed Content:error"# {str(e)}"#\n{e.stdout}\n\nMake Command Error:\n{e.stderr}"
def process_file(fileobj, prompt, foldername,token=None):
set_environment_variables()
try:
proc = subprocess.Popen(["mkdir", f"{BASE_PATH}{foldername}"])
except subprocess.CalledProcessError as e:
return f"Processed Content:\n{e.stdout}\n\nMake Command Error:\n{e.stderr}"
time.sleep(2)
path = f"{BASE_PATH}{foldername}/" + os.path.basename(fileobj)
shutil.copyfile(fileobj.name, path)
base_name = os.path.splitext(os.path.basename(fileobj))[0]
no_extension_path = f"{BASE_PATH}{foldername}/{base_name}"
shutil.copyfile(fileobj, no_extension_path)
with open(no_extension_path, "a") as f:
f.write(prompt)
try:
prompt_file_path = no_extension_path
with open(prompt_file_path, "w") as prompt_file:
prompt_file.write(prompt)
except Exception as e:
return f"Error writing prompt to file: {str(e)}"
time.sleep(1)
#foldernameの登録
test_set_lide(prompt,foldername)
try:
proc = subprocess.Popen(
["make", "run", foldername],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
stdout, stderr = proc.communicate(input="n\ny\ny\n")
test_set_lide(prompt,url)
return f"Processed url:{url}\nContent:\n{stdout}\n\nMake Command Output:\n{stdout}\n\nMake Command Error:\n{stderr}"
except subprocess.CalledProcessError as e:
return f"Processed url:{url}\nContent:\n{stdout}\n\nMake Command Error:\n{e.stderr}"