from lollms.config import TypedConfig, BaseConfig, ConfigTemplate, InstallOption
from lollms.types import MSG_TYPE
from lollms.personality import APScript, AIPersonality
from lollms.helpers import ASCIIColors

import json
from pathlib import Path


class Text2Paragraphs:
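    """Split raw text into paragraph chunks of bounded size and persist them to a simple JSON database."""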

    def __init__(self, database_path=None, max_chunk_size=2000):
        self.paragraphs = []
        self.database_path = database_path
        self.max_chunk_size = max_chunk_size
        if database_path is not None and Path(database_path).exists():
            self.load_from_json()

    def chunk_text(self, text):
        """Split text on newlines and greedily pack paragraphs into chunks of at most max_chunk_size characters."""
        paragraphs = text.split("\n")
        if self.max_chunk_size is not None:
            current_chunk = []
            current_chunk_size = 0
            for paragraph in paragraphs:
                if current_chunk_size + len(paragraph) <= self.max_chunk_size:
                    current_chunk.append(paragraph)
                    current_chunk_size += len(paragraph)
                else:
                    # flush the current chunk (if any) and start a new one with this paragraph
                    if current_chunk:
                        self.paragraphs.append("\n".join(current_chunk))
                    current_chunk = [paragraph]
                    current_chunk_size = len(paragraph)
            if current_chunk:
                self.paragraphs.append("\n".join(current_chunk))
        else:
            # no size limit: keep one entry per paragraph
            self.paragraphs.extend(paragraphs)

    def load_from_json(self, filename=None):
        if filename is None:
            filename = self.database_path
        with open(filename, "r") as file:
            data = json.load(file)
            self.paragraphs = data["paragraphs"]

    def save_to_json(self, filename=None):
        if filename is None:
            filename = self.database_path
        data = {"paragraphs": self.paragraphs}
        with open(filename, "w") as file:
            json.dump(data, file)


class Processor(APScript):
    """
    A class that processes model inputs and outputs.

    Inherits from APScript.
    """

    def __init__(
            self,
            personality: AIPersonality
    ) -> None:
        self.word_callback = None

        personality_config_template = ConfigTemplate(
            [
                {"name":"database_path","type":"str","value":f"{personality.name}_db.json", "help":"Path to the database"},
                {"name":"max_chunk_size","type":"int","value":512, "min":10, "max":personality.config["ctx_size"], "help":"Maximum size of text chunks to vectorize"},
                {"name":"max_answer_size","type":"int","value":512, "min":10, "max":personality.config["ctx_size"], "help":"Maximum number of tokens to allow the generator to generate as an answer to your question"},
            ]
        )
        personality_config_vals = BaseConfig.from_template(personality_config_template)

        personality_config = TypedConfig(
            personality_config_template,
            personality_config_vals
        )
        super().__init__(
            personality,
            personality_config
        )

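        # Simple state machine driving run_workflow:
        #   0 -> idle, waiting for a command
        #   1 -> waiting for the path of a file to add ("send_file")
        #   2 -> waiting for the path of a database file ("set_database")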
        self.state = 0

        self.text_store = Text2Paragraphs(
            self.personality.lollms_paths.personal_data_path/self.personality_config.database_path,
            max_chunk_size=self.personality_config.max_chunk_size
        )

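    # ---- helpers: read the content of the supported file types as plain text ----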
    @staticmethod
    def read_pdf_file(file_path):
        import PyPDF2
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            text = ""
            for page in pdf_reader.pages:
                text += page.extract_text()
        return text

    @staticmethod
    def read_docx_file(file_path):
        from docx import Document
        doc = Document(file_path)
        text = ""
        for paragraph in doc.paragraphs:
            text += paragraph.text + "\n"
        return text

    @staticmethod
    def read_json_file(file_path):
        with open(file_path, 'r') as file:
            data = json.load(file)
        # return the data as text so it can be chunked like any other document
        return json.dumps(data, indent=2)

    @staticmethod
    def read_csv_file(file_path):
        import csv
        with open(file_path, 'r') as file:
            csv_reader = csv.reader(file)
            # flatten the rows into plain text, one line per row
            lines = [", ".join(row) for row in csv_reader]
        return "\n".join(lines)

    @staticmethod
    def read_html_file(file_path):
        from bs4 import BeautifulSoup
        with open(file_path, 'r') as file:
            soup = BeautifulSoup(file, 'html.parser')
            text = soup.get_text()
        return text

    @staticmethod
    def read_pptx_file(file_path):
        from pptx import Presentation
        prs = Presentation(file_path)
        text = ""
        for slide in prs.slides:
            for shape in slide.shapes:
                if shape.has_text_frame:
                    for paragraph in shape.text_frame.paragraphs:
                        for run in paragraph.runs:
                            text += run.text
        return text

    @staticmethod
    def read_text_file(file_path):
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
        return content

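    # Build/extend the paragraph database from the files attached to this personality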
    def build_db(self):
        ASCIIColors.info("-> Vectorizing the database" + ASCIIColors.color_orange)
        for file in self.files:
            try:
                # pick the reader that matches the file extension, defaulting to plain text
                if Path(file).suffix == ".pdf":
                    text = Processor.read_pdf_file(file)
                elif Path(file).suffix == ".docx":
                    text = Processor.read_docx_file(file)
                elif Path(file).suffix == ".pptx":
                    text = Processor.read_pptx_file(file)
                elif Path(file).suffix == ".json":
                    text = Processor.read_json_file(file)
                elif Path(file).suffix == ".csv":
                    text = Processor.read_csv_file(file)
                elif Path(file).suffix == ".html":
                    text = Processor.read_html_file(file)
                else:
                    text = Processor.read_text_file(file)

                # chunk the text and persist the updated database
                self.text_store.chunk_text(text)
                self.text_store.save_to_json()
                print(ASCIIColors.color_reset)
                ASCIIColors.success(f"File {file} vectorized successfully")
            except Exception as ex:
                ASCIIColors.error(f"Couldn't vectorize {file}: The vectorizer threw this exception: {ex}")

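    # Register a newly attached file (via APScript) and rebuild the paragraph database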
    def add_file(self, path):
        super().add_file(path)
        try:
            self.build_db()
            if self.word_callback:
                self.word_callback("File added successfully", MSG_TYPE.MSG_TYPE_STEP)
            return True
        except Exception as ex:
            ASCIIColors.error(f"Couldn't vectorize the database: The vectorizer threw this exception: {ex}")
            return False

    def run_workflow(self, prompt, previous_discussion_text="", callback=None):
        """
        Runs the workflow for processing the model input and output.

        This method should be called to execute the processing workflow.
        Recognized commands: send_file, help, show_database, clear_database,
        set_database and convert; any other prompt is interpreted according
        to the current state (file path, database path, or unknown command).

        Args:
            prompt (str): The input prompt for the model.
            previous_discussion_text (str, optional): The text of the previous discussion. Defaults to an empty string.
            callback (function, optional): Callback used to stream messages back to the user interface. Defaults to None.

        Returns:
            str: The text output produced by the command.
        """
        self.word_callback = callback
        output = ""
        if prompt.strip().lower() == "send_file":
            # state 1: the next prompt will be interpreted as a file path
            self.state = 1
            print("Please provide the file name")
            if callback is not None:
                callback("Please provide the file path", MSG_TYPE.MSG_TYPE_FULL)
            output = "Please provide the file name"
        elif prompt.strip().lower() == "help":
            if callback:
                callback(self.personality.help, MSG_TYPE.MSG_TYPE_FULL)
            ASCIIColors.info(self.personality.help)
            self.state = 0
        elif prompt.strip().lower() == "show_database":
            if callback:
                callback("Current database\n", MSG_TYPE.MSG_TYPE_CHUNK)
            print("Current database\n")
            for chunk in self.text_store.paragraphs:
                if callback:
                    callback(chunk + "\n", MSG_TYPE.MSG_TYPE_CHUNK)
                print(chunk)

            self.state = 0
        elif prompt.strip().lower() == "clear_database":
            database_file_path: Path = self.personality.lollms_paths.personal_data_path/self.personality_config["database_path"]
            if database_file_path.exists():
                database_file_path.unlink()
                # keep the configured path and start over with an empty store
                self.text_store = Text2Paragraphs(
                    self.personality.lollms_paths.personal_data_path/self.personality_config.database_path,
                    max_chunk_size=self.personality_config.max_chunk_size
                )
                if callback is not None:
                    callback("Database file cleared successfully", MSG_TYPE.MSG_TYPE_FULL)
            else:
                if callback is not None:
                    callback("The database file does not exist yet, so you can't clear it", MSG_TYPE.MSG_TYPE_FULL)
            self.state = 0
        elif prompt.strip().lower() == "set_database":
            print("Please provide the database file name")
            if callback is not None:
                callback("Please provide the database file path", MSG_TYPE.MSG_TYPE_FULL)
            output = "Please provide the database file name"
            # state 2: the next prompt will be interpreted as a database file path
            self.state = 2
        elif prompt.strip().lower() == "convert":
            if callback is not None:
                callback("# Full bullet points summary:\n", MSG_TYPE.MSG_TYPE_CHUNK)
            for i, chunk in enumerate(self.text_store.paragraphs):
                # skip chunks that are too short to be worth summarizing
                if len(chunk.split()) < 50:
                    print(chunk)
                    continue
                docs = '### Instruction: Summarize the following paragraph in the form of bullet points.\nParagraph:' + chunk + "\nBullet points:\n-"
                ASCIIColors.error("\n-------------- Documentation -----------------------")
                ASCIIColors.error(docs)
                ASCIIColors.error("----------------------------------------------------")
                print("Thinking")
                output = self.generate(docs, self.personality_config.max_answer_size)
                # replace the original chunk with its bullet point summary
                self.text_store.paragraphs[i] = output
                if callback is not None:
                    callback(output, MSG_TYPE.MSG_TYPE_CHUNK)

        else:
            if self.state == 1:
                # state 1: the prompt is the path of a file to add to the database
                try:
                    self.add_file(prompt)
                except Exception as ex:
                    ASCIIColors.error(f"Exception: {ex}")
                    output = str(ex)
                self.state = 0
            elif self.state == 2:
                # state 2: the prompt is the path of a database file to switch to
                try:
                    new_db_path = Path(prompt)
                    if new_db_path.exists():
                        self.personality_config.database_path = prompt
                        self.text_store = Text2Paragraphs(
                            self.personality.lollms_paths.personal_data_path/self.personality_config.database_path,
                            max_chunk_size=self.personality_config.max_chunk_size
                        )
                        self.personality_config.config.save_config()
                    else:
                        output = "Database file not found.\nGoing back to default state."
                except Exception as ex:
                    ASCIIColors.error(f"Exception: {ex}")
                    output = str(ex)
                self.state = 0
            else:
                output = "unknown command"
        return output
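

if __name__ == "__main__":
    # Minimal illustrative sketch (not part of the lollms personality workflow):
    # exercise the Text2Paragraphs store on its own. The "example_db.json" file
    # name and the sample text are hypothetical placeholders for this demo only.
    demo_store = Text2Paragraphs("example_db.json", max_chunk_size=64)
    demo_store.chunk_text("First paragraph of a sample document.\nSecond paragraph.\nThird paragraph.")
    demo_store.save_to_json()
    # Constructing a new instance with the same path reloads the chunks from JSON.
    reloaded = Text2Paragraphs("example_db.json")
    print(reloaded.paragraphs)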