import argparse
import queue
import time
import threading
import logging
import os
from datetime import datetime

from PIL import Image
from daily import EventHandler, CallClient, Daily
from dotenv import load_dotenv

from auth import get_meeting_token, get_room_name
from pipeline import Pipeline
from device import device, torch_dtype

load_dotenv()

# Seconds the worker sleeps when no frame is buffered, to avoid a busy spin.
_IDLE_POLL_SECONDS = 0.01


def _str_to_bool(value):
    """Parse a command-line / environment value into a real boolean.

    ``argparse``'s ``type=bool`` treats every non-empty string (including
    ``"False"``) as ``True``; this parser accepts the usual spellings instead.

    Raises:
        argparse.ArgumentTypeError: if the value is not a recognised boolean.
    """
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ("1", "true", "t", "yes", "y", "on"):
        return True
    if text in ("0", "false", "f", "no", "n", "off", ""):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


class DailyVision(EventHandler):
    """Bot that joins a Daily call and re-publishes processed video.

    Sampled frames from a remote participant are passed through
    ``Pipeline.predict`` and the result is written to a virtual camera device
    that the bot publishes into the call.
    """

    def __init__(
        self, room_url, room_name, expiration, idle, bot_name="Daily Bot"
    ):
        """Create the call client, the pipeline, the logger and the camera.

        Args:
            room_url: URL of the room; also included in every log line.
            room_name: Name of the room (stored, not otherwise used here).
            expiration: Epoch timestamp (compared against ``time.time()``)
                after which the processing loop exits.
            idle: When falsy, the loop exits as soon as fewer than two
                participants are reported by the client.
            bot_name: User name the bot sets on the call.
        """
        self.__client = CallClient(event_handler=self)
        self.__camera = None
        # Worker thread; created in run(). Initialised here so leave() is safe
        # even if run() never got as far as starting it.
        self.__thread = None
        self.__time = time.time()
        self.__app_quit = False
        self.__image_buffer = None
        self.__bot_name = bot_name
        self.__room_url = room_url
        self.__room_name = room_name
        self.__expiration = expiration
        self.__params = Pipeline.InputParams()
        self.__idle = idle
        # Minimum number of seconds between two accepted frames; parsed once
        # here rather than on every incoming frame.
        self.__frame_interval = float(os.getenv("FPS_CAP", 0.0333))

        # Create the pipeline (this might take a moment)
        self.__pipeline = Pipeline(device, torch_dtype)

        # Configure logger. The URL is embedded in a %-style format string, so
        # any literal '%' in it (e.g. percent-encoding) must be escaped.
        safe_url = f"{self.__room_url}".replace("%", "%%")
        log_format = f"%(asctime)s {safe_url} %(message)s"
        logging.basicConfig(format=log_format)
        self.logger = logging.getLogger("bot-instance")
        self.logger.setLevel(logging.DEBUG)

        self.logger.info(f"Expiration timer set to: {self.__expiration}")

        # Setup camera
        self.setup_camera()

    def run(self, meeting_url, token):
        """Join the call and block until the frame-processing thread exits.

        Args:
            meeting_url: URL of the room to join.
            token: Meeting token passed to the client's ``join``; may be
                ``None`` when no token was requested.
        """
        self.logger.info(f"Connecting to room {meeting_url} as {self.__bot_name}")
        self.__client.set_user_name(self.__bot_name)
        self.__client.join(meeting_url, token, completion=self.on_joined)

        # Start the worker and keep the caller alive until it finishes.
        self.__thread = threading.Thread(target=self.process_frames)
        self.__thread.start()
        self.__thread.join()

    def leave(self):
        """Stop the processing loop, wait for the worker, and leave the call."""
        self.logger.info("Leaving...")
        self.__app_quit = True
        worker = self.__thread
        # The worker may not exist (run() failed early) or may never have been
        # started; joining in those cases would raise.
        if worker is not None and worker.is_alive():
            worker.join()
        self.__client.leave()

    def on_joined(self, join_data, client_error):
        """Completion callback for ``join``: log the outcome."""
        self.logger.info(f"call_joined: {join_data}, {client_error}")

    def on_participant_joined(self, participant):
        """Start receiving RGB video frames from a participant who joined."""
        self.logger.info(f"Participant {participant['id']} joined, analyzing frames...")
        self.__client.set_video_renderer(
            participant["id"], self.on_video_frame, color_format="RGB"
        )

        # Say hello
        self.wave()

    def setup_camera(self):
        """Create the virtual camera (once) and enable it as the call input."""
        if not self.__camera:
            self.__camera = Daily.create_camera_device(
                "camera", width=640, height=480, color_format="RGB"
            )
            self.__client.update_inputs({
                "camera": {
                    "isEnabled": True,
                    "settings": {
                        "deviceId": "camera"
                    }
                }
            })

    def process_frames(self):
        """Worker loop: run the latest buffered frame through the pipeline.

        Exits when ``leave`` requests shutdown, when the expiration time has
        passed, or (unless idling is enabled) when fewer than two participants
        are present.
        """
        while not self.__app_quit:
            # Is anyone watching?
            if not self.__idle and len(self.__client.participants()) < 2:
                self.logger.info("No partcipants in channel. Exiting...")
                self.__app_quit = True
                break

            # Check expiry timer
            if time.time() > self.__expiration:
                self.logger.info("Expiration timer exceeded. Exiting...")
                self.__app_quit = True
                break

            video_frame = self.__image_buffer
            if video_frame is None:
                # Nothing received yet: yield instead of spinning at full CPU.
                time.sleep(_IDLE_POLL_SECONDS)
                continue

            image = Image.frombytes(
                "RGB", (video_frame.width, video_frame.height), video_frame.buffer
            )
            # Read the params on every iteration so that settings replaced by
            # on_app_message take effect on the next frame.
            result_image = self.__pipeline.predict(self.__params, image)
            self.__camera.write_frame(result_image.tobytes())

    def on_video_frame(self, participant_id, video_frame):
        """Buffer an incoming frame if enough time has passed since the last.

        A frame is accepted only when more than FPS_CAP seconds (default
        0.0333) have elapsed since the previously accepted one. The original
        author's note: this gives ~15 processed frames per second for a
        30 fps source.
        """
        now = time.time()
        if now - self.__time > self.__frame_interval:
            self.__time = now
            self.__image_buffer = video_frame

    def on_app_message(self, message, sender):
        """Update pipeline settings from an app message sent by a participant."""
        print(message)
        try:
            new_params = self.__pipeline.InputParams(**message)
        except (TypeError, ValueError) as error:
            # TypeError covers a message that is not a mapping or has
            # unexpected keys.
            # NOTE(review): assumes InputParams signals invalid field values
            # with ValueError - confirm against pipeline.InputParams.
            self.logger.warning(f"Ignoring invalid app message: {error}")
            return
        self.__params = new_params
        print(self.__params)

    def wave(self):
        """Send the current prompt to the call as an app message."""
        self.__client.send_app_message(
            {
                "prompt": self.__params.prompt,
            }
        )


def main():
    """Parse command-line arguments, start the bot, and leave on exit."""
    parser = argparse.ArgumentParser(description="Daily Bot")
    # Required args
    parser.add_argument("-u", "--url", required=True, type=str, help="URL of the Daily room")
    parser.add_argument("-k", "--api_key", required=True, type=str, help="Daily API key")
    # Optional args
    parser.add_argument("-t", "--private", type=_str_to_bool, help="Is this room private?", default=True)
    parser.add_argument("-n", "--bot-name", type=str, help="Name of the bot", default="Daily Bot")
    parser.add_argument("-e", "--expiration", type=int, help="Duration of bot", default=os.getenv("BOT_MAX_DURATION", 300))
    # A string default (from the environment) is run through `type` by
    # argparse, so BOT_WILL_IDLE=false is parsed as False.
    parser.add_argument("-i", "--idle", type=_str_to_bool, help="Wait for participants to join", default=os.getenv("BOT_WILL_IDLE", False))

    args = parser.parse_args()

    Daily.init()

    expiration = time.time() + args.expiration

    room_name = get_room_name(args.url)

    # Retrieve a meeting token, if not provided
    # @TODO do room lookup to check privacy
    # Defaults to None so that a non-private room joins without a token
    # instead of failing on an unassigned variable.
    token = None
    if args.private:
        token = get_meeting_token(room_name, args.api_key, expiration)

    app = DailyVision(args.url, room_name, expiration, args.idle, args.bot_name)

    try:
        app.run(args.url, token)
    except KeyboardInterrupt:
        print("Ctrl-C detected. Exiting!")
    finally:
        app.leave()

    # Let leave finish
    time.sleep(2)


if __name__ == '__main__':
    main()