Prudvireddy committed on
Commit 063ae63
1 Parent(s): 8e4df09

Update tools.py

Files changed (1)
  1. tools.py +114 -131
tools.py CHANGED
@@ -20,104 +20,16 @@ import numpy as np
 from PIL import Image, ImageDraw, ImageFont
 from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips, ImageClip
 
-def process_script(script):
-    """Used to process the script into dictionary format"""
-    dict = {}
-    text_for_image_generation = re.findall(r'<image>(.*?)</?image>', script, re.DOTALL)
-    text_for_speech_generation = re.findall(r'<narration>(.*?)</?narration>', script, re.DOTALL)
-    dict['text_for_image_generation'] = text_for_image_generation
-    dict['text_for_speech_generation'] = text_for_speech_generation
-    return dict
-
-@tool
-def image_generator(script):
-    """Generates images for the given script.
-    Saves it to images_dir and return path
-    Args:
-        script: a complete script containing narrations and image descriptions
-    Returns:
-        A list of images in bytes format.
-    """
-    # images_dir = './outputs/images'
-    # for filename in os.listdir(images_dir):
-    #     file_path = os.path.join(images_dir, filename)
-    #     if os.path.isfile(file_path):
-    #         os.remove(file_path)
-
-    dict = process_script(script)
-    images_list = []
-    for i, text in enumerate(dict['text_for_image_generation']):
-        response = requests.post(
-            f"https://api.stability.ai/v2beta/stable-image/generate/core",
-            headers={
-                "authorization": f'sk-2h9CmjC33uxc9W8fmx23oIicgqHk2jVtBF9KoEfdyTUIfODt',
-                "accept": "image/*"
-            },
-            files={"none": ''},
-            data={
-                "prompt": text,
-                "output_format": "png",
-                'aspect_ratio': "9:16",
-            },
-        )
-        print('image generated')
-
-        if response.status_code == 200:
-            images_list.append(response.content)
-        else:
-            raise Exception(str(response.json()))
-
-    return images_list
-
-@tool
-def generate_speech(script, lang='en', speed=1.2, max_segments=2):
-    """
-    Generates speech for the given script using gTTS and adjusts the speed.
-
-    Args:
-        script (str): The script containing narration segments.
-        lang (str, optional): The language code (default is 'en' for English).
-        speed (float, optional): The speed factor of speech generation (default is 1.0).
-        max_segments (int, optional): Maximum number of speech segments to generate (default is 2).
-
-    Returns:
-        list: List of generated speech segments as bytes.
-    """
-    dict = process_script(script)
-    speeches_list = []
-
-    # Ensure we limit the number of segments processed
-    segments_to_process = min(max_segments, len(dict['text_for_speech_generation']))
-
-    for text in dict['text_for_speech_generation'][:segments_to_process]:
-        # Generate speech
-        tts = gTTS(text=text, lang=lang)
-
-        # Save speech to BytesIO
-        speech_data = io.BytesIO()
-        tts.write_to_fp(speech_data)
-        speech_data.seek(0)
-
-        # Adjust speed if necessary
-        if speed != 1.0:
-            audio_segment = AudioSegment.from_file(speech_data, format="mp3")
-            audio_segment = audio_segment.speedup(playback_speed=speed)
-            speech_data = io.BytesIO()
-            audio_segment.export(speech_data, format="mp3")
-            speech_data.seek(0)
-
-        speeches_list.append(speech_data.read())
-
-    return speeches_list
-
 def split_text_into_chunks(text, chunk_size):
     words = text.split()
     return [' '.join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]
 
-def add_text_to_video(input_video, output_video, text, duration=1, fontsize=40, fontcolor=(255, 255, 255),
-                      outline_thickness=2, outline_color=(0, 0, 0), delay_between_chunks=0.1,
-                      font_path=os.path.join(os.path.dirname(os.path.abspath(__name__)),'Montserrat-Bold.ttf')):
-
+def add_text_to_video(input_video, text, duration=1, fontsize=40, fontcolor=(255, 255, 255),
+                      outline_thickness=2, outline_color=(0, 0, 0), delay_between_chunks=0.3,
+                      font_path='Montserrat-Bold.ttf'):
+    temp_output_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4')
+    output_video = temp_output_file.name
+
     chunks = split_text_into_chunks(text, 3) # Adjust chunk size as needed
 
     cap = cv2.VideoCapture(input_video)
@@ -189,6 +101,8 @@ def add_text_to_video(input_video, output_video, text, duration=1, fontsize=40,
     cap.release()
     out.release()
     cv2.destroyAllWindows()
+
+    return output_video
 
 def apply_zoom_in_effect(clip, zoom_factor=1.2):
     width, height = clip.size
@@ -208,46 +122,32 @@ def apply_zoom_in_effect(clip, zoom_factor=1.2):
 
     return clip.fl(zoom_in_effect, apply_to=['mask'])
 
-@tool
-def create_video_from_images_and_audio(images, speeches, zoom_factor=1.2):
-    """Creates video using images and audios.
-    Args:
-        images: list of images in bytes format
-        speeches: list of speeches in bytes format"""
-
+def create_video_from_images_and_audio(images_dir, speeches_dir, zoom_factor=1.2):
+    client = Groq(api_key='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')
+    images_paths = sorted(os.listdir(images_dir))
+    audio_paths = sorted(os.listdir(speeches_dir))
     clips = []
     temp_files = []
 
-    for i in range(min(len(images), len(speeches))):
-        # Save image to a temporary file
-        img_path = f"./temp_image_{i}.png"
-        with open(img_path, 'wb') as img_file:
-            img_file.write(images[i])
-
-        # Create an ImageClip
-        img_clip = ImageClip(img_path)
-
-        # Save audio to a temporary file
-        audio_path = f"./temp_audio_{i}.mp3"
-        with open(audio_path, 'wb') as audio_file:
-            audio_file.write(speeches[i])
-
-        # Create an AudioClip
-        audioclip = AudioFileClip(audio_path)
-
-        # Set the duration of the video clip to match the audio duration
+    for i in range(min(len(images_paths), len(audio_paths))):
+        img_clip = ImageClip(os.path.join(images_dir, images_paths[i]))
+        audioclip = AudioFileClip(os.path.join(speeches_dir, audio_paths[i]))
         videoclip = img_clip.set_duration(audioclip.duration)
         zoomed_clip = apply_zoom_in_effect(videoclip, zoom_factor)
 
-        # Generate captions using the text for speech generation
-        caption = process_script(script)['text_for_speech_generation'][i]
+        with open(os.path.join(speeches_dir, audio_paths[i]), "rb") as file:
+            transcription = client.audio.transcriptions.create(
+                file=(audio_paths[i], file.read()),
+                model="whisper-large-v3",
+                response_format="verbose_json",
+            )
+        caption = transcription.text
 
-        temp_video_path = f"./temp_zoomed_{i}.mp4"
+        temp_video_path = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4').name
         zoomed_clip.write_videofile(temp_video_path, codec='libx264', fps=24)
         temp_files.append(temp_video_path)
 
-        final_video_path = f"./temp_captioned_{i}.mp4"
-        add_text_to_video(temp_video_path, final_video_path, caption, duration=1, fontsize=60)
+        final_video_path = add_text_to_video(temp_video_path, caption, duration=1, fontsize=60)
        temp_files.append(final_video_path)
 
        final_clip = VideoFileClip(final_video_path)
@@ -256,20 +156,25 @@ def create_video_from_images_and_audio(images, speeches, zoom_factor=1.2):
         clips.append(final_clip)
 
     final_clip = concatenate_videoclips(clips)
-    final_clip.write_videofile("./final_video.mp4", codec='libx264', fps=24)
+    temp_final_video = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4').name
+    final_clip.write_videofile(temp_final_video, codec='libx264', fps=24)
 
     # Close all video files properly
     for clip in clips:
         clip.close()
 
     # Remove all temporary files
-    # for temp_file in temp_files:
-    #     try:
-    #         os.remove(temp_file)
-    #     except Exception as e:
-    #         print(f"Error removing file {temp_file}: {e}")
+    for temp_file in temp_files:
+        try:
+            os.remove(temp_file)
+        except Exception as e:
+            print(f"Error removing file {temp_file}: {e}")
 
-    return "./final_video.mp4"
+    return temp_final_video
+
+from langchain.pydantic_v1 import BaseModel, Field
+from langchain_community.tools import WikipediaQueryRun
+from langchain_community.utilities import WikipediaAPIWrapper
 
 class WikiInputs(BaseModel):
     """Inputs to the wikipedia tool."""
@@ -290,3 +195,81 @@ wiki = Tool(
     func = wiki_tool.run,
     description= "{query:'input here'}"
 )
+
+def process_script(script):
+    """Used to process the script into dictionary format"""
+    dict = {}
+    text_for_image_generation = re.findall(r'<image>(.*?)</?image>', script, re.DOTALL)
+    text_for_speech_generation = re.findall(r'<narration>(.*?)</?narration>', script, re.DOTALL)
+    dict['text_for_image_generation'] = text_for_image_generation
+    dict['text_for_speech_generation'] = text_for_speech_generation
+    return dict
+
+def generate_speech(text, lang='en', speed=1.15, num=0):
+    """
+    Generates speech for the given script using gTTS and adjusts the speed.
+    """
+    temp_speech_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3')
+    temp_speech_path = temp_speech_file.name
+
+    tts = gTTS(text=text, lang=lang)
+    tts.save(temp_speech_path)
+
+    sound = AudioSegment.from_file(temp_speech_path)
+    if speed != 1.0:
+        sound_with_altered_speed = sound._spawn(sound.raw_data, overrides={
+            "frame_rate": int(sound.frame_rate * speed)
+        }).set_frame_rate(sound.frame_rate)
+        sound_with_altered_speed.export(temp_speech_path, format="mp3")
+    else:
+        sound.export(temp_speech_path, format="mp3")
+
+    temp_speech_file.close()
+    return temp_speech_path
+
+def image_generator(script):
+    """Generates images for the given script.
+    Saves it to a temporary directory and returns the path.
+    Args:
+        script: a complete script containing narrations and image descriptions."""
+    images_dir = tempfile.mkdtemp()
+
+    dict = process_script(script)
+    for i, text in enumerate(dict['text_for_image_generation']):
+        response = requests.post(
+            f"https://api.stability.ai/v2beta/stable-image/generate/core",
+            headers={
+                "authorization": 'sk-amF2RAcBrDHNwuFUivtDsZFGJ6hzISz53NhtjdY9bs0SsrLc',
+                "accept": "image/*"
+            },
+            files={"none": ''},
+            data={
+                "prompt": text,
+                "output_format": "png",
+                'aspect_ratio': "9:16",
+            },
+        )
+
+        if response.status_code == 200:
+            with open(os.path.join(images_dir, f'image_{i}.png'), 'wb') as file:
+                file.write(response.content)
+        else:
+            raise Exception(f"Image generation failed with status code {response.status_code} and message: {response.text}")
+
+    return images_dir
+
+
+def speech_generator(script):
+    """
+    Generates speech files for the given script using gTTS.
+    Saves them to a temporary directory and returns the path.
+    """
+    speeches_dir = tempfile.mkdtemp()
+
+    dict = process_script(script)
+    for i, text in enumerate(dict['text_for_speech_generation']):
+        speech_path = generate_speech(text, num=i)
+        os.rename(speech_path, os.path.join(speeches_dir, f'speech_{i}.mp3'))
+
+    return speeches_dir
+
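
Usage sketch (not part of the commit): a minimal example of how the refactored helpers in tools.py chain together, assuming the module is importable as tools and the Stability and Groq API keys embedded in the file are valid. The <image>/<narration> tag format follows process_script's regular expressions; the prompt and narration text below are illustrative placeholders, not content from the repository.

# Hypothetical end-to-end run of the new pipeline (illustrative only).
from tools import image_generator, speech_generator, create_video_from_images_and_audio

script = (
    "<image>A sunrise over a mountain lake, cinematic, vertical 9:16</image>"
    "<narration>Every morning, the lake wakes up before the town does.</narration>"
)

images_dir = image_generator(script)       # temp directory of PNGs from Stability AI
speeches_dir = speech_generator(script)    # temp directory of MP3 narrations from gTTS
final_video = create_video_from_images_and_audio(images_dir, speeches_dir)
print(final_video)                          # path to the captioned, concatenated MP4

Note that create_video_from_images_and_audio removes its intermediate clips but returns a persistent temporary MP4, and the image and speech directories are created with tempfile.mkdtemp, so the caller is responsible for cleaning up the returned file and both directories.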