oceansweep commited on
Commit
30b5872
1 Parent(s): 2de654d

Update App_Function_Libraries/DB/Character_Chat_DB.py

Browse files
App_Function_Libraries/DB/Character_Chat_DB.py CHANGED
@@ -1,1059 +1,1058 @@
1
- # character_chat_db.py
2
- # Database functions for managing character cards and chat histories.
3
- # #
4
- # Imports
5
- import configparser
6
- import sqlite3
7
- import json
8
- import os
9
- import sys
10
- from typing import List, Dict, Optional, Tuple, Any, Union
11
-
12
- from App_Function_Libraries.Utils.Utils import get_database_dir, get_project_relative_path, get_database_path
13
- from Tests.Chat_APIs.Chat_APIs_Integration_test import logging
14
-
15
- #
16
- #######################################################################################################################
17
- #
18
- #
19
-
20
- def ensure_database_directory():
21
- os.makedirs(get_database_dir(), exist_ok=True)
22
-
23
- ensure_database_directory()
24
-
25
-
26
- # Construct the path to the config file
27
- config_path = get_project_relative_path('Config_Files/config.txt')
28
-
29
- # Read the config file
30
- config = configparser.ConfigParser()
31
- config.read(config_path)
32
-
33
- # Get the chat db path from the config, or use the default if not specified
34
- chat_DB_PATH = config.get('Database', 'chatDB_path', fallback=get_database_path('chatDB.db'))
35
- print(f"Chat Database path: {chat_DB_PATH}")
36
-
37
- ########################################################################################################
38
- #
39
- # Functions
40
-
41
- # FIXME - Setup properly and test/add documentation for its existence...
42
- def initialize_database():
43
- """Initialize the SQLite database with required tables and FTS5 virtual tables."""
44
- conn = None
45
- try:
46
- conn = sqlite3.connect(chat_DB_PATH)
47
- cursor = conn.cursor()
48
-
49
- # Enable foreign key constraints
50
- cursor.execute("PRAGMA foreign_keys = ON;")
51
-
52
- # Create CharacterCards table with V2 fields
53
- cursor.execute("""
54
- CREATE TABLE IF NOT EXISTS CharacterCards (
55
- id INTEGER PRIMARY KEY AUTOINCREMENT,
56
- name TEXT UNIQUE NOT NULL,
57
- description TEXT,
58
- personality TEXT,
59
- scenario TEXT,
60
- image BLOB,
61
- post_history_instructions TEXT,
62
- first_mes TEXT,
63
- mes_example TEXT,
64
- creator_notes TEXT,
65
- system_prompt TEXT,
66
- alternate_greetings TEXT,
67
- tags TEXT,
68
- creator TEXT,
69
- character_version TEXT,
70
- extensions TEXT,
71
- created_at DATETIME DEFAULT CURRENT_TIMESTAMP
72
- );
73
- """)
74
-
75
- # Create FTS5 virtual table for CharacterCards
76
- cursor.execute("""
77
- CREATE VIRTUAL TABLE IF NOT EXISTS CharacterCards_fts USING fts5(
78
- name,
79
- description,
80
- personality,
81
- scenario,
82
- system_prompt,
83
- content='CharacterCards',
84
- content_rowid='id'
85
- );
86
- """)
87
-
88
- # Create triggers to keep FTS5 table in sync with CharacterCards
89
- cursor.executescript("""
90
- CREATE TRIGGER IF NOT EXISTS CharacterCards_ai AFTER INSERT ON CharacterCards BEGIN
91
- INSERT INTO CharacterCards_fts(
92
- rowid,
93
- name,
94
- description,
95
- personality,
96
- scenario,
97
- system_prompt
98
- ) VALUES (
99
- new.id,
100
- new.name,
101
- new.description,
102
- new.personality,
103
- new.scenario,
104
- new.system_prompt
105
- );
106
- END;
107
-
108
- CREATE TRIGGER IF NOT EXISTS CharacterCards_ad AFTER DELETE ON CharacterCards BEGIN
109
- DELETE FROM CharacterCards_fts WHERE rowid = old.id;
110
- END;
111
-
112
- CREATE TRIGGER IF NOT EXISTS CharacterCards_au AFTER UPDATE ON CharacterCards BEGIN
113
- UPDATE CharacterCards_fts SET
114
- name = new.name,
115
- description = new.description,
116
- personality = new.personality,
117
- scenario = new.scenario,
118
- system_prompt = new.system_prompt
119
- WHERE rowid = new.id;
120
- END;
121
- """)
122
-
123
- # Create CharacterChats table
124
- cursor.execute("""
125
- CREATE TABLE IF NOT EXISTS CharacterChats (
126
- id INTEGER PRIMARY KEY AUTOINCREMENT,
127
- character_id INTEGER NOT NULL,
128
- conversation_name TEXT,
129
- chat_history TEXT,
130
- is_snapshot BOOLEAN DEFAULT FALSE,
131
- created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
132
- FOREIGN KEY (character_id) REFERENCES CharacterCards(id) ON DELETE CASCADE
133
- );
134
- """)
135
-
136
- # Create FTS5 virtual table for CharacterChats
137
- cursor.execute("""
138
- CREATE VIRTUAL TABLE IF NOT EXISTS CharacterChats_fts USING fts5(
139
- conversation_name,
140
- chat_history,
141
- content='CharacterChats',
142
- content_rowid='id'
143
- );
144
- """)
145
-
146
- # Create triggers to keep FTS5 table in sync with CharacterChats
147
- cursor.executescript("""
148
- CREATE TRIGGER IF NOT EXISTS CharacterChats_ai AFTER INSERT ON CharacterChats BEGIN
149
- INSERT INTO CharacterChats_fts(rowid, conversation_name, chat_history)
150
- VALUES (new.id, new.conversation_name, new.chat_history);
151
- END;
152
-
153
- CREATE TRIGGER IF NOT EXISTS CharacterChats_ad AFTER DELETE ON CharacterChats BEGIN
154
- DELETE FROM CharacterChats_fts WHERE rowid = old.id;
155
- END;
156
-
157
- CREATE TRIGGER IF NOT EXISTS CharacterChats_au AFTER UPDATE ON CharacterChats BEGIN
158
- UPDATE CharacterChats_fts SET conversation_name = new.conversation_name, chat_history = new.chat_history
159
- WHERE rowid = new.id;
160
- END;
161
- """)
162
-
163
- # Create ChatKeywords table
164
- cursor.execute("""
165
- CREATE TABLE IF NOT EXISTS ChatKeywords (
166
- chat_id INTEGER NOT NULL,
167
- keyword TEXT NOT NULL,
168
- FOREIGN KEY (chat_id) REFERENCES CharacterChats(id) ON DELETE CASCADE
169
- );
170
- """)
171
-
172
- # Create indexes for faster searches
173
- cursor.execute("""
174
- CREATE INDEX IF NOT EXISTS idx_chatkeywords_keyword ON ChatKeywords(keyword);
175
- """)
176
- cursor.execute("""
177
- CREATE INDEX IF NOT EXISTS idx_chatkeywords_chat_id ON ChatKeywords(chat_id);
178
- """)
179
-
180
- conn.commit()
181
- logging.info("Database initialized successfully.")
182
- except sqlite3.Error as e:
183
- logging.error(f"SQLite error occurred during database initialization: {e}")
184
- if conn:
185
- conn.rollback()
186
- raise
187
- except Exception as e:
188
- logging.error(f"Unexpected error occurred during database initialization: {e}")
189
- if conn:
190
- conn.rollback()
191
- raise
192
- finally:
193
- if conn:
194
- conn.close()
195
-
196
- # Call initialize_database() at the start of your application
197
- def setup_chat_database():
198
- try:
199
- initialize_database()
200
- except Exception as e:
201
- logging.critical(f"Failed to initialize database: {e}")
202
- sys.exit(1)
203
-
204
- setup_chat_database()
205
-
206
-
207
- ########################################################################################################
208
- #
209
- # Character Card handling
210
-
211
- def parse_character_card(card_data: Dict[str, Any]) -> Dict[str, Any]:
212
- """Parse and validate a character card according to V2 specification."""
213
- v2_data = {
214
- 'name': card_data.get('name', ''),
215
- 'description': card_data.get('description', ''),
216
- 'personality': card_data.get('personality', ''),
217
- 'scenario': card_data.get('scenario', ''),
218
- 'first_mes': card_data.get('first_mes', ''),
219
- 'mes_example': card_data.get('mes_example', ''),
220
- 'creator_notes': card_data.get('creator_notes', ''),
221
- 'system_prompt': card_data.get('system_prompt', ''),
222
- 'post_history_instructions': card_data.get('post_history_instructions', ''),
223
- 'alternate_greetings': json.dumps(card_data.get('alternate_greetings', [])),
224
- 'tags': json.dumps(card_data.get('tags', [])),
225
- 'creator': card_data.get('creator', ''),
226
- 'character_version': card_data.get('character_version', ''),
227
- 'extensions': json.dumps(card_data.get('extensions', {}))
228
- }
229
-
230
- # Handle 'image' separately as it might be binary data
231
- if 'image' in card_data:
232
- v2_data['image'] = card_data['image']
233
-
234
- return v2_data
235
-
236
-
237
- def add_character_card(card_data: Dict[str, Any]) -> Optional[int]:
238
- """Add or update a character card in the database."""
239
- conn = sqlite3.connect(chat_DB_PATH)
240
- cursor = conn.cursor()
241
- try:
242
- parsed_card = parse_character_card(card_data)
243
-
244
- # Check if character already exists
245
- cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (parsed_card['name'],))
246
- row = cursor.fetchone()
247
-
248
- if row:
249
- # Update existing character
250
- character_id = row[0]
251
- update_query = """
252
- UPDATE CharacterCards
253
- SET description = ?, personality = ?, scenario = ?, image = ?,
254
- post_history_instructions = ?, first_mes = ?, mes_example = ?,
255
- creator_notes = ?, system_prompt = ?, alternate_greetings = ?,
256
- tags = ?, creator = ?, character_version = ?, extensions = ?
257
- WHERE id = ?
258
- """
259
- cursor.execute(update_query, (
260
- parsed_card['description'], parsed_card['personality'], parsed_card['scenario'],
261
- parsed_card['image'], parsed_card['post_history_instructions'], parsed_card['first_mes'],
262
- parsed_card['mes_example'], parsed_card['creator_notes'], parsed_card['system_prompt'],
263
- parsed_card['alternate_greetings'], parsed_card['tags'], parsed_card['creator'],
264
- parsed_card['character_version'], parsed_card['extensions'], character_id
265
- ))
266
- else:
267
- # Insert new character
268
- insert_query = """
269
- INSERT INTO CharacterCards (name, description, personality, scenario, image,
270
- post_history_instructions, first_mes, mes_example, creator_notes, system_prompt,
271
- alternate_greetings, tags, creator, character_version, extensions)
272
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
273
- """
274
- cursor.execute(insert_query, (
275
- parsed_card['name'], parsed_card['description'], parsed_card['personality'],
276
- parsed_card['scenario'], parsed_card['image'], parsed_card['post_history_instructions'],
277
- parsed_card['first_mes'], parsed_card['mes_example'], parsed_card['creator_notes'],
278
- parsed_card['system_prompt'], parsed_card['alternate_greetings'], parsed_card['tags'],
279
- parsed_card['creator'], parsed_card['character_version'], parsed_card['extensions']
280
- ))
281
- character_id = cursor.lastrowid
282
-
283
- conn.commit()
284
- return character_id
285
- except sqlite3.IntegrityError as e:
286
- logging.error(f"Error adding character card: {e}")
287
- return None
288
- except Exception as e:
289
- logging.error(f"Unexpected error adding character card: {e}")
290
- return None
291
- finally:
292
- conn.close()
293
-
294
- # def add_character_card(card_data: Dict) -> Optional[int]:
295
- # """Add or update a character card in the database.
296
- #
297
- # Returns the ID of the inserted character or None if failed.
298
- # """
299
- # conn = sqlite3.connect(chat_DB_PATH)
300
- # cursor = conn.cursor()
301
- # try:
302
- # # Ensure all required fields are present
303
- # required_fields = ['name', 'description', 'personality', 'scenario', 'image', 'post_history_instructions', 'first_message']
304
- # for field in required_fields:
305
- # if field not in card_data:
306
- # card_data[field] = '' # Assign empty string if field is missing
307
- #
308
- # # Check if character already exists
309
- # cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (card_data['name'],))
310
- # row = cursor.fetchone()
311
- #
312
- # if row:
313
- # # Update existing character
314
- # character_id = row[0]
315
- # cursor.execute("""
316
- # UPDATE CharacterCards
317
- # SET description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
318
- # WHERE id = ?
319
- # """, (
320
- # card_data['description'],
321
- # card_data['personality'],
322
- # card_data['scenario'],
323
- # card_data['image'],
324
- # card_data['post_history_instructions'],
325
- # card_data['first_message'],
326
- # character_id
327
- # ))
328
- # else:
329
- # # Insert new character
330
- # cursor.execute("""
331
- # INSERT INTO CharacterCards (name, description, personality, scenario, image, post_history_instructions, first_message)
332
- # VALUES (?, ?, ?, ?, ?, ?, ?)
333
- # """, (
334
- # card_data['name'],
335
- # card_data['description'],
336
- # card_data['personality'],
337
- # card_data['scenario'],
338
- # card_data['image'],
339
- # card_data['post_history_instructions'],
340
- # card_data['first_message']
341
- # ))
342
- # character_id = cursor.lastrowid
343
- #
344
- # conn.commit()
345
- # return cursor.lastrowid
346
- # except sqlite3.IntegrityError as e:
347
- # logging.error(f"Error adding character card: {e}")
348
- # return None
349
- # except Exception as e:
350
- # logging.error(f"Unexpected error adding character card: {e}")
351
- # return None
352
- # finally:
353
- # conn.close()
354
-
355
-
356
- def get_character_cards() -> List[Dict]:
357
- """Retrieve all character cards from the database."""
358
- logging.debug(f"Fetching characters from DB: {chat_DB_PATH}")
359
- conn = sqlite3.connect(chat_DB_PATH)
360
- cursor = conn.cursor()
361
- cursor.execute("SELECT * FROM CharacterCards")
362
- rows = cursor.fetchall()
363
- columns = [description[0] for description in cursor.description]
364
- conn.close()
365
- characters = [dict(zip(columns, row)) for row in rows]
366
- #logging.debug(f"Characters fetched from DB: {characters}")
367
- return characters
368
-
369
-
370
- def get_character_card_by_id(character_id: Union[int, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
371
- """
372
- Retrieve a single character card by its ID.
373
-
374
- Args:
375
- character_id: Can be either an integer ID or a dictionary containing character data.
376
-
377
- Returns:
378
- A dictionary containing the character card data, or None if not found.
379
- """
380
- conn = sqlite3.connect(chat_DB_PATH)
381
- cursor = conn.cursor()
382
- try:
383
- if isinstance(character_id, dict):
384
- # If a dictionary is passed, assume it's already a character card
385
- return character_id
386
- elif isinstance(character_id, int):
387
- # If an integer is passed, fetch the character from the database
388
- cursor.execute("SELECT * FROM CharacterCards WHERE id = ?", (character_id,))
389
- row = cursor.fetchone()
390
- if row:
391
- columns = [description[0] for description in cursor.description]
392
- return dict(zip(columns, row))
393
- else:
394
- logging.warning(f"Invalid type for character_id: {type(character_id)}")
395
- return None
396
- except Exception as e:
397
- logging.error(f"Error in get_character_card_by_id: {e}")
398
- return None
399
- finally:
400
- conn.close()
401
-
402
-
403
- def update_character_card(character_id: int, card_data: Dict) -> bool:
404
- """Update an existing character card."""
405
- conn = sqlite3.connect(chat_DB_PATH)
406
- cursor = conn.cursor()
407
- try:
408
- cursor.execute("""
409
- UPDATE CharacterCards
410
- SET name = ?, description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
411
- WHERE id = ?
412
- """, (
413
- card_data.get('name'),
414
- card_data.get('description'),
415
- card_data.get('personality'),
416
- card_data.get('scenario'),
417
- card_data.get('image'),
418
- card_data.get('post_history_instructions', ''),
419
- card_data.get('first_message', "Hello! I'm ready to chat."),
420
- character_id
421
- ))
422
- conn.commit()
423
- return cursor.rowcount > 0
424
- except sqlite3.IntegrityError as e:
425
- logging.error(f"Error updating character card: {e}")
426
- return False
427
- finally:
428
- conn.close()
429
-
430
-
431
- def delete_character_card(character_id: int) -> bool:
432
- """Delete a character card and its associated chats."""
433
- conn = sqlite3.connect(chat_DB_PATH)
434
- cursor = conn.cursor()
435
- try:
436
- # Delete associated chats first due to foreign key constraint
437
- cursor.execute("DELETE FROM CharacterChats WHERE character_id = ?", (character_id,))
438
- cursor.execute("DELETE FROM CharacterCards WHERE id = ?", (character_id,))
439
- conn.commit()
440
- return cursor.rowcount > 0
441
- except sqlite3.Error as e:
442
- logging.error(f"Error deleting character card: {e}")
443
- return False
444
- finally:
445
- conn.close()
446
-
447
-
448
- def add_character_chat(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]], keywords: Optional[List[str]] = None, is_snapshot: bool = False) -> Optional[int]:
449
- """
450
- Add a new chat history for a character, optionally associating keywords.
451
-
452
- Args:
453
- character_id (int): The ID of the character.
454
- conversation_name (str): Name of the conversation.
455
- chat_history (List[Tuple[str, str]]): List of (user, bot) message tuples.
456
- keywords (Optional[List[str]]): List of keywords to associate with this chat.
457
- is_snapshot (bool, optional): Whether this chat is a snapshot.
458
-
459
- Returns:
460
- Optional[int]: The ID of the inserted chat or None if failed.
461
- """
462
- conn = sqlite3.connect(chat_DB_PATH)
463
- cursor = conn.cursor()
464
- try:
465
- chat_history_json = json.dumps(chat_history)
466
- cursor.execute("""
467
- INSERT INTO CharacterChats (character_id, conversation_name, chat_history, is_snapshot)
468
- VALUES (?, ?, ?, ?)
469
- """, (
470
- character_id,
471
- conversation_name,
472
- chat_history_json,
473
- is_snapshot
474
- ))
475
- chat_id = cursor.lastrowid
476
-
477
- if keywords:
478
- # Insert keywords into ChatKeywords table
479
- keyword_records = [(chat_id, keyword.strip().lower()) for keyword in keywords]
480
- cursor.executemany("""
481
- INSERT INTO ChatKeywords (chat_id, keyword)
482
- VALUES (?, ?)
483
- """, keyword_records)
484
-
485
- conn.commit()
486
- return chat_id
487
- except sqlite3.Error as e:
488
- logging.error(f"Error adding character chat: {e}")
489
- return None
490
- finally:
491
- conn.close()
492
-
493
-
494
- def get_character_chats(character_id: Optional[int] = None) -> List[Dict]:
495
- """Retrieve all chats, or chats for a specific character if character_id is provided."""
496
- conn = sqlite3.connect(chat_DB_PATH)
497
- cursor = conn.cursor()
498
- if character_id is not None:
499
- cursor.execute("SELECT * FROM CharacterChats WHERE character_id = ?", (character_id,))
500
- else:
501
- cursor.execute("SELECT * FROM CharacterChats")
502
- rows = cursor.fetchall()
503
- columns = [description[0] for description in cursor.description]
504
- conn.close()
505
- return [dict(zip(columns, row)) for row in rows]
506
-
507
-
508
- def get_character_chat_by_id(chat_id: int) -> Optional[Dict]:
509
- """Retrieve a single chat by its ID."""
510
- conn = sqlite3.connect(chat_DB_PATH)
511
- cursor = conn.cursor()
512
- cursor.execute("SELECT * FROM CharacterChats WHERE id = ?", (chat_id,))
513
- row = cursor.fetchone()
514
- conn.close()
515
- if row:
516
- columns = [description[0] for description in cursor.description]
517
- chat = dict(zip(columns, row))
518
- chat['chat_history'] = json.loads(chat['chat_history'])
519
- return chat
520
- return None
521
-
522
-
523
- def search_character_chats(query: str, character_id: Optional[int] = None) -> Tuple[List[Dict], str]:
524
- """
525
- Search for character chats using FTS5, optionally filtered by character_id.
526
-
527
- Args:
528
- query (str): The search query.
529
- character_id (Optional[int]): The ID of the character to filter chats by.
530
-
531
- Returns:
532
- Tuple[List[Dict], str]: A list of matching chats and a status message.
533
- """
534
- if not query.strip():
535
- return [], "Please enter a search query."
536
-
537
- conn = sqlite3.connect(chat_DB_PATH)
538
- cursor = conn.cursor()
539
- try:
540
- if character_id is not None:
541
- # Search with character_id filter
542
- cursor.execute("""
543
- SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
544
- FROM CharacterChats_fts
545
- JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
546
- WHERE CharacterChats_fts MATCH ? AND CharacterChats.character_id = ?
547
- ORDER BY rank
548
- """, (query, character_id))
549
- else:
550
- # Search without character_id filter
551
- cursor.execute("""
552
- SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
553
- FROM CharacterChats_fts
554
- JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
555
- WHERE CharacterChats_fts MATCH ?
556
- ORDER BY rank
557
- """, (query,))
558
-
559
- rows = cursor.fetchall()
560
- columns = [description[0] for description in cursor.description]
561
- results = [dict(zip(columns, row)) for row in rows]
562
-
563
- if character_id is not None:
564
- status_message = f"Found {len(results)} chat(s) matching '{query}' for the selected character."
565
- else:
566
- status_message = f"Found {len(results)} chat(s) matching '{query}' across all characters."
567
-
568
- return results, status_message
569
- except Exception as e:
570
- logging.error(f"Error searching chats with FTS5: {e}")
571
- return [], f"Error occurred during search: {e}"
572
- finally:
573
- conn.close()
574
-
575
- def update_character_chat(chat_id: int, chat_history: List[Tuple[str, str]]) -> bool:
576
- """Update an existing chat history."""
577
- conn = sqlite3.connect(chat_DB_PATH)
578
- cursor = conn.cursor()
579
- try:
580
- chat_history_json = json.dumps(chat_history)
581
- cursor.execute("""
582
- UPDATE CharacterChats
583
- SET chat_history = ?
584
- WHERE id = ?
585
- """, (
586
- chat_history_json,
587
- chat_id
588
- ))
589
- conn.commit()
590
- return cursor.rowcount > 0
591
- except sqlite3.Error as e:
592
- logging.error(f"Error updating character chat: {e}")
593
- return False
594
- finally:
595
- conn.close()
596
-
597
-
598
- def delete_character_chat(chat_id: int) -> bool:
599
- """Delete a specific chat."""
600
- conn = sqlite3.connect(chat_DB_PATH)
601
- cursor = conn.cursor()
602
- try:
603
- cursor.execute("DELETE FROM CharacterChats WHERE id = ?", (chat_id,))
604
- conn.commit()
605
- return cursor.rowcount > 0
606
- except sqlite3.Error as e:
607
- logging.error(f"Error deleting character chat: {e}")
608
- return False
609
- finally:
610
- conn.close()
611
-
612
-
613
- def fetch_keywords_for_chats(keywords: List[str]) -> List[int]:
614
- """
615
- Fetch chat IDs associated with any of the specified keywords.
616
-
617
- Args:
618
- keywords (List[str]): List of keywords to search for.
619
-
620
- Returns:
621
- List[int]: List of chat IDs associated with the keywords.
622
- """
623
- if not keywords:
624
- return []
625
-
626
- conn = sqlite3.connect(chat_DB_PATH)
627
- cursor = conn.cursor()
628
- try:
629
- # Construct the WHERE clause to search for each keyword
630
- keyword_clauses = " OR ".join(["keyword = ?"] * len(keywords))
631
- sql_query = f"SELECT DISTINCT chat_id FROM ChatKeywords WHERE {keyword_clauses}"
632
- cursor.execute(sql_query, keywords)
633
- rows = cursor.fetchall()
634
- chat_ids = [row[0] for row in rows]
635
- return chat_ids
636
- except Exception as e:
637
- logging.error(f"Error in fetch_keywords_for_chats: {e}")
638
- return []
639
- finally:
640
- conn.close()
641
-
642
-
643
- def save_chat_history_to_character_db(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]]) -> Optional[int]:
644
- """Save chat history to the CharacterChats table.
645
-
646
- Returns the ID of the inserted chat or None if failed.
647
- """
648
- return add_character_chat(character_id, conversation_name, chat_history)
649
-
650
-
651
- def search_db(query: str, fields: List[str], where_clause: str = "", page: int = 1, results_per_page: int = 5) -> List[Dict[str, Any]]:
652
- """
653
- Perform a full-text search on specified fields with optional filtering and pagination.
654
-
655
- Args:
656
- query (str): The search query.
657
- fields (List[str]): List of fields to search in.
658
- where_clause (str, optional): Additional SQL WHERE clause to filter results.
659
- page (int, optional): Page number for pagination.
660
- results_per_page (int, optional): Number of results per page.
661
-
662
- Returns:
663
- List[Dict[str, Any]]: List of matching chat records with content and metadata.
664
- """
665
- if not query.strip():
666
- return []
667
-
668
- conn = sqlite3.connect(chat_DB_PATH)
669
- cursor = conn.cursor()
670
- try:
671
- # Construct the MATCH query for FTS5
672
- match_query = " AND ".join(fields) + f" MATCH ?"
673
- # Adjust the query with the fields
674
- fts_query = f"""
675
- SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
676
- FROM CharacterChats_fts
677
- JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
678
- WHERE {match_query}
679
- """
680
- if where_clause:
681
- fts_query += f" AND ({where_clause})"
682
- fts_query += " ORDER BY rank LIMIT ? OFFSET ?"
683
- offset = (page - 1) * results_per_page
684
- cursor.execute(fts_query, (query, results_per_page, offset))
685
- rows = cursor.fetchall()
686
- columns = [description[0] for description in cursor.description]
687
- results = [dict(zip(columns, row)) for row in rows]
688
- return results
689
- except Exception as e:
690
- logging.error(f"Error in search_db: {e}")
691
- return []
692
- finally:
693
- conn.close()
694
-
695
-
696
- def perform_full_text_search_chat(query: str, relevant_chat_ids: List[int], page: int = 1, results_per_page: int = 5) -> \
697
- List[Dict[str, Any]]:
698
- """
699
- Perform a full-text search within the specified chat IDs using FTS5.
700
-
701
- Args:
702
- query (str): The user's query.
703
- relevant_chat_ids (List[int]): List of chat IDs to search within.
704
- page (int): Pagination page number.
705
- results_per_page (int): Number of results per page.
706
-
707
- Returns:
708
- List[Dict[str, Any]]: List of search results with content and metadata.
709
- """
710
- try:
711
- # Construct a WHERE clause to limit the search to relevant chat IDs
712
- where_clause = " OR ".join([f"media_id = {chat_id}" for chat_id in relevant_chat_ids])
713
- if not where_clause:
714
- where_clause = "1" # No restriction if no chat IDs
715
-
716
- # Perform full-text search using FTS5
717
- fts_results = search_db(query, ["content"], where_clause, page=page, results_per_page=results_per_page)
718
-
719
- filtered_fts_results = [
720
- {
721
- "content": result['content'],
722
- "metadata": {"media_id": result['id']}
723
- }
724
- for result in fts_results
725
- if result['id'] in relevant_chat_ids
726
- ]
727
- return filtered_fts_results
728
- except Exception as e:
729
- logging.error(f"Error in perform_full_text_search_chat: {str(e)}")
730
- return []
731
-
732
-
733
- def fetch_all_chats() -> List[Dict[str, Any]]:
734
- """
735
- Fetch all chat messages from the database.
736
-
737
- Returns:
738
- List[Dict[str, Any]]: List of chat messages with relevant metadata.
739
- """
740
- try:
741
- chats = get_character_chats() # Modify this function to retrieve all chats
742
- return chats
743
- except Exception as e:
744
- logging.error(f"Error fetching all chats: {str(e)}")
745
- return []
746
-
747
-
748
- def search_character_chat(query: str, fts_top_k: int = 10, relevant_media_ids: List[str] = None) -> List[Dict[str, Any]]:
749
- """
750
- Perform a full-text search on the Character Chat database.
751
-
752
- Args:
753
- query: Search query string.
754
- fts_top_k: Maximum number of results to return.
755
- relevant_media_ids: Optional list of character IDs to filter results.
756
-
757
- Returns:
758
- List of search results with content and metadata.
759
- """
760
- if not query.strip():
761
- return []
762
-
763
- try:
764
- # Construct a WHERE clause to limit the search to relevant character IDs
765
- where_clause = ""
766
- if relevant_media_ids:
767
- placeholders = ','.join(['?'] * len(relevant_media_ids))
768
- where_clause = f"CharacterChats.character_id IN ({placeholders})"
769
-
770
- # Perform full-text search using existing search_db function
771
- results = search_db(query, ["conversation_name", "chat_history"], where_clause, results_per_page=fts_top_k)
772
-
773
- # Format results
774
- formatted_results = []
775
- for r in results:
776
- formatted_results.append({
777
- "content": r['chat_history'],
778
- "metadata": {
779
- "chat_id": r['id'],
780
- "conversation_name": r['conversation_name'],
781
- "character_id": r['character_id']
782
- }
783
- })
784
-
785
- return formatted_results
786
-
787
- except Exception as e:
788
- logging.error(f"Error in search_character_chat: {e}")
789
- return []
790
-
791
-
792
- def search_character_cards(query: str, fts_top_k: int = 10, relevant_media_ids: List[str] = None) -> List[Dict[str, Any]]:
793
- """
794
- Perform a full-text search on the Character Cards database.
795
-
796
- Args:
797
- query: Search query string.
798
- fts_top_k: Maximum number of results to return.
799
- relevant_media_ids: Optional list of character IDs to filter results.
800
-
801
- Returns:
802
- List of search results with content and metadata.
803
- """
804
- if not query.strip():
805
- return []
806
-
807
- try:
808
- conn = sqlite3.connect(chat_DB_PATH)
809
- cursor = conn.cursor()
810
-
811
- # Construct the query
812
- sql_query = """
813
- SELECT CharacterCards.id, CharacterCards.name, CharacterCards.description, CharacterCards.personality, CharacterCards.scenario
814
- FROM CharacterCards_fts
815
- JOIN CharacterCards ON CharacterCards_fts.rowid = CharacterCards.id
816
- WHERE CharacterCards_fts MATCH ?
817
- """
818
-
819
- params = [query]
820
-
821
- # Add filtering by character IDs if provided
822
- if relevant_media_ids:
823
- placeholders = ','.join(['?'] * len(relevant_media_ids))
824
- sql_query += f" AND CharacterCards.id IN ({placeholders})"
825
- params.extend(relevant_media_ids)
826
-
827
- sql_query += " LIMIT ?"
828
- params.append(fts_top_k)
829
-
830
- cursor.execute(sql_query, params)
831
- rows = cursor.fetchall()
832
- columns = [description[0] for description in cursor.description]
833
-
834
- results = [dict(zip(columns, row)) for row in rows]
835
-
836
- # Format results
837
- formatted_results = []
838
- for r in results:
839
- content = f"Name: {r['name']}\nDescription: {r['description']}\nPersonality: {r['personality']}\nScenario: {r['scenario']}"
840
- formatted_results.append({
841
- "content": content,
842
- "metadata": {
843
- "character_id": r['id'],
844
- "name": r['name']
845
- }
846
- })
847
-
848
- return formatted_results
849
-
850
- except Exception as e:
851
- logging.error(f"Error in search_character_cards: {e}")
852
- return []
853
- finally:
854
- conn.close()
855
-
856
-
857
- def fetch_character_ids_by_keywords(keywords: List[str]) -> List[int]:
858
- """
859
- Fetch character IDs associated with any of the specified keywords.
860
-
861
- Args:
862
- keywords (List[str]): List of keywords to search for.
863
-
864
- Returns:
865
- List[int]: List of character IDs associated with the keywords.
866
- """
867
- if not keywords:
868
- return []
869
-
870
- conn = sqlite3.connect(chat_DB_PATH)
871
- cursor = conn.cursor()
872
- try:
873
- # Assuming 'tags' column in CharacterCards table stores tags as JSON array
874
- placeholders = ','.join(['?'] * len(keywords))
875
- sql_query = f"""
876
- SELECT DISTINCT id FROM CharacterCards
877
- WHERE EXISTS (
878
- SELECT 1 FROM json_each(tags)
879
- WHERE json_each.value IN ({placeholders})
880
- )
881
- """
882
- cursor.execute(sql_query, keywords)
883
- rows = cursor.fetchall()
884
- character_ids = [row[0] for row in rows]
885
- return character_ids
886
- except Exception as e:
887
- logging.error(f"Error in fetch_character_ids_by_keywords: {e}")
888
- return []
889
- finally:
890
- conn.close()
891
-
892
-
893
- ###################################################################
894
- #
895
- # Character Keywords
896
-
897
- def view_char_keywords():
898
- try:
899
- with sqlite3.connect(chat_DB_PATH) as conn:
900
- cursor = conn.cursor()
901
- cursor.execute("""
902
- SELECT DISTINCT keyword
903
- FROM CharacterCards
904
- CROSS JOIN json_each(tags)
905
- WHERE json_valid(tags)
906
- ORDER BY keyword
907
- """)
908
- keywords = cursor.fetchall()
909
- if keywords:
910
- keyword_list = [k[0] for k in keywords]
911
- return "### Current Character Keywords:\n" + "\n".join(
912
- [f"- {k}" for k in keyword_list])
913
- return "No keywords found."
914
- except Exception as e:
915
- return f"Error retrieving keywords: {str(e)}"
916
-
917
-
918
- def add_char_keywords(name: str, keywords: str):
919
- try:
920
- keywords_list = [k.strip() for k in keywords.split(",") if k.strip()]
921
- with sqlite3.connect('character_chat.db') as conn:
922
- cursor = conn.cursor()
923
- cursor.execute(
924
- "SELECT tags FROM CharacterCards WHERE name = ?",
925
- (name,)
926
- )
927
- result = cursor.fetchone()
928
- if not result:
929
- return "Character not found."
930
-
931
- current_tags = result[0] if result[0] else "[]"
932
- current_keywords = set(current_tags[1:-1].split(',')) if current_tags != "[]" else set()
933
- updated_keywords = current_keywords.union(set(keywords_list))
934
-
935
- cursor.execute(
936
- "UPDATE CharacterCards SET tags = ? WHERE name = ?",
937
- (str(list(updated_keywords)), name)
938
- )
939
- conn.commit()
940
- return f"Successfully added keywords to character {name}"
941
- except Exception as e:
942
- return f"Error adding keywords: {str(e)}"
943
-
944
-
945
- def delete_char_keyword(char_name: str, keyword: str) -> str:
946
- """
947
- Delete a keyword from a character's tags.
948
-
949
- Args:
950
- char_name (str): The name of the character
951
- keyword (str): The keyword to delete
952
-
953
- Returns:
954
- str: Success/failure message
955
- """
956
- try:
957
- with sqlite3.connect(chat_DB_PATH) as conn:
958
- cursor = conn.cursor()
959
-
960
- # First, check if the character exists
961
- cursor.execute("SELECT tags FROM CharacterCards WHERE name = ?", (char_name,))
962
- result = cursor.fetchone()
963
-
964
- if not result:
965
- return f"Character '{char_name}' not found."
966
-
967
- # Parse existing tags
968
- current_tags = json.loads(result[0]) if result[0] else []
969
-
970
- if keyword not in current_tags:
971
- return f"Keyword '{keyword}' not found in character '{char_name}' tags."
972
-
973
- # Remove the keyword
974
- updated_tags = [tag for tag in current_tags if tag != keyword]
975
-
976
- # Update the character's tags
977
- cursor.execute(
978
- "UPDATE CharacterCards SET tags = ? WHERE name = ?",
979
- (json.dumps(updated_tags), char_name)
980
- )
981
- conn.commit()
982
-
983
- logging.info(f"Keyword '{keyword}' deleted from character '{char_name}'")
984
- return f"Successfully deleted keyword '{keyword}' from character '{char_name}'."
985
-
986
- except Exception as e:
987
- error_msg = f"Error deleting keyword: {str(e)}"
988
- logging.error(error_msg)
989
- return error_msg
990
-
991
-
992
- def export_char_keywords_to_csv() -> Tuple[str, str]:
993
- """
994
- Export all character keywords to a CSV file with associated metadata.
995
-
996
- Returns:
997
- Tuple[str, str]: (status_message, file_path)
998
- """
999
- import csv
1000
- from tempfile import NamedTemporaryFile
1001
- from datetime import datetime
1002
-
1003
- try:
1004
- # Create a temporary CSV file
1005
- temp_file = NamedTemporaryFile(mode='w+', delete=False, suffix='.csv', newline='')
1006
-
1007
- with sqlite3.connect(chat_DB_PATH) as conn:
1008
- cursor = conn.cursor()
1009
-
1010
- # Get all characters and their tags
1011
- cursor.execute("""
1012
- SELECT
1013
- name,
1014
- tags,
1015
- (SELECT COUNT(*) FROM CharacterChats WHERE CharacterChats.character_id = CharacterCards.id) as chat_count
1016
- FROM CharacterCards
1017
- WHERE json_valid(tags)
1018
- ORDER BY name
1019
- """)
1020
-
1021
- results = cursor.fetchall()
1022
-
1023
- # Process the results to create rows for the CSV
1024
- csv_rows = []
1025
- for name, tags_json, chat_count in results:
1026
- tags = json.loads(tags_json) if tags_json else []
1027
- for tag in tags:
1028
- csv_rows.append([
1029
- tag, # keyword
1030
- name, # character name
1031
- chat_count # number of chats
1032
- ])
1033
-
1034
- # Write to CSV
1035
- writer = csv.writer(temp_file)
1036
- writer.writerow(['Keyword', 'Character Name', 'Number of Chats'])
1037
- writer.writerows(csv_rows)
1038
-
1039
- temp_file.close()
1040
-
1041
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
1042
- status_msg = f"Successfully exported {len(csv_rows)} character keyword entries to CSV."
1043
- logging.info(status_msg)
1044
-
1045
- return status_msg, temp_file.name
1046
-
1047
- except Exception as e:
1048
- error_msg = f"Error exporting keywords: {str(e)}"
1049
- logging.error(error_msg)
1050
- return error_msg, ""
1051
-
1052
- #
1053
- # End of Character chat keyword functions
1054
- ######################################################
1055
-
1056
-
1057
- #
1058
- # End of Character_Chat_DB.py
1059
- #######################################################################################################################
 
1
+ # character_chat_db.py
2
+ # Database functions for managing character cards and chat histories.
3
+ # #
4
+ # Imports
5
+ import configparser
6
+ import logging
7
+ import sqlite3
8
+ import json
9
+ import os
10
+ import sys
11
+ from typing import List, Dict, Optional, Tuple, Any, Union
12
+ #
13
+ from App_Function_Libraries.Utils.Utils import get_database_dir, get_project_relative_path, get_database_path
14
+ #
15
+ #######################################################################################################################
16
+ #
17
+ #
18
+
19
+ def ensure_database_directory():
20
+ os.makedirs(get_database_dir(), exist_ok=True)
21
+
22
+ ensure_database_directory()
23
+
24
+
25
+ # Construct the path to the config file
26
+ config_path = get_project_relative_path('Config_Files/config.txt')
27
+
28
+ # Read the config file
29
+ config = configparser.ConfigParser()
30
+ config.read(config_path)
31
+
32
+ # Get the chat db path from the config, or use the default if not specified
33
+ chat_DB_PATH = config.get('Database', 'chatDB_path', fallback=get_database_path('chatDB.db'))
34
+ print(f"Chat Database path: {chat_DB_PATH}")
35
+
36
+ ########################################################################################################
37
+ #
38
+ # Functions
39
+
40
+ # FIXME - Setup properly and test/add documentation for its existence...
41
+ def initialize_database():
42
+ """Initialize the SQLite database with required tables and FTS5 virtual tables."""
43
+ conn = None
44
+ try:
45
+ conn = sqlite3.connect(chat_DB_PATH)
46
+ cursor = conn.cursor()
47
+
48
+ # Enable foreign key constraints
49
+ cursor.execute("PRAGMA foreign_keys = ON;")
50
+
51
+ # Create CharacterCards table with V2 fields
52
+ cursor.execute("""
53
+ CREATE TABLE IF NOT EXISTS CharacterCards (
54
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
55
+ name TEXT UNIQUE NOT NULL,
56
+ description TEXT,
57
+ personality TEXT,
58
+ scenario TEXT,
59
+ image BLOB,
60
+ post_history_instructions TEXT,
61
+ first_mes TEXT,
62
+ mes_example TEXT,
63
+ creator_notes TEXT,
64
+ system_prompt TEXT,
65
+ alternate_greetings TEXT,
66
+ tags TEXT,
67
+ creator TEXT,
68
+ character_version TEXT,
69
+ extensions TEXT,
70
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP
71
+ );
72
+ """)
73
+
74
+ # Create FTS5 virtual table for CharacterCards
75
+ cursor.execute("""
76
+ CREATE VIRTUAL TABLE IF NOT EXISTS CharacterCards_fts USING fts5(
77
+ name,
78
+ description,
79
+ personality,
80
+ scenario,
81
+ system_prompt,
82
+ content='CharacterCards',
83
+ content_rowid='id'
84
+ );
85
+ """)
86
+
87
+ # Create triggers to keep FTS5 table in sync with CharacterCards
88
+ cursor.executescript("""
89
+ CREATE TRIGGER IF NOT EXISTS CharacterCards_ai AFTER INSERT ON CharacterCards BEGIN
90
+ INSERT INTO CharacterCards_fts(
91
+ rowid,
92
+ name,
93
+ description,
94
+ personality,
95
+ scenario,
96
+ system_prompt
97
+ ) VALUES (
98
+ new.id,
99
+ new.name,
100
+ new.description,
101
+ new.personality,
102
+ new.scenario,
103
+ new.system_prompt
104
+ );
105
+ END;
106
+
107
+ CREATE TRIGGER IF NOT EXISTS CharacterCards_ad AFTER DELETE ON CharacterCards BEGIN
108
+ DELETE FROM CharacterCards_fts WHERE rowid = old.id;
109
+ END;
110
+
111
+ CREATE TRIGGER IF NOT EXISTS CharacterCards_au AFTER UPDATE ON CharacterCards BEGIN
112
+ UPDATE CharacterCards_fts SET
113
+ name = new.name,
114
+ description = new.description,
115
+ personality = new.personality,
116
+ scenario = new.scenario,
117
+ system_prompt = new.system_prompt
118
+ WHERE rowid = new.id;
119
+ END;
120
+ """)
121
+
122
+ # Create CharacterChats table
123
+ cursor.execute("""
124
+ CREATE TABLE IF NOT EXISTS CharacterChats (
125
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
126
+ character_id INTEGER NOT NULL,
127
+ conversation_name TEXT,
128
+ chat_history TEXT,
129
+ is_snapshot BOOLEAN DEFAULT FALSE,
130
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
131
+ FOREIGN KEY (character_id) REFERENCES CharacterCards(id) ON DELETE CASCADE
132
+ );
133
+ """)
134
+
135
+ # Create FTS5 virtual table for CharacterChats
136
+ cursor.execute("""
137
+ CREATE VIRTUAL TABLE IF NOT EXISTS CharacterChats_fts USING fts5(
138
+ conversation_name,
139
+ chat_history,
140
+ content='CharacterChats',
141
+ content_rowid='id'
142
+ );
143
+ """)
144
+
145
+ # Create triggers to keep FTS5 table in sync with CharacterChats
146
+ cursor.executescript("""
147
+ CREATE TRIGGER IF NOT EXISTS CharacterChats_ai AFTER INSERT ON CharacterChats BEGIN
148
+ INSERT INTO CharacterChats_fts(rowid, conversation_name, chat_history)
149
+ VALUES (new.id, new.conversation_name, new.chat_history);
150
+ END;
151
+
152
+ CREATE TRIGGER IF NOT EXISTS CharacterChats_ad AFTER DELETE ON CharacterChats BEGIN
153
+ DELETE FROM CharacterChats_fts WHERE rowid = old.id;
154
+ END;
155
+
156
+ CREATE TRIGGER IF NOT EXISTS CharacterChats_au AFTER UPDATE ON CharacterChats BEGIN
157
+ UPDATE CharacterChats_fts SET conversation_name = new.conversation_name, chat_history = new.chat_history
158
+ WHERE rowid = new.id;
159
+ END;
160
+ """)
161
+
162
+ # Create ChatKeywords table
163
+ cursor.execute("""
164
+ CREATE TABLE IF NOT EXISTS ChatKeywords (
165
+ chat_id INTEGER NOT NULL,
166
+ keyword TEXT NOT NULL,
167
+ FOREIGN KEY (chat_id) REFERENCES CharacterChats(id) ON DELETE CASCADE
168
+ );
169
+ """)
170
+
171
+ # Create indexes for faster searches
172
+ cursor.execute("""
173
+ CREATE INDEX IF NOT EXISTS idx_chatkeywords_keyword ON ChatKeywords(keyword);
174
+ """)
175
+ cursor.execute("""
176
+ CREATE INDEX IF NOT EXISTS idx_chatkeywords_chat_id ON ChatKeywords(chat_id);
177
+ """)
178
+
179
+ conn.commit()
180
+ logging.info("Database initialized successfully.")
181
+ except sqlite3.Error as e:
182
+ logging.error(f"SQLite error occurred during database initialization: {e}")
183
+ if conn:
184
+ conn.rollback()
185
+ raise
186
+ except Exception as e:
187
+ logging.error(f"Unexpected error occurred during database initialization: {e}")
188
+ if conn:
189
+ conn.rollback()
190
+ raise
191
+ finally:
192
+ if conn:
193
+ conn.close()
194
+
195
+ # Call initialize_database() at the start of your application
196
+ def setup_chat_database():
197
+ try:
198
+ initialize_database()
199
+ except Exception as e:
200
+ logging.critical(f"Failed to initialize database: {e}")
201
+ sys.exit(1)
202
+
203
+ setup_chat_database()
204
+
205
+
206
+ ########################################################################################################
207
+ #
208
+ # Character Card handling
209
+
210
+ def parse_character_card(card_data: Dict[str, Any]) -> Dict[str, Any]:
211
+ """Parse and validate a character card according to V2 specification."""
212
+ v2_data = {
213
+ 'name': card_data.get('name', ''),
214
+ 'description': card_data.get('description', ''),
215
+ 'personality': card_data.get('personality', ''),
216
+ 'scenario': card_data.get('scenario', ''),
217
+ 'first_mes': card_data.get('first_mes', ''),
218
+ 'mes_example': card_data.get('mes_example', ''),
219
+ 'creator_notes': card_data.get('creator_notes', ''),
220
+ 'system_prompt': card_data.get('system_prompt', ''),
221
+ 'post_history_instructions': card_data.get('post_history_instructions', ''),
222
+ 'alternate_greetings': json.dumps(card_data.get('alternate_greetings', [])),
223
+ 'tags': json.dumps(card_data.get('tags', [])),
224
+ 'creator': card_data.get('creator', ''),
225
+ 'character_version': card_data.get('character_version', ''),
226
+ 'extensions': json.dumps(card_data.get('extensions', {}))
227
+ }
228
+
229
+ # Handle 'image' separately as it might be binary data
230
+ if 'image' in card_data:
231
+ v2_data['image'] = card_data['image']
232
+
233
+ return v2_data
234
+
235
+
236
+ def add_character_card(card_data: Dict[str, Any]) -> Optional[int]:
237
+ """Add or update a character card in the database."""
238
+ conn = sqlite3.connect(chat_DB_PATH)
239
+ cursor = conn.cursor()
240
+ try:
241
+ parsed_card = parse_character_card(card_data)
242
+
243
+ # Check if character already exists
244
+ cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (parsed_card['name'],))
245
+ row = cursor.fetchone()
246
+
247
+ if row:
248
+ # Update existing character
249
+ character_id = row[0]
250
+ update_query = """
251
+ UPDATE CharacterCards
252
+ SET description = ?, personality = ?, scenario = ?, image = ?,
253
+ post_history_instructions = ?, first_mes = ?, mes_example = ?,
254
+ creator_notes = ?, system_prompt = ?, alternate_greetings = ?,
255
+ tags = ?, creator = ?, character_version = ?, extensions = ?
256
+ WHERE id = ?
257
+ """
258
+ cursor.execute(update_query, (
259
+ parsed_card['description'], parsed_card['personality'], parsed_card['scenario'],
260
+ parsed_card['image'], parsed_card['post_history_instructions'], parsed_card['first_mes'],
261
+ parsed_card['mes_example'], parsed_card['creator_notes'], parsed_card['system_prompt'],
262
+ parsed_card['alternate_greetings'], parsed_card['tags'], parsed_card['creator'],
263
+ parsed_card['character_version'], parsed_card['extensions'], character_id
264
+ ))
265
+ else:
266
+ # Insert new character
267
+ insert_query = """
268
+ INSERT INTO CharacterCards (name, description, personality, scenario, image,
269
+ post_history_instructions, first_mes, mes_example, creator_notes, system_prompt,
270
+ alternate_greetings, tags, creator, character_version, extensions)
271
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
272
+ """
273
+ cursor.execute(insert_query, (
274
+ parsed_card['name'], parsed_card['description'], parsed_card['personality'],
275
+ parsed_card['scenario'], parsed_card['image'], parsed_card['post_history_instructions'],
276
+ parsed_card['first_mes'], parsed_card['mes_example'], parsed_card['creator_notes'],
277
+ parsed_card['system_prompt'], parsed_card['alternate_greetings'], parsed_card['tags'],
278
+ parsed_card['creator'], parsed_card['character_version'], parsed_card['extensions']
279
+ ))
280
+ character_id = cursor.lastrowid
281
+
282
+ conn.commit()
283
+ return character_id
284
+ except sqlite3.IntegrityError as e:
285
+ logging.error(f"Error adding character card: {e}")
286
+ return None
287
+ except Exception as e:
288
+ logging.error(f"Unexpected error adding character card: {e}")
289
+ return None
290
+ finally:
291
+ conn.close()
292
+
293
+ # def add_character_card(card_data: Dict) -> Optional[int]:
294
+ # """Add or update a character card in the database.
295
+ #
296
+ # Returns the ID of the inserted character or None if failed.
297
+ # """
298
+ # conn = sqlite3.connect(chat_DB_PATH)
299
+ # cursor = conn.cursor()
300
+ # try:
301
+ # # Ensure all required fields are present
302
+ # required_fields = ['name', 'description', 'personality', 'scenario', 'image', 'post_history_instructions', 'first_message']
303
+ # for field in required_fields:
304
+ # if field not in card_data:
305
+ # card_data[field] = '' # Assign empty string if field is missing
306
+ #
307
+ # # Check if character already exists
308
+ # cursor.execute("SELECT id FROM CharacterCards WHERE name = ?", (card_data['name'],))
309
+ # row = cursor.fetchone()
310
+ #
311
+ # if row:
312
+ # # Update existing character
313
+ # character_id = row[0]
314
+ # cursor.execute("""
315
+ # UPDATE CharacterCards
316
+ # SET description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
317
+ # WHERE id = ?
318
+ # """, (
319
+ # card_data['description'],
320
+ # card_data['personality'],
321
+ # card_data['scenario'],
322
+ # card_data['image'],
323
+ # card_data['post_history_instructions'],
324
+ # card_data['first_message'],
325
+ # character_id
326
+ # ))
327
+ # else:
328
+ # # Insert new character
329
+ # cursor.execute("""
330
+ # INSERT INTO CharacterCards (name, description, personality, scenario, image, post_history_instructions, first_message)
331
+ # VALUES (?, ?, ?, ?, ?, ?, ?)
332
+ # """, (
333
+ # card_data['name'],
334
+ # card_data['description'],
335
+ # card_data['personality'],
336
+ # card_data['scenario'],
337
+ # card_data['image'],
338
+ # card_data['post_history_instructions'],
339
+ # card_data['first_message']
340
+ # ))
341
+ # character_id = cursor.lastrowid
342
+ #
343
+ # conn.commit()
344
+ # return cursor.lastrowid
345
+ # except sqlite3.IntegrityError as e:
346
+ # logging.error(f"Error adding character card: {e}")
347
+ # return None
348
+ # except Exception as e:
349
+ # logging.error(f"Unexpected error adding character card: {e}")
350
+ # return None
351
+ # finally:
352
+ # conn.close()
353
+
354
+
355
+ def get_character_cards() -> List[Dict]:
356
+ """Retrieve all character cards from the database."""
357
+ logging.debug(f"Fetching characters from DB: {chat_DB_PATH}")
358
+ conn = sqlite3.connect(chat_DB_PATH)
359
+ cursor = conn.cursor()
360
+ cursor.execute("SELECT * FROM CharacterCards")
361
+ rows = cursor.fetchall()
362
+ columns = [description[0] for description in cursor.description]
363
+ conn.close()
364
+ characters = [dict(zip(columns, row)) for row in rows]
365
+ #logging.debug(f"Characters fetched from DB: {characters}")
366
+ return characters
367
+
368
+
369
+ def get_character_card_by_id(character_id: Union[int, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
370
+ """
371
+ Retrieve a single character card by its ID.
372
+
373
+ Args:
374
+ character_id: Can be either an integer ID or a dictionary containing character data.
375
+
376
+ Returns:
377
+ A dictionary containing the character card data, or None if not found.
378
+ """
379
+ conn = sqlite3.connect(chat_DB_PATH)
380
+ cursor = conn.cursor()
381
+ try:
382
+ if isinstance(character_id, dict):
383
+ # If a dictionary is passed, assume it's already a character card
384
+ return character_id
385
+ elif isinstance(character_id, int):
386
+ # If an integer is passed, fetch the character from the database
387
+ cursor.execute("SELECT * FROM CharacterCards WHERE id = ?", (character_id,))
388
+ row = cursor.fetchone()
389
+ if row:
390
+ columns = [description[0] for description in cursor.description]
391
+ return dict(zip(columns, row))
392
+ else:
393
+ logging.warning(f"Invalid type for character_id: {type(character_id)}")
394
+ return None
395
+ except Exception as e:
396
+ logging.error(f"Error in get_character_card_by_id: {e}")
397
+ return None
398
+ finally:
399
+ conn.close()
400
+
401
+
402
+ def update_character_card(character_id: int, card_data: Dict) -> bool:
403
+ """Update an existing character card."""
404
+ conn = sqlite3.connect(chat_DB_PATH)
405
+ cursor = conn.cursor()
406
+ try:
407
+ cursor.execute("""
408
+ UPDATE CharacterCards
409
+ SET name = ?, description = ?, personality = ?, scenario = ?, image = ?, post_history_instructions = ?, first_message = ?
410
+ WHERE id = ?
411
+ """, (
412
+ card_data.get('name'),
413
+ card_data.get('description'),
414
+ card_data.get('personality'),
415
+ card_data.get('scenario'),
416
+ card_data.get('image'),
417
+ card_data.get('post_history_instructions', ''),
418
+ card_data.get('first_message', "Hello! I'm ready to chat."),
419
+ character_id
420
+ ))
421
+ conn.commit()
422
+ return cursor.rowcount > 0
423
+ except sqlite3.IntegrityError as e:
424
+ logging.error(f"Error updating character card: {e}")
425
+ return False
426
+ finally:
427
+ conn.close()
428
+
429
+
430
+ def delete_character_card(character_id: int) -> bool:
431
+ """Delete a character card and its associated chats."""
432
+ conn = sqlite3.connect(chat_DB_PATH)
433
+ cursor = conn.cursor()
434
+ try:
435
+ # Delete associated chats first due to foreign key constraint
436
+ cursor.execute("DELETE FROM CharacterChats WHERE character_id = ?", (character_id,))
437
+ cursor.execute("DELETE FROM CharacterCards WHERE id = ?", (character_id,))
438
+ conn.commit()
439
+ return cursor.rowcount > 0
440
+ except sqlite3.Error as e:
441
+ logging.error(f"Error deleting character card: {e}")
442
+ return False
443
+ finally:
444
+ conn.close()
445
+
446
+
447
+ def add_character_chat(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]], keywords: Optional[List[str]] = None, is_snapshot: bool = False) -> Optional[int]:
448
+ """
449
+ Add a new chat history for a character, optionally associating keywords.
450
+
451
+ Args:
452
+ character_id (int): The ID of the character.
453
+ conversation_name (str): Name of the conversation.
454
+ chat_history (List[Tuple[str, str]]): List of (user, bot) message tuples.
455
+ keywords (Optional[List[str]]): List of keywords to associate with this chat.
456
+ is_snapshot (bool, optional): Whether this chat is a snapshot.
457
+
458
+ Returns:
459
+ Optional[int]: The ID of the inserted chat or None if failed.
460
+ """
461
+ conn = sqlite3.connect(chat_DB_PATH)
462
+ cursor = conn.cursor()
463
+ try:
464
+ chat_history_json = json.dumps(chat_history)
465
+ cursor.execute("""
466
+ INSERT INTO CharacterChats (character_id, conversation_name, chat_history, is_snapshot)
467
+ VALUES (?, ?, ?, ?)
468
+ """, (
469
+ character_id,
470
+ conversation_name,
471
+ chat_history_json,
472
+ is_snapshot
473
+ ))
474
+ chat_id = cursor.lastrowid
475
+
476
+ if keywords:
477
+ # Insert keywords into ChatKeywords table
478
+ keyword_records = [(chat_id, keyword.strip().lower()) for keyword in keywords]
479
+ cursor.executemany("""
480
+ INSERT INTO ChatKeywords (chat_id, keyword)
481
+ VALUES (?, ?)
482
+ """, keyword_records)
483
+
484
+ conn.commit()
485
+ return chat_id
486
+ except sqlite3.Error as e:
487
+ logging.error(f"Error adding character chat: {e}")
488
+ return None
489
+ finally:
490
+ conn.close()
491
+
492
+
493
+ def get_character_chats(character_id: Optional[int] = None) -> List[Dict]:
494
+ """Retrieve all chats, or chats for a specific character if character_id is provided."""
495
+ conn = sqlite3.connect(chat_DB_PATH)
496
+ cursor = conn.cursor()
497
+ if character_id is not None:
498
+ cursor.execute("SELECT * FROM CharacterChats WHERE character_id = ?", (character_id,))
499
+ else:
500
+ cursor.execute("SELECT * FROM CharacterChats")
501
+ rows = cursor.fetchall()
502
+ columns = [description[0] for description in cursor.description]
503
+ conn.close()
504
+ return [dict(zip(columns, row)) for row in rows]
505
+
506
+
507
+ def get_character_chat_by_id(chat_id: int) -> Optional[Dict]:
508
+ """Retrieve a single chat by its ID."""
509
+ conn = sqlite3.connect(chat_DB_PATH)
510
+ cursor = conn.cursor()
511
+ cursor.execute("SELECT * FROM CharacterChats WHERE id = ?", (chat_id,))
512
+ row = cursor.fetchone()
513
+ conn.close()
514
+ if row:
515
+ columns = [description[0] for description in cursor.description]
516
+ chat = dict(zip(columns, row))
517
+ chat['chat_history'] = json.loads(chat['chat_history'])
518
+ return chat
519
+ return None
520
+
521
+
522
+ def search_character_chats(query: str, character_id: Optional[int] = None) -> Tuple[List[Dict], str]:
523
+ """
524
+ Search for character chats using FTS5, optionally filtered by character_id.
525
+
526
+ Args:
527
+ query (str): The search query.
528
+ character_id (Optional[int]): The ID of the character to filter chats by.
529
+
530
+ Returns:
531
+ Tuple[List[Dict], str]: A list of matching chats and a status message.
532
+ """
533
+ if not query.strip():
534
+ return [], "Please enter a search query."
535
+
536
+ conn = sqlite3.connect(chat_DB_PATH)
537
+ cursor = conn.cursor()
538
+ try:
539
+ if character_id is not None:
540
+ # Search with character_id filter
541
+ cursor.execute("""
542
+ SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
543
+ FROM CharacterChats_fts
544
+ JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
545
+ WHERE CharacterChats_fts MATCH ? AND CharacterChats.character_id = ?
546
+ ORDER BY rank
547
+ """, (query, character_id))
548
+ else:
549
+ # Search without character_id filter
550
+ cursor.execute("""
551
+ SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
552
+ FROM CharacterChats_fts
553
+ JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
554
+ WHERE CharacterChats_fts MATCH ?
555
+ ORDER BY rank
556
+ """, (query,))
557
+
558
+ rows = cursor.fetchall()
559
+ columns = [description[0] for description in cursor.description]
560
+ results = [dict(zip(columns, row)) for row in rows]
561
+
562
+ if character_id is not None:
563
+ status_message = f"Found {len(results)} chat(s) matching '{query}' for the selected character."
564
+ else:
565
+ status_message = f"Found {len(results)} chat(s) matching '{query}' across all characters."
566
+
567
+ return results, status_message
568
+ except Exception as e:
569
+ logging.error(f"Error searching chats with FTS5: {e}")
570
+ return [], f"Error occurred during search: {e}"
571
+ finally:
572
+ conn.close()
573
+
574
+ def update_character_chat(chat_id: int, chat_history: List[Tuple[str, str]]) -> bool:
575
+ """Update an existing chat history."""
576
+ conn = sqlite3.connect(chat_DB_PATH)
577
+ cursor = conn.cursor()
578
+ try:
579
+ chat_history_json = json.dumps(chat_history)
580
+ cursor.execute("""
581
+ UPDATE CharacterChats
582
+ SET chat_history = ?
583
+ WHERE id = ?
584
+ """, (
585
+ chat_history_json,
586
+ chat_id
587
+ ))
588
+ conn.commit()
589
+ return cursor.rowcount > 0
590
+ except sqlite3.Error as e:
591
+ logging.error(f"Error updating character chat: {e}")
592
+ return False
593
+ finally:
594
+ conn.close()
595
+
596
+
597
+ def delete_character_chat(chat_id: int) -> bool:
598
+ """Delete a specific chat."""
599
+ conn = sqlite3.connect(chat_DB_PATH)
600
+ cursor = conn.cursor()
601
+ try:
602
+ cursor.execute("DELETE FROM CharacterChats WHERE id = ?", (chat_id,))
603
+ conn.commit()
604
+ return cursor.rowcount > 0
605
+ except sqlite3.Error as e:
606
+ logging.error(f"Error deleting character chat: {e}")
607
+ return False
608
+ finally:
609
+ conn.close()
610
+
611
+
612
+ def fetch_keywords_for_chats(keywords: List[str]) -> List[int]:
613
+ """
614
+ Fetch chat IDs associated with any of the specified keywords.
615
+
616
+ Args:
617
+ keywords (List[str]): List of keywords to search for.
618
+
619
+ Returns:
620
+ List[int]: List of chat IDs associated with the keywords.
621
+ """
622
+ if not keywords:
623
+ return []
624
+
625
+ conn = sqlite3.connect(chat_DB_PATH)
626
+ cursor = conn.cursor()
627
+ try:
628
+ # Construct the WHERE clause to search for each keyword
629
+ keyword_clauses = " OR ".join(["keyword = ?"] * len(keywords))
630
+ sql_query = f"SELECT DISTINCT chat_id FROM ChatKeywords WHERE {keyword_clauses}"
631
+ cursor.execute(sql_query, keywords)
632
+ rows = cursor.fetchall()
633
+ chat_ids = [row[0] for row in rows]
634
+ return chat_ids
635
+ except Exception as e:
636
+ logging.error(f"Error in fetch_keywords_for_chats: {e}")
637
+ return []
638
+ finally:
639
+ conn.close()
640
+
641
+
642
+ def save_chat_history_to_character_db(character_id: int, conversation_name: str, chat_history: List[Tuple[str, str]]) -> Optional[int]:
643
+ """Save chat history to the CharacterChats table.
644
+
645
+ Returns the ID of the inserted chat or None if failed.
646
+ """
647
+ return add_character_chat(character_id, conversation_name, chat_history)
648
+
649
+
650
+ def search_db(query: str, fields: List[str], where_clause: str = "", page: int = 1, results_per_page: int = 5) -> List[Dict[str, Any]]:
651
+ """
652
+ Perform a full-text search on specified fields with optional filtering and pagination.
653
+
654
+ Args:
655
+ query (str): The search query.
656
+ fields (List[str]): List of fields to search in.
657
+ where_clause (str, optional): Additional SQL WHERE clause to filter results.
658
+ page (int, optional): Page number for pagination.
659
+ results_per_page (int, optional): Number of results per page.
660
+
661
+ Returns:
662
+ List[Dict[str, Any]]: List of matching chat records with content and metadata.
663
+ """
664
+ if not query.strip():
665
+ return []
666
+
667
+ conn = sqlite3.connect(chat_DB_PATH)
668
+ cursor = conn.cursor()
669
+ try:
670
+ # Construct the MATCH query for FTS5
671
+ match_query = " AND ".join(fields) + f" MATCH ?"
672
+ # Adjust the query with the fields
673
+ fts_query = f"""
674
+ SELECT CharacterChats.id, CharacterChats.conversation_name, CharacterChats.chat_history
675
+ FROM CharacterChats_fts
676
+ JOIN CharacterChats ON CharacterChats_fts.rowid = CharacterChats.id
677
+ WHERE {match_query}
678
+ """
679
+ if where_clause:
680
+ fts_query += f" AND ({where_clause})"
681
+ fts_query += " ORDER BY rank LIMIT ? OFFSET ?"
682
+ offset = (page - 1) * results_per_page
683
+ cursor.execute(fts_query, (query, results_per_page, offset))
684
+ rows = cursor.fetchall()
685
+ columns = [description[0] for description in cursor.description]
686
+ results = [dict(zip(columns, row)) for row in rows]
687
+ return results
688
+ except Exception as e:
689
+ logging.error(f"Error in search_db: {e}")
690
+ return []
691
+ finally:
692
+ conn.close()
693
+
694
+
695
+ def perform_full_text_search_chat(query: str, relevant_chat_ids: List[int], page: int = 1, results_per_page: int = 5) -> \
696
+ List[Dict[str, Any]]:
697
+ """
698
+ Perform a full-text search within the specified chat IDs using FTS5.
699
+
700
+ Args:
701
+ query (str): The user's query.
702
+ relevant_chat_ids (List[int]): List of chat IDs to search within.
703
+ page (int): Pagination page number.
704
+ results_per_page (int): Number of results per page.
705
+
706
+ Returns:
707
+ List[Dict[str, Any]]: List of search results with content and metadata.
708
+ """
709
+ try:
710
+ # Construct a WHERE clause to limit the search to relevant chat IDs
711
+ where_clause = " OR ".join([f"media_id = {chat_id}" for chat_id in relevant_chat_ids])
712
+ if not where_clause:
713
+ where_clause = "1" # No restriction if no chat IDs
714
+
715
+ # Perform full-text search using FTS5
716
+ fts_results = search_db(query, ["content"], where_clause, page=page, results_per_page=results_per_page)
717
+
718
+ filtered_fts_results = [
719
+ {
720
+ "content": result['content'],
721
+ "metadata": {"media_id": result['id']}
722
+ }
723
+ for result in fts_results
724
+ if result['id'] in relevant_chat_ids
725
+ ]
726
+ return filtered_fts_results
727
+ except Exception as e:
728
+ logging.error(f"Error in perform_full_text_search_chat: {str(e)}")
729
+ return []
730
+
731
+
732
+ def fetch_all_chats() -> List[Dict[str, Any]]:
733
+ """
734
+ Fetch all chat messages from the database.
735
+
736
+ Returns:
737
+ List[Dict[str, Any]]: List of chat messages with relevant metadata.
738
+ """
739
+ try:
740
+ chats = get_character_chats() # Modify this function to retrieve all chats
741
+ return chats
742
+ except Exception as e:
743
+ logging.error(f"Error fetching all chats: {str(e)}")
744
+ return []
745
+
746
+
747
+ def search_character_chat(query: str, fts_top_k: int = 10, relevant_media_ids: List[str] = None) -> List[Dict[str, Any]]:
748
+ """
749
+ Perform a full-text search on the Character Chat database.
750
+
751
+ Args:
752
+ query: Search query string.
753
+ fts_top_k: Maximum number of results to return.
754
+ relevant_media_ids: Optional list of character IDs to filter results.
755
+
756
+ Returns:
757
+ List of search results with content and metadata.
758
+ """
759
+ if not query.strip():
760
+ return []
761
+
762
+ try:
763
+ # Construct a WHERE clause to limit the search to relevant character IDs
764
+ where_clause = ""
765
+ if relevant_media_ids:
766
+ placeholders = ','.join(['?'] * len(relevant_media_ids))
767
+ where_clause = f"CharacterChats.character_id IN ({placeholders})"
768
+
769
+ # Perform full-text search using existing search_db function
770
+ results = search_db(query, ["conversation_name", "chat_history"], where_clause, results_per_page=fts_top_k)
771
+
772
+ # Format results
773
+ formatted_results = []
774
+ for r in results:
775
+ formatted_results.append({
776
+ "content": r['chat_history'],
777
+ "metadata": {
778
+ "chat_id": r['id'],
779
+ "conversation_name": r['conversation_name'],
780
+ "character_id": r['character_id']
781
+ }
782
+ })
783
+
784
+ return formatted_results
785
+
786
+ except Exception as e:
787
+ logging.error(f"Error in search_character_chat: {e}")
788
+ return []
789
+
790
+
791
+ def search_character_cards(query: str, fts_top_k: int = 10, relevant_media_ids: List[str] = None) -> List[Dict[str, Any]]:
792
+ """
793
+ Perform a full-text search on the Character Cards database.
794
+
795
+ Args:
796
+ query: Search query string.
797
+ fts_top_k: Maximum number of results to return.
798
+ relevant_media_ids: Optional list of character IDs to filter results.
799
+
800
+ Returns:
801
+ List of search results with content and metadata.
802
+ """
803
+ if not query.strip():
804
+ return []
805
+
806
+ try:
807
+ conn = sqlite3.connect(chat_DB_PATH)
808
+ cursor = conn.cursor()
809
+
810
+ # Construct the query
811
+ sql_query = """
812
+ SELECT CharacterCards.id, CharacterCards.name, CharacterCards.description, CharacterCards.personality, CharacterCards.scenario
813
+ FROM CharacterCards_fts
814
+ JOIN CharacterCards ON CharacterCards_fts.rowid = CharacterCards.id
815
+ WHERE CharacterCards_fts MATCH ?
816
+ """
817
+
818
+ params = [query]
819
+
820
+ # Add filtering by character IDs if provided
821
+ if relevant_media_ids:
822
+ placeholders = ','.join(['?'] * len(relevant_media_ids))
823
+ sql_query += f" AND CharacterCards.id IN ({placeholders})"
824
+ params.extend(relevant_media_ids)
825
+
826
+ sql_query += " LIMIT ?"
827
+ params.append(fts_top_k)
828
+
829
+ cursor.execute(sql_query, params)
830
+ rows = cursor.fetchall()
831
+ columns = [description[0] for description in cursor.description]
832
+
833
+ results = [dict(zip(columns, row)) for row in rows]
834
+
835
+ # Format results
836
+ formatted_results = []
837
+ for r in results:
838
+ content = f"Name: {r['name']}\nDescription: {r['description']}\nPersonality: {r['personality']}\nScenario: {r['scenario']}"
839
+ formatted_results.append({
840
+ "content": content,
841
+ "metadata": {
842
+ "character_id": r['id'],
843
+ "name": r['name']
844
+ }
845
+ })
846
+
847
+ return formatted_results
848
+
849
+ except Exception as e:
850
+ logging.error(f"Error in search_character_cards: {e}")
851
+ return []
852
+ finally:
853
+ conn.close()
854
+
855
+
856
+ def fetch_character_ids_by_keywords(keywords: List[str]) -> List[int]:
857
+ """
858
+ Fetch character IDs associated with any of the specified keywords.
859
+
860
+ Args:
861
+ keywords (List[str]): List of keywords to search for.
862
+
863
+ Returns:
864
+ List[int]: List of character IDs associated with the keywords.
865
+ """
866
+ if not keywords:
867
+ return []
868
+
869
+ conn = sqlite3.connect(chat_DB_PATH)
870
+ cursor = conn.cursor()
871
+ try:
872
+ # Assuming 'tags' column in CharacterCards table stores tags as JSON array
873
+ placeholders = ','.join(['?'] * len(keywords))
874
+ sql_query = f"""
875
+ SELECT DISTINCT id FROM CharacterCards
876
+ WHERE EXISTS (
877
+ SELECT 1 FROM json_each(tags)
878
+ WHERE json_each.value IN ({placeholders})
879
+ )
880
+ """
881
+ cursor.execute(sql_query, keywords)
882
+ rows = cursor.fetchall()
883
+ character_ids = [row[0] for row in rows]
884
+ return character_ids
885
+ except Exception as e:
886
+ logging.error(f"Error in fetch_character_ids_by_keywords: {e}")
887
+ return []
888
+ finally:
889
+ conn.close()
890
+
891
+
892
+ ###################################################################
893
+ #
894
+ # Character Keywords
895
+
896
+ def view_char_keywords():
897
+ try:
898
+ with sqlite3.connect(chat_DB_PATH) as conn:
899
+ cursor = conn.cursor()
900
+ cursor.execute("""
901
+ SELECT DISTINCT keyword
902
+ FROM CharacterCards
903
+ CROSS JOIN json_each(tags)
904
+ WHERE json_valid(tags)
905
+ ORDER BY keyword
906
+ """)
907
+ keywords = cursor.fetchall()
908
+ if keywords:
909
+ keyword_list = [k[0] for k in keywords]
910
+ return "### Current Character Keywords:\n" + "\n".join(
911
+ [f"- {k}" for k in keyword_list])
912
+ return "No keywords found."
913
+ except Exception as e:
914
+ return f"Error retrieving keywords: {str(e)}"
915
+
916
+
917
+ def add_char_keywords(name: str, keywords: str):
918
+ try:
919
+ keywords_list = [k.strip() for k in keywords.split(",") if k.strip()]
920
+ with sqlite3.connect('character_chat.db') as conn:
921
+ cursor = conn.cursor()
922
+ cursor.execute(
923
+ "SELECT tags FROM CharacterCards WHERE name = ?",
924
+ (name,)
925
+ )
926
+ result = cursor.fetchone()
927
+ if not result:
928
+ return "Character not found."
929
+
930
+ current_tags = result[0] if result[0] else "[]"
931
+ current_keywords = set(current_tags[1:-1].split(',')) if current_tags != "[]" else set()
932
+ updated_keywords = current_keywords.union(set(keywords_list))
933
+
934
+ cursor.execute(
935
+ "UPDATE CharacterCards SET tags = ? WHERE name = ?",
936
+ (str(list(updated_keywords)), name)
937
+ )
938
+ conn.commit()
939
+ return f"Successfully added keywords to character {name}"
940
+ except Exception as e:
941
+ return f"Error adding keywords: {str(e)}"
942
+
943
+
944
+ def delete_char_keyword(char_name: str, keyword: str) -> str:
945
+ """
946
+ Delete a keyword from a character's tags.
947
+
948
+ Args:
949
+ char_name (str): The name of the character
950
+ keyword (str): The keyword to delete
951
+
952
+ Returns:
953
+ str: Success/failure message
954
+ """
955
+ try:
956
+ with sqlite3.connect(chat_DB_PATH) as conn:
957
+ cursor = conn.cursor()
958
+
959
+ # First, check if the character exists
960
+ cursor.execute("SELECT tags FROM CharacterCards WHERE name = ?", (char_name,))
961
+ result = cursor.fetchone()
962
+
963
+ if not result:
964
+ return f"Character '{char_name}' not found."
965
+
966
+ # Parse existing tags
967
+ current_tags = json.loads(result[0]) if result[0] else []
968
+
969
+ if keyword not in current_tags:
970
+ return f"Keyword '{keyword}' not found in character '{char_name}' tags."
971
+
972
+ # Remove the keyword
973
+ updated_tags = [tag for tag in current_tags if tag != keyword]
974
+
975
+ # Update the character's tags
976
+ cursor.execute(
977
+ "UPDATE CharacterCards SET tags = ? WHERE name = ?",
978
+ (json.dumps(updated_tags), char_name)
979
+ )
980
+ conn.commit()
981
+
982
+ logging.info(f"Keyword '{keyword}' deleted from character '{char_name}'")
983
+ return f"Successfully deleted keyword '{keyword}' from character '{char_name}'."
984
+
985
+ except Exception as e:
986
+ error_msg = f"Error deleting keyword: {str(e)}"
987
+ logging.error(error_msg)
988
+ return error_msg
989
+
990
+
991
+ def export_char_keywords_to_csv() -> Tuple[str, str]:
992
+ """
993
+ Export all character keywords to a CSV file with associated metadata.
994
+
995
+ Returns:
996
+ Tuple[str, str]: (status_message, file_path)
997
+ """
998
+ import csv
999
+ from tempfile import NamedTemporaryFile
1000
+ from datetime import datetime
1001
+
1002
+ try:
1003
+ # Create a temporary CSV file
1004
+ temp_file = NamedTemporaryFile(mode='w+', delete=False, suffix='.csv', newline='')
1005
+
1006
+ with sqlite3.connect(chat_DB_PATH) as conn:
1007
+ cursor = conn.cursor()
1008
+
1009
+ # Get all characters and their tags
1010
+ cursor.execute("""
1011
+ SELECT
1012
+ name,
1013
+ tags,
1014
+ (SELECT COUNT(*) FROM CharacterChats WHERE CharacterChats.character_id = CharacterCards.id) as chat_count
1015
+ FROM CharacterCards
1016
+ WHERE json_valid(tags)
1017
+ ORDER BY name
1018
+ """)
1019
+
1020
+ results = cursor.fetchall()
1021
+
1022
+ # Process the results to create rows for the CSV
1023
+ csv_rows = []
1024
+ for name, tags_json, chat_count in results:
1025
+ tags = json.loads(tags_json) if tags_json else []
1026
+ for tag in tags:
1027
+ csv_rows.append([
1028
+ tag, # keyword
1029
+ name, # character name
1030
+ chat_count # number of chats
1031
+ ])
1032
+
1033
+ # Write to CSV
1034
+ writer = csv.writer(temp_file)
1035
+ writer.writerow(['Keyword', 'Character Name', 'Number of Chats'])
1036
+ writer.writerows(csv_rows)
1037
+
1038
+ temp_file.close()
1039
+
1040
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
1041
+ status_msg = f"Successfully exported {len(csv_rows)} character keyword entries to CSV."
1042
+ logging.info(status_msg)
1043
+
1044
+ return status_msg, temp_file.name
1045
+
1046
+ except Exception as e:
1047
+ error_msg = f"Error exporting keywords: {str(e)}"
1048
+ logging.error(error_msg)
1049
+ return error_msg, ""
1050
+
1051
+ #
1052
+ # End of Character chat keyword functions
1053
+ ######################################################
1054
+
1055
+
1056
+ #
1057
+ # End of Character_Chat_DB.py
1058
+ #######################################################################################################################