#github.com/etherealxx import os import time import gradio as gr import urllib.request, subprocess, contextlib #these handle mega.nz import http.client import requests #this handle civit import re from tqdm import tqdm #from IPython.display import display, clear_output import pathlib import inspect import platform import shlex import signal import sys sdless = False try: from modules import script_callbacks #,scripts from modules.paths import models_path, script_path #, data_path from modules.shared import cmd_opts #check for gradio queue except ImportError: #sdless if platform.system() == "Windows": userprofile = os.environ['USERPROFILE'] downloadpath = os.path.join(userprofile, "Downloads") script_path = os.path.join(downloadpath, "stable-diffusion-webui") elif platform.system() == "Darwin": userhome = os.environ['HOME'] downloadpath = os.path.join(userhome, "Downloads") script_path = os.path.join(downloadpath, "stable-diffusion-webui") else: script_path = '/content/stable-diffusion-webui' models_path = os.path.join(script_path, 'models') gradio_queue = True ckpt_dir = None import sys import types module = types.ModuleType('cmd_opts') module.gradio_queue = gradio_queue module.ckpt_dir = ckpt_dir sys.modules['cmd_opts'] = module import cmd_opts sdless = True script_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) extension_dir = os.path.abspath(os.path.join(script_dir, "../")) #Version checking{ version_dir = os.path.join(extension_dir, "version.txt") with open(version_dir, 'r', encoding='utf-8') as file: curverall = file.readlines() currentversion = curverall[0].strip() try: versionurl = "https://raw.githubusercontent.com/etherealxx/batchlinks-webui/main/version.txt" versionresp = requests.get(versionurl) version_lines = versionresp.text.splitlines() latestversion = version_lines[0].strip() except requests.exceptions.RequestException: latestversion = '??' if latestversion != '??': if currentversion == latestversion: latestversiontext = "" else: latestversiontext = f"[Latest version: {latestversion}]" else: latestversiontext = "" currentverforlink = latestversion.replace('.', '') #} vladmandic = False try: global gradiostate if cmd_opts.gradio_queue: #automatic1111 gradiostate = True else: gradiostate = False except AttributeError: try: if not cmd_opts.disable_queue: #vladmandic gradiostate = True else: gradiostate = False vladmandic = True except AttributeError: gradiostate = False #at this point just use onedotsix pass typechecker = [ "embedding", "embeddings", "embed", "embeds", "textualinversion", "ti", "model", "models", "checkpoint", "checkpoints", "vae", "vaes", "lora", "loras", "hypernetwork", "hypernetworks", "hypernet", "hypernets", "hynet", "hynets", "addnetlora", "loraaddnet", "additionalnetworks", "addnet", "aestheticembedding", "aestheticembed", "controlnet", "cnet", "extension", "extensions", "ext", #obsolete "upscaler", "upscale", "altmodel", "altmodels", "lycoris", "locon", "loha" ] typemain = [ "model", "altmodel", "vae", "embed", "hynet", "lora", "addnetlora", "aestheticembed", "cnet", "ext", "upscaler", "lycoris" ] countofdefaulthashtags = len(typemain) supportedlinks = [ "https://mega.nz", "https://huggingface.co", "https://civitai.com/api/download/models", "https://civitai.com/models/", "https://cdn.discordapp.com/attachments", "https://github.com", "https://raw.githubusercontent.com", "https://files.catbox.moe", "https://drive.google.com", "https://pixeldrain.com", "https://www.mediafire.com/file", "https://anonfiles.com", "https://www.dropbox.com/s" ] modelpath = os.path.join(script_path, "models/Stable-diffusion") embedpath = os.path.join(script_path, "embeddings") vaepath = os.path.join(script_path, "models/VAE") lorapath = os.path.join(script_path, "models/Lora") addnetlorapath = os.path.join(script_path, "extensions/sd-webui-additional-networks/models/lora") hynetpath = os.path.join(script_path, "models/hypernetworks") aestheticembedpath = os.path.join(script_path, "extensions/stable-diffusion-webui-aesthetic-gradients/aesthetic_embeddings") cnetpath = os.path.join(script_path, "extensions/sd-webui-controlnet/models") extpath = os.path.join(script_path, "extensions") #obsolete upscalerpath = os.path.join(script_path, "models/ESRGAN") lycorispath = os.path.join(addnetlorapath, "lycoris") if vladmandic: cnetpath = os.path.join(script_path, "models/ControlNet") lycorispath = os.path.join(script_path, "models/LyCORIS") if cmd_opts.ckpt_dir: altmodelpath = cmd_opts.ckpt_dir currentfolder = altmodelpath else: altmodelpath = modelpath currentfolder = modelpath if platform.system() == "Windows": for x in typemain: # exec(f"{x}path = os.path.normpath({x}path)") exec(f"{x}path = {x}path.replace('/', '\\\\')") #exec(f"print({x}path)") if os.path.exists(addnetlorapath) and not os.path.exists(lycorispath): os.makedirs(lycorispath, exist_ok=True) newlines = ['\n', '\r\n', '\r'] currentlink = '' finalwrite = [] currentcondition = '' currentsuboutput = '' processid = '' logging = False #currentiterfolder = '' prockilled = False currentfoldertrack = [] everyprocessid = [] remaininglinks = [] gdownupgraded = False mediafireinstalled = False prevciviterror = False # addedcustompath = [] # ariamode = False globaldebug = False #set this to true to activate every debug features if len(sys.argv) > 1: if sys.argv[1] == '--debug': globaldebug = True process = '' def stopwatch(func): #unused """ A function that acts as a stopwatch for another function. Args: func (function): The function to be timed. Returns: The return value of the timed function. """ def wrapper(*args, **kwargs): if not globaldebug: return func(*args, **kwargs) start_time = time.time() result = func(*args, **kwargs) end_time = time.time() elapsed_time = end_time - start_time minutes = int(elapsed_time // 60) seconds = elapsed_time % 60 print(f"Time taken by {func.__name__}: {minutes:.0f}m {seconds:.2f}s") return result return wrapper #debuggingpurpose{ #Hello debuggers! This will track every files when the extension is launched, and #you can remove every downloaded files after with hashtag '@debugresetdownloads', for debugging purposes on colab #(Note: You need to fill the textbox with only a single line of @debugresetdownloads and nothing more) #There's also another feature, '#debugeverymethod', which will download a single link with all four possible methods available. import shutil snapshot = [] paths_to_scan = [] # take a snapshot of the directories def take_snapshot(): snapshotdir = os.path.join(script_path, 'snapshot.txt') global snapshot global paths_to_scan paths_to_scan = [] for x in typemain: exec(f"paths_to_scan.append({x}path)") if os.path.exists(snapshotdir): with open(snapshotdir, 'r') as f: snapshot = [line.strip() for line in f.readlines()] print("Batchlinks extension: snapshot already exist.") return else: snapshot = [] for path in paths_to_scan: if os.path.exists(path): pathtemp = os.listdir(path) for file in pathtemp: pathoffile = os.path.join(path, file) snapshot.append(pathoffile) with open(snapshotdir, 'w') as f: for item in snapshot: f.write(f"{item}\n") print("Batchlinks extension: snapshot taken.") # rewind everything to a fresh state def global_rewind(): global paths_to_scan global path global currentsuboutput removedall, removed_files, removed_dirs, new_snapshot = [], [], [], [] print('Debug rewind initiated...') print('') for path in paths_to_scan: if os.path.exists(path): pathtemp = os.listdir(path) for file in pathtemp: pathoffile = os.path.join(path, file) new_snapshot.append(pathoffile) toremoves = [x for x in new_snapshot if x not in snapshot] for fileordir in toremoves: if os.path.exists(fileordir): if os.path.isdir(fileordir): try: shutil.rmtree(fileordir) removed_dirs.append(fileordir) except PermissionError as e: print("Error: " + str(e)) else: os.remove(fileordir) removed_files.append(fileordir) if removed_files or removed_dirs: print("Removed files:") removedall.append("Removed files:") for file in removed_files: print(file) removedall.append(file) print("Removed directories:") removedall.append("Removed directories:") for dir in removed_dirs: print(dir) removedall.append(dir) print('rewind completed') print('') return removedall # Take a snapshot of the directories if globaldebug == True: take_snapshot() # } def printdebug(toprint): if globaldebug == True: print(toprint) def printvardebug(toprint): if globaldebug == True: import inspect caller = inspect.currentframe().f_back try: name = [k for k, v in caller.f_locals.items() if v is toprint][0] except IndexError: name = "idk" if not isinstance(toprint, str): toprint = str(toprint) print(f"debug {name}: {toprint}") def runwithsubprocess(rawcommand, folder=None, justrun=False, additionalcontext=''): #@note runwithsubprocess commandtorun = shlex.split(rawcommand) #construct_command(rawcommand) printdebug(f"raw command: {rawcommand}") printdebug(f"merged command: {commandtorun}") global prockilled global everyprocessid # if gradiostate == False and not rawcommand.startswith("aria"): # subprocess.run(commandtorun, stderr=subprocess.STDOUT, universal_newlines=True) # else: if folder != None: printdebug("debug folderforsavestate: " + str(folder)) savestate_folder(folder) printdebug("debug currentfoldertrack: " + str(currentfoldertrack)) global process if justrun: process = subprocess.Popen(rawcommand, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, shell=True) else: process = subprocess.Popen(commandtorun, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) global processid processid = process.pid everyprocessid.append(processid) printdebug("debug processid: " + str(processid)) ariacomplete = False global currentsuboutput while True: # Read the output from the process nextline = process.stdout.readline() if nextline == '' and process.poll() is not None: break # Check if the line contains progress information elif additionalcontext == 'aria2': if 'Download Results' in nextline: ariacomplete = True print('\n') print(nextline, end='') currentsuboutput = nextline else: if not ariacomplete: match = re.search(r'\[[^\]]+\]', nextline) if match: stripnext = match.group().strip() print("\r", end="") print(f"\r{stripnext}", end='') currentsuboutput = stripnext else: print(nextline, end='') currentsuboutput = nextline elif additionalcontext == '7z': sevenzmessage = [ "Extracting", "Everything", "ERROR" ] if nextline.strip().startswith(tuple(sevenzmessage)): print(nextline, end=' ') currentsuboutput = nextline else: stripnext = nextline.strip() print("\r", end="") print(f"\r{stripnext}", end='') currentsuboutput = stripnext else: if justrun: print(nextline, end='') else: if "%" in nextline.strip() or rawcommand.startswith("curl"): stripnext = nextline.strip() print("\r", end="") print(f"\r{stripnext}", end='') else: print(nextline, end='') if additionalcontext == 'mega': process.kill() if additionalcontext == 'mega': if nextline.strip().startswith("Download finished:"): _ = subprocess.getoutput("taskkill /f /t /im MEGAcmdServer.exe") printdebug("MEGA server killed") currentsuboutput = nextline process.wait() currentsuboutput = '' processid = '' if prockilled == True: printdebug("before rewind folder") printvardebug(folder) rewind_folder(folder) print('Operation Cancelled') print('') global currentcondition currentcondition = 'Operation Cancelled' return #these code below handle mega.nz def unbuffered(proc, stream='stdout'): stream = getattr(proc, stream) with contextlib.closing(stream): while prockilled == False: out = [] last = stream.read(1) # Don't loop forever if last == '' and proc.poll() is not None: break while last not in newlines: # Don't loop forever if last == '' and proc.poll() is not None: break out.append(last) last = stream.read(1) out = ''.join(out) yield out def transfare(todownload, folder, torename=''): #import codecs #decoder = codecs.getincrementaldecoder("UTF-8")() todownload_s = shlex.quote(todownload) folder_s = shlex.quote(folder) if platform.system() == "Windows": savestate_folder(folder) localappdata = os.environ['LOCALAPPDATA'] megagetloc = os.path.join(localappdata, "MEGAcmd", "mega-get.bat") global process process = runwithsubprocess(f"{shlex.quote(megagetloc)} {todownload_s} {folder_s}", folder, False, 'mega') else: savestate_folder(folder_s) cmd = ["mega-get", todownload_s, folder_s] proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, ) global processid global everyprocessid processid = proc.pid everyprocessid.append(processid) global currentsuboutput for line in unbuffered(proc): if prockilled == False: if not line.startswith("Download"): currentsuboutput = line print(f"\r{line}", end="") else: print(f"\n{line}") _ = subprocess.getoutput("pkill -f \"mega-cmd-server\"") printdebug("MEGA server killed") else: currentsuboutput = '' print('Operation Cancelled') print('') global currentcondition currentcondition = 'Operation Cancelled' return currentsuboutput = '' if torename: listfilenew = os.listdir(folder) newerfoldertrack = [] for file in listfilenew: pathoffile = os.path.join(folder, file) newerfoldertrack.append(pathoffile) checkrename = [x for x in newerfoldertrack if x not in currentfoldertrack] if checkrename: # renamedfile = os.path.basename(checkrename[0]) # pathtorename = os.path.join(folder, renamedfile) os.rename(checkrename[0], os.path.join(folder, torename)) def installmega(): HOME = os.path.expanduser("~") ocr_file = pathlib.Path(f"{HOME}/.ipython/ocr.py") if not ocr_file.exists(): hCode = "https://raw.githubusercontent.com/biplobsd/" \ "OneClickRun/master/res/ocr.py" urllib.request.urlretrieve(hCode, str(ocr_file)) from importlib.util import module_from_spec, spec_from_file_location ocr_spec = spec_from_file_location("ocr", str(ocr_file)) ocr = module_from_spec(ocr_spec) ocr_spec.loader.exec_module(ocr) if not os.path.exists("/usr/bin/mega-cmd"): #ocr.loadingAn() print('Installing MEGA ...') print('') ocr.runSh('sudo apt-get -y update') ocr.runSh('sudo apt-get -y install libmms0 libc-ares2 libc6 libcrypto++6 libgcc1 libmediainfo0v5 libpcre3 libpcrecpp0v5 libssl1.1 libstdc++6 libzen0v5 zlib1g apt-transport-https') ocr.runSh('sudo curl -sL -o /var/cache/apt/archives/MEGAcmd.deb https://mega.nz/linux/MEGAsync/Debian_9.0/amd64/megacmd-Debian_9.0_amd64.deb', output=True) ocr.runSh('sudo dpkg -i /var/cache/apt/archives/MEGAcmd.deb', output=True) print('MEGA is installed.') print('') def installmegawin(): userprofile = os.environ['USERPROFILE'] localappdata = os.environ['LOCALAPPDATA'] megagetloc = os.path.join(localappdata, "MEGAcmd", "mega-get.bat") megacmdloc = os.path.join(userprofile, "Downloads", "MEGAcmdSetup64.exe") if not os.path.exists(megagetloc): print('Installing MEGA ...') print('') runwithsubprocess(f"curl -o {shlex.quote(megacmdloc)} https://mega.nz/MEGAcmdSetup64.exe") time.sleep(1) runwithsubprocess(f"{shlex.quote(megacmdloc)} /S") time.sleep(4) print('MEGA is installed.') print('') #clear_output() #these code above handle mega.nz def civitdown(url, folder, torename=''): filename = url.split('?')[0].rsplit('/', 1)[-1] + ".bdgh" pathtodown = os.path.join(folder, filename) max_retries = 5 retry_delay = 10 # url_s = quote(url) while prockilled == False: downloaded_size = 0 headers = {} progress = tqdm(total=1000000000, unit="B", unit_scale=True, desc=f"Downloading {filename}. (will be renamed correctly after downloading)", initial=downloaded_size, leave=False) global currentsuboutput global currentcondition with open(pathtodown, "ab") as f: while prockilled == False: try: response = requests.get(url, headers=headers, stream=True) total_size = int(response.headers.get("Content-Length", 0)) # if total_size == 0: # total_size = downloaded_size # progress.total = total_size for chunk in response.iter_content(chunk_size=1024): if chunk and prockilled == False: f.write(chunk) progress.update(len(chunk)) currentsuboutput = str(progress) else: break downloaded_size = os.path.getsize(pathtodown) currentsuboutput = '' break except ConnectionError as e: max_retries -= 1 if max_retries == 0: raise e time.sleep(retry_delay) progress.close() if prockilled == True: if os.path.exists(pathtodown): os.remove(pathtodown) print('Operation Cancelled') print('') currentcondition = 'Operation Cancelled' currentsuboutput = '' return "Operation Cancelled" if torename: actualfilename = torename else: try: actualfilename = response.headers['Content-Disposition'].split("filename=")[1].strip('"') except KeyError: print('Link Error') print('') break #%cd {folder} actualpath = os.path.join(folder, actualfilename) os.rename(pathtodown, actualpath) downloaded_size = os.path.getsize(actualpath) # Check if the download was successful if downloaded_size >= total_size: print(f"{actualfilename} successfully downloaded.") break else: print(f"Error: File download failed. Retrying...") def getcivitname(link, frommodeltypechooser=False): #@note getcivitname # searcher = "findstr" if platform.system() == "Windows" else "grep" printdebug("getcivitname:") printvardebug(link) global currentcondition tempcondition = currentcondition currentcondition = "Connecting to CivitAI..." try: req = urllib.request.Request(link, headers={'User-Agent': 'Mozilla/5.0'}) response = urllib.request.urlopen(req, timeout=7) contentdis = response.geturl() # contentdis = [line for line in subprocess.getoutput(f"curl -sI {link} | {searcher} -i content-disposition").splitlines() if line.startswith('location')][0] except Exception as e: errortype = str(type(e)) printdebug("Error: " + errortype) if 'TimeoutError' in errortype: print("Unable to connect to CivitAI in 7 seconds. Skipping the link...") currentcondition = tempcondition return 'batchlinksskip' else: if frommodeltypechooser: currentcondition = tempcondition return '' else: print("Cannot get the filename. Download will continue with the old slow method.\nReason:" + str(e)) currentcondition = tempcondition return 'batchlinksold' if contentdis == "https://civitai.com/": print('CivitAI website is currently down ツ') print('') currentcondition = tempcondition return 'batchlinksskip' cuttedcontent = contentdis.find('response-content-disposition=attachment%3B%20filename%3D%22') + 59 # filename = str(contentdis[cuttedcontent:]).replace('%22&x-id=GetObject', '') #obsolete since 30-03-2023 filename = str(contentdis[cuttedcontent:]).partition('%22&')[0] # filename = contentdis.split('filename=')[-1].split('%22')[1] #this also works i guess filename = civitmodeltypename(filename, link) currentcondition = tempcondition return filename def getcivitname2(link): searcher = "findstr" if platform.system() == "Windows" else "grep" try: contentdis = [line for line in subprocess.getoutput(f"curl -sI {link} | {searcher} -i content-disposition").splitlines() if line.startswith('location')][0] cuttedcontent = contentdis.find('response-content-disposition=attachment%3B%20filename%3D%22') + 59 filename = str(contentdis[cuttedcontent:]).replace('%22&x-id=GetObject', '') if filename: return [0, link, filename] except: return [] def civitmodeltypename(name, filelink): nameonly, extension = os.path.splitext(name) if 'type=Pruned' in filelink: nameonly += "_pruned" if 'format=Safetensor' in filelink: extension = '.safetensors' elif 'format=PickleTensor' in filelink: extension = '.ckpt' nameoffile = nameonly + extension return nameoffile def checkcivitconfig(link): #check if the current civit link has a config file (v2.0+) @note checkcivitconfig def getrequest(link2): response = requests.get(link2) return response.status_code params = ['?type=Config','?type=Config&format=Other'] for param in params: response = getrequest(link + param) if response == 200: #print('The link exists') return link + param return '' def civitmodeltypechooser(modeljson, prunedmodel, torchortensor, linkandnames): # prunedorfull = ['?type=Model', '?type=Pruned%20Model'] # if prunedmodel == 'True': prunedorfull = ['&fp=fp32', '&fp=fp16'] if prunedmodel == 'fp16': prunedorfull.reverse() pickleorsafe = ['&format=SafeTensor', '&format=PickleTensor'] if torchortensor=='ckpt': pickleorsafe.reverse() defaultlinkurl = [link.get('downloadUrl') for link in modeljson['modelVersions'][0]['files'] if not '?type=' in link.get('downloadUrl')][0] indexlinkname = list() checklater = '' for index, (link, name) in enumerate(linkandnames.items()): if prunedorfull[0] in link: if pickleorsafe[0] in link: indexlinkname.extend([index, link, name]) break else: checklater = link continue else: continue if checklater and not indexlinkname: for index, (link, name) in enumerate(linkandnames.items()): if checklater == link: indexlinkname.extend([index, link, name]) checklater = '' break if not indexlinkname: indexlinkname = getcivitname(defaultlinkurl+prunedorfull[0]+pickleorsafe[0], True) #@note indexlinkname if indexlinkname == "batchlinksskip" or not indexlinkname: indexlinkname = list() if not indexlinkname: for index, (link, name) in enumerate(linkandnames.items()): if defaultlinkurl == link: indexlinkname.extend([index, link, name]) break return indexlinkname #thank you @rti7743 for this part { def civitdown2_get_json(url): import re m = re.search(r'https://civitai.com/models/(\d+)', url) model_id = m.group(1) api_url = "https://civitai.com/api/v1/models/" + model_id import requests txt = requests.get(api_url).text import json try: return json.loads(txt) except json.decoder.JSONDecodeError: return 'error' def civitdown2_get_save_directory(model_type, default_folder): printdebug("model_type: " + model_type) customtypemains = list(typemain[countofdefaulthashtags:]) printdebug("customtypemains: " + str(customtypemains)) for type in customtypemains: xpath = eval(type + "path") if default_folder == xpath: printdebug("customtype returned") return default_folder if model_type == "Checkpoint": if cmd_opts.ckpt_dir and default_folder == altmodelpath: return altmodelpath else: return modelpath elif model_type == "Hypernetwork": return hynetpath elif model_type == "TextualInversion": return embedpath elif model_type == "AestheticGradient": return aestheticembedpath elif model_type == "VAE": return vaepath elif model_type == "LORA": if default_folder == addnetlorapath: return addnetlorapath else: return lorapath elif model_type == "LoCon": return lycorispath else: printdebug("defaultfolder returned") return default_folder def civitdown2_convertimage(imagejpg_save_path, imagepng_save_path): from PIL import Image try: img = Image.open(imagejpg_save_path) img_resized = img.resize((img.width // 2, img.height // 2)) img_resized.save(imagepng_save_path) if os.path.exists(imagejpg_save_path): os.remove(imagejpg_save_path) except Exception as e: print("Civitai image convertion failed. Reason: " + str(e)) pass def civitdown2(url, folder, downloader, renamedfilename, isdebugevery, modeldefaulttype, isprunedmodel, isdownvae): #@note civitdown2 def civitlinkandnamer(model): linkandname = dict() for i, link in enumerate(model['modelVersions'][0]['files']): name = link.get('name') url = link.get('downloadUrl') if name and url: linkandname[url] = name # print(i, url, name) else: pass return linkandname model = civitdown2_get_json(url) if model == 'error': print('Failed retrieving the model data.') print('CivitAI website might going down currently.') print('') return if model == {'error': 'Model not found'}: print('Model ' + url + ' is not available anymore') print('') return save_directory = civitdown2_get_save_directory(model['type'], folder) if not os.path.exists(save_directory): os.makedirs(save_directory, exist_ok=True) try: parameter = url.split("?")[-1] + "?" except: parameter = '' civitlinkandnames = civitlinkandnamer(model) if model['type'] == "Checkpoint": data_index, data_url, data_filename = civitmodeltypechooser(model, isprunedmodel, modeldefaulttype, civitlinkandnames) #@note data_filename = civitmodeltypename(data_filename, data_url) image_filename = model['modelVersions'][0]['files'][0]['name'] else: data_url = model['modelVersions'][0]['files'][0]['downloadUrl'] data_filename = model['modelVersions'][0]['files'][0]['name'] if "?type=Training%20Data" in data_url: data_url = data_url.partition("?")[0] wannabename = getcivitname(data_url) if wannabename: if wannabename != 'batchlinksskip': data_filename = wannabename else: return '' image_filename = data_filename if renamedfilename: data_filename = renamedfilename image_filename = data_filename image_url = model['modelVersions'][0]['images'][0]['url'] printvardebug(data_url) printvardebug(data_filename) printvardebug(image_url) if model['type'] == "TextualInversion": image_filename_jpg = pathlib.PurePath(image_filename).stem + ".preview.jpg" image_filename_png = pathlib.PurePath(image_filename).stem + ".preview.png" else: image_filename_jpg = pathlib.PurePath(image_filename).stem + ".jpg" image_filename_png = pathlib.PurePath(image_filename).stem + ".png" data_save_path = os.path.join(save_directory, data_filename) imagejpg_save_path = os.path.join(save_directory, image_filename_jpg) imagepng_save_path = os.path.join(save_directory, image_filename_png) currentmode = 'civit' if isdebugevery: currentmode = 'civitdebugevery' printdebug(f"debug download_url({data_url}, {data_save_path}, {downloader})") if prockilled == False: hfdown(data_url, data_save_path, downloader, currentmode) printdebug("normal download done, now check for the config") config_url = checkcivitconfig(data_url) vae_url, vae_name = '','' for link, name in civitlinkandnames.items(): if '?type=VAE' in link: vae_url, vae_name = link, name break if model['type'] == "Checkpoint": if prockilled == False and isdownvae and vae_url: namefile = os.path.splitext(data_filename)[0] vae_name = namefile + '.vae.pt' data_save_path = os.path.join(os.path.dirname(data_save_path), vae_name) hfdown(vae_url, data_save_path, downloader, currentmode) if prockilled == False and config_url: namefile = os.path.splitext(data_filename)[0] config_name = namefile + '.yaml' data_save_path = os.path.join(os.path.dirname(data_save_path), config_name) hfdown(config_url, data_save_path, downloader, currentmode) if prockilled == False: hfdown(image_url, imagejpg_save_path, downloader, currentmode) if os.path.exists(imagejpg_save_path): civitdown2_convertimage(imagejpg_save_path, imagepng_save_path) else: print(imagejpg_save_path + " doesn't exist") if prockilled == False: print(f"{data_save_path} successfully downloaded.") if isdebugevery: additionalname = '-' + downloader if model['type'] == "TextualInversion": image_filename_jpg = pathlib.PurePath(data_filename).stem + additionalname + ".preview.jpg" image_filename_png = pathlib.PurePath(data_filename).stem + additionalname + ".preview.png" else: image_filename_jpg = pathlib.PurePath(data_filename).stem + additionalname + ".jpg" image_filename_png = pathlib.PurePath(data_filename).stem + additionalname + ".png" imagejpg_save_path = os.path.join(save_directory, image_filename_jpg) imagepng_save_path = os.path.join(save_directory, image_filename_png) #} def mediadrivedown(todownload, folder, mode, torename=''): #@note mediadrivedown todownload_s = shlex.quote(todownload) if platform.system() == "Windows": folder_s = folder else: folder_s = shlex.quote(folder) prevcurdir = os.getcwd() os.chdir(folder) savestate_folder(folder_s) if mode=='gdrive': runwithsubprocess(f"gdown --fuzzy {todownload_s}", folder_s) elif mode=='mediafire': runwithsubprocess(f"mediafire-dl {todownload_s}", folder_s) os.chdir(prevcurdir) if torename: listfilenew = os.listdir(folder) newerfoldertrack = [] for file in listfilenew: pathoffile = os.path.join(folder, file) newerfoldertrack.append(pathoffile) checkrename = [x for x in newerfoldertrack if x not in currentfoldertrack] if checkrename: os.rename(checkrename[0], os.path.join(folder, torename)) def hfdown(todownload, folder, downloader, mode='default', torename=''): #@note hfdown global currentcondition global prevciviterror prevciviterror = False # global ariamode if mode=='civit' or mode=='civitdebugevery': filename = pathlib.Path(folder).name filename_s = shlex.quote(filename) filepath = folder filepath_s = shlex.quote(folder) todownload_s = todownload folder = os.path.dirname(filepath) # folder_s = pathlib.Path(folder).parent.resolve() folder_s = shlex.quote(folder) folder_win = folder else: if mode=='pixeldrain' or mode=='dropbox': filename = torename else: filename = todownload.rsplit('/', 1)[-1] filename_s = shlex.quote(filename) filepath = os.path.join(folder, filename) filepath_s = shlex.quote(filepath) todownload_s = shlex.quote(todownload) folder_s = shlex.quote(folder) folder_win = folder ariafilename_s = filename_s if torename: ariafilename_s = shlex.quote(torename) printvardebug(folder) printvardebug(todownload) printvardebug(filename) printvardebug(filename_s) printvardebug(filepath) printvardebug(filepath_s) printvardebug(todownload_s) printvardebug(folder_s) printvardebug(folder_win) #savestate_folder(folder_s) if platform.system() == "Windows": #@note windows downloader localappdata = os.environ['LOCALAPPDATA'] batchlinksinstallpath = os.path.join(localappdata, "batchlinks") printvardebug(batchlinksinstallpath) wgetpath = os.path.join(batchlinksinstallpath, "wget-gnutls-x64.exe") ariazippath = os.path.join(batchlinksinstallpath, "aria2-1.36.0-win-64bit-build2.7z") ariapath = os.path.join(batchlinksinstallpath, "aria2-1.36.0-win-64bit-build2", "aria2c.exe") os.makedirs(batchlinksinstallpath, exist_ok=True) if os.path.exists(batchlinksinstallpath): printdebug("batchlinksinstallpath created") if downloader=='gdown': global gdownupgraded if gdownupgraded == False: tempcondition = currentcondition currentcondition = "Upgrading gdown..." print('Upgrading gdown ...') print('') runwithsubprocess(f"pip3 install -q --upgrade --no-cache-dir gdown") print('gdown upgraded!') print('') currentcondition = tempcondition gdownupgraded = True try: runwithsubprocess(f"gdown {todownload_s} -O {filepath_s}", folder_win) except FileNotFoundError: import gdown gdown.download(todownload, filepath, quiet=False) elif downloader=='wget': #os.system("python -m wget -o " + os.path.join(folder, filename) + " " + todownload) # import wget # wget.download(todownload, filepath) # checkwget = subprocess.getoutput(wgetpath) # if "is not recognized" in checkwget: if not os.path.exists(wgetpath): tempcondition = currentcondition currentcondition = "Installing wget Windows..." print('Installing wget Windows...') print('') wgetwinlink = "https://github.com/webfolderio/wget-windows/releases/download/v1.21.3.june.19.2022/wget-gnutls-x64.exe" print(wgetwinlink) runwithsubprocess("curl -Lo " + shlex.quote(wgetpath) + " " + wgetwinlink, batchlinksinstallpath) print('wget Windows installed!') print('') currentcondition = tempcondition runwithsubprocess(f"{shlex.quote(wgetpath)} -O {filepath_s} {todownload_s} --progress=bar:force", folder_win) elif downloader=='curl': runwithsubprocess(f"curl -Lo {filepath_s} {todownload_s}", folder_win) elif downloader=='aria2': # checkaria = subprocess.getoutput(ariapath) # if "is not recognized" in checkaria or "cannot find the path" in checkaria: if not os.path.exists(ariapath): tempcondition = currentcondition currentcondition = "Installing aria2 Windows..." print('Installing aria2 Windows...') print('') ariaziplink = "https://github.com/q3aql/aria2-static-builds/releases/download/v1.36.0/aria2-1.36.0-win-64bit-build2.7z" print(ariaziplink) runwithsubprocess("curl -Lo " + shlex.quote(ariazippath) + " " + ariaziplink, batchlinksinstallpath) sevenzpath = install7zWin() runwithsubprocess(f"{shlex.quote(sevenzpath)} x {shlex.quote(ariazippath)} -o{shlex.quote(batchlinksinstallpath)}", batchlinksinstallpath, False, '7z') print() print('aria2 Windows installed!') print('') currentcondition = tempcondition # ariamode = True runwithsubprocess(f"{shlex.quote(ariapath)} --summary-interval=1 --console-log-level=error --check-certificate=false -c -x 16 -s 16 -k 1M {todownload_s} -d {folder_s} -o {ariafilename_s}", folder_win, False, 'aria2') else: #non-windows if downloader=='gdown': printdebug(f"debug gdown {todownload_s} -O {filepath_s}") runwithsubprocess(f"gdown {todownload_s} -O {filepath_s}", folder_s) elif downloader=='wget': runwithsubprocess(f"wget -O {filepath_s} {todownload_s} --progress=bar:force", folder_s) elif downloader=='curl': runwithsubprocess(f"curl -Lo {filepath_s} {todownload_s}", folder_s) # curdir = os.getcwd() # os.rename(os.path.join(curdir, filename), filepath) elif downloader=='aria2': ariachecker = "dpkg-query -W -f='${Status}' aria2" checkaria = subprocess.getoutput(ariachecker) if "no packages found matching aria2" in checkaria: tempcondition = currentcondition currentcondition = "Installing aria2..." print('Installing aria2 ...') print('') runwithsubprocess(f"apt -y update -qq") runwithsubprocess(f"apt -y install -qq aria2") print('aria2 installed!') print('') currentcondition = tempcondition # ariamode = True runwithsubprocess(f"aria2c --summary-interval=1 --console-log-level=error -c -x 16 -s 16 -k 1M {todownload_s} -d {folder_s} -o {ariafilename_s}", folder_s, False, 'aria2') printdebug("\nmode: " + mode) if mode=='debugevery': time.sleep(2) try: os.rename(filepath, os.path.join(folder, f"{os.path.splitext(filename)[0]}-{downloader}{os.path.splitext(filename)[1]}")) printdebug("renamed to " + f"{os.path.splitext(filename)[0]}-{downloader}{os.path.splitext(filename)[1]}") except FileNotFoundError or FileExistsError: printdebug("rename failed somehow") pass elif mode=='civitdebugevery': time.sleep(2) printdebug("debug filename: " + str(filename)) printdebug("debug filename_s: " + str(filename_s)) printdebug("debug filepath: " + str(filepath_s)) printdebug("debug todownload_s: " + str(todownload_s)) printdebug("debug folder_s: " + str(folder_s)) try: os.rename(folder, os.path.join(folder_s, f"{os.path.splitext(filename)[0]}-{downloader}{os.path.splitext(filename)[1]}")) printdebug("renamed to " + f"{os.path.splitext(filename)[0]}-{downloader}{os.path.splitext(filename)[1]}") except FileNotFoundError or FileExistsError: printdebug("rename failed somehow") pass if torename and prockilled == False and filename != torename and downloader != 'aria2': def saferename(oldpath, newpath): try: os.rename(oldpath, newpath) except Exception as e: print("Rename failed. Reason: " + str(e)) pass saferename(filepath, os.path.join(folder, shlex.quote(torename))) if mode=='civit0' and prockilled == False: # if torename: # filepath = os.path.join(folder, torename) try: if os.path.exists(filepath): if os.path.getsize(filepath) <= 5 * 1024: try: with open(filepath, "r", encoding="utf-8") as file: if "We'll be right back | Civitai" in file.read(): prevciviterror = True print('CivitAI website is currently down ツ') print('') except Exception as e: print("File size checking failed. Reason: " + str(e)) pass except Exception as e: print("File size checking failed. Reason: " + str(e)) pass if mode=='dropbox' and prockilled == False: if os.path.exists(filepath): if os.path.getsize(filepath) <= 1024: print('Dropbox filesize below 1kb') print("There's a chance that the file got too much traffic and dropbox blocked access to it") print('') # if prockilled == True: # #rewind_folder(folder_s) # pass def install7zWin(): #@note install7z #usage: sevenzpath = install7zWin() global currentcondition tempcondition = currentcondition currentcondition = "Installing 7z..." localappdata = os.environ['LOCALAPPDATA'] batchlinksinstallpath = os.path.join(localappdata, "batchlinks") sevenzrpath = os.path.join(batchlinksinstallpath, "7zr.exe") sevenzrlink = "https://www.7-zip.org/a/7zr.exe" svnzpacklink = "https://7-zip.org/a/7z2201-x64.exe" svnzpackpath = os.path.join(batchlinksinstallpath, "7z2201-x64.exe") svnzexecpath = os.path.join(batchlinksinstallpath, "7z.exe") if not os.path.exists(sevenzrpath): print(sevenzrlink) runwithsubprocess("curl -Lo " + shlex.quote(sevenzrpath) + " " + sevenzrlink, batchlinksinstallpath) if not os.path.exists(svnzpackpath): print(svnzpacklink) runwithsubprocess("curl -Lo " + shlex.quote(svnzpackpath) + " " + svnzpacklink, batchlinksinstallpath) if not os.path.exists(svnzexecpath): runwithsubprocess(f"{shlex.quote(sevenzrpath)} x {shlex.quote(svnzpackpath)} -p- -o{shlex.quote(batchlinksinstallpath)} -y -sdel -bb0", batchlinksinstallpath, False, '7z') currentcondition = tempcondition return svnzexecpath def savestate_folder(folder): global currentfoldertrack currentfoldertrack = [] listfile = os.listdir(folder) for file in listfile: pathoffile = os.path.join(folder, file) currentfoldertrack.append(pathoffile) def rewind_folder(folder): listfilenew = os.listdir(folder) newerfoldertrack = [] for file in listfilenew: pathoffile = os.path.join(folder, file) newerfoldertrack.append(pathoffile) toremove = [x for x in newerfoldertrack if x not in currentfoldertrack] printdebug("\ndebug toremove: " + str(toremove)) print() for fileordir in toremove: if os.path.exists(fileordir): if os.path.isdir(fileordir): shutil.rmtree(fileordir) print("Removed incomplete download: " + fileordir) else: os.remove(fileordir) print("Removed incomplete download: " + fileordir) def writeall(olddict, shellonly, custompaths=''): newdict = trackall() global finalwrite finalwrite = [] finalwrite.append("All done!") if custompaths: finalwrite.append("⬇️Custom path added:⬇️") for hashtag, thepath in custompaths.items(): finalwrite.append(f"{hashtag} -> {thepath}") finalwrite.append("Downloaded files: ") for oldtype, olddir in olddict.items(): for newtype, newdir in newdict.items(): if newtype == oldtype: s = set(olddir) trackcompare = [x for x in newdir if x not in s] if len(trackcompare) > 0: exec(f"finalwrite.append('⬇️' + {newtype}path + '⬇️')") for item in trackcompare: finalwrite.append(item) if bool(remaininglinks): finalwrite.append("(There are still some files that have not been downloaded. Click the 'Resume Download' button to load the links that haven't been downloaded.)") printvardebug(finalwrite) if len(finalwrite) > 1 and "Downloaded files:" in finalwrite[1] and len(finalwrite) < 3: finalwrite.append("(Not finding something here? Check the terminal/colab console)") finaloutput = list_to_text(finalwrite) finalwrite = [] if shellonly: return "Commands executed successfully." else: return finaloutput def writepart(box, path): global finalwrite if len(box) > 0: finalwrite.append("⬇️" + path + "⬇️") for item in box: finalwrite.append(item) def trackall(): filesdict = dict() for x in typemain: if x == "altmodel" and altmodelpath == modelpath: continue exec(f"os.makedirs({x}path, exist_ok=True)") exec(f"filesdict['{x}'] = os.listdir({x}path)") return filesdict def currentfoldertohashtag(folder): for x in typemain: checkpath = str() checkpath = eval(x+'path') printdebug("checkpath: " + checkpath) if str(folder).strip() == checkpath: thehashtag = "#" + x printdebug("thehashtag: " + thehashtag) return thehashtag return "#debug" def splitrename(linkcurrent): renamecurrent = '' if ">" in linkcurrent: file_rename = linkcurrent.split(">") file_rename = [file_rename.strip() for file_rename in file_rename] linkcurrent = file_rename[0] if file_rename[1]: renamecurrent = file_rename[1] return linkcurrent, renamecurrent def extractcurdir(currentdir): #@note extractcurdir allfileshere = os.listdir(currentdir) szfileall = [] if platform.system() == "Windows": sevenzpath = install7zWin() global currentcondition extensiontoextract = [".zip", ".rar", ".7z", ".tar"] for filehere in allfileshere: if filehere.endswith(tuple(extensiontoextract)): szpath = os.path.join(currentdir, filehere) szfileall.append(szpath) for szfile in szfileall: if prockilled == False: # currentcondition = "Extracting" + os.path.basename(szfile) + "..." if platform.system() == "Windows": runwithsubprocess(f"{shlex.quote(sevenzpath)} x {shlex.quote(szfile)} -p- -o{shlex.quote(currentdir)} -y -sdel -bb0", currentdir, False, '7z') else: runwithsubprocess(f"7z x {shlex.quote(szfile)} -p- -o{shlex.quote(currentdir)} -y -sdel -bb0", currentdir, False, '7z') #@stopwatch #the decorator mess with the progress bar #@note run def run(command, choosedowner, civitdefault, civitpruned, civitvae, progress=gr.Progress()): progress(0.01, desc='') global prockilled prockilled = False global everyprocessid everyprocessid = [] everymethod = False global currentcondition resumebuttonvisible = False if command.strip().startswith('https://pastebin.com/') and command.strip().count('\n') == 0: currentcondition = f'Done.' if gradiostate == True: return ["Use the 'Copy from Pastebin' button instead", gr.Dataframe.update(value=buildarrayofhashtags('bottom'))] #gr.Dataframe.update(value=buildarrayofhashtags('right')), else: return ["Use the 'Copy from Pastebin' button instead", gr.Dataframe.update(value=buildarrayofhashtags('bottom')), gr.Button.update(visible=resumebuttonvisible)] #gr.Dataframe.update(value=buildarrayofhashtags('right')), if command.strip() == '@debugresetdownloads' and snapshot != {} and globaldebug == True: currentcondition = f'Removing downloaded files...' removed_files = global_rewind() texttowrite = ["⬇️Removed files⬇️"] for item in removed_files: texttowrite.append(item) writefinal = list_to_text(texttowrite) currentcondition = f'Removing done.' if gradiostate == True: return [writefinal, gr.Dataframe.update(value=buildarrayofhashtags('bottom'))] #gr.Dataframe.update(value=buildarrayofhashtags('right')), else: return [writefinal, gr.Dataframe.update(value=buildarrayofhashtags('bottom')), gr.Button.update(visible=resumebuttonvisible)] #gr.Dataframe.update(value=buildarrayofhashtags('right')), if not command.strip(): currentcondition = "Logging activated." texttowrite = ["The link box is empty."] writefinal = list_to_text(texttowrite) if gradiostate == True: return [writefinal, gr.Dataframe.update(value=buildarrayofhashtags('bottom'))] #gr.Dataframe.update(value=buildarrayofhashtags('right')) else: return [writefinal, gr.Dataframe.update(value=buildarrayofhashtags('bottom')), gr.Button.update(visible=resumebuttonvisible)] #gr.Dataframe.update(value=buildarrayofhashtags('right')), oldfilesdict = trackall() if cmd_opts.ckpt_dir: altmodelpath = cmd_opts.ckpt_dir currentfolder = altmodelpath else: altmodelpath = os.path.join(models_path, "Stable-diffusion") currentfolder = modelpath currenthashtag = '#model' os.makedirs(currentfolder, exist_ok=True) currentcondition = 'Extracting links...' links = extract_links(command) isshell = True for listpart in links: if not listpart.startswith('!'): isshell = False break printdebug("links: " + str(links)) steps = float(0) totalsteps = float(1) usemega = False usegdrive = False usemediafire = False # global ariamode global gdownupgraded global mediafireinstalled for item in links: if not item.startswith('#'): totalsteps += 1 if item.startswith('https://mega.nz'): usemega = True if item.startswith('https://drive.google.com'): usegdrive = True if item.startswith('https://www.mediafire.com/file'): usemediafire = True #break printdebug("totalsteps: " + str(totalsteps)) if usemega == True: currentcondition = 'Installing Mega...' progress(0.01, desc='Installing Mega...') if platform.system() == "Windows": installmegawin() else: installmega() if usegdrive == True and gdownupgraded == False: tempcondition = currentcondition currentcondition = "Upgrading gdown..." print('Upgrading gdown ...') print('') runwithsubprocess(f"pip3 install -q --upgrade --no-cache-dir gdown") print('gdown upgraded!') print('') currentcondition = tempcondition gdownupgraded = True if usemediafire == True and mediafireinstalled == False: tempcondition = currentcondition currentcondition = "Installing mediafire-dl..." print('Installing mediafire-dl...') print('') runwithsubprocess(f"pip3 install git+https://github.com/Juvenal-Yescas/mediafire-dl") print('mediafire-dl installed!') print('') currentcondition = tempcondition mediafireinstalled = True print('BatchLinks Downloads starting...') print('') printdebug('prockilled: ' + str(prockilled)) global remaininglinks batchtime = time.time() addedcustompath = dict() downmethod = ['gdown', 'wget', 'curl', 'aria2'] hfmethods = [ "https://raw.githubusercontent.com", "https://huggingface.co", "https://cdn.discordapp.com/attachments" ] global typemain global typechecker for listpart in links: if prockilled == False: currenttorename = '' printdebug("steps: " + str(steps)) printdebug("total steps: " + str(totalsteps)) printdebug("percentage: " + str(round(steps/totalsteps, 1))) printdebug("currenttypemain: " + str(typemain)) # ariamode = False if gradiostate == False: if time.time() - batchtime >= 70: remaininglinks = links[links.index(listpart):] printdebug("remaining links: " + str(remaininglinks)) if bool(remaininglinks): printdebug("currentfolder: " + currentfolder) tophashtag = currentfoldertohashtag(currentfolder) printdebug("tophashtag: " + tophashtag) remaininglinks.insert(0, tophashtag) printdebug("remaining links new: " + str(remaininglinks)) print() print('Runtime was stopped to prevent hangs.') print("Check the UI and press 'Resume Download' to load the remaining links") print("Then click 'Download All!' again") print('') print("These are some links that haven't been downloaded yet.👇") printremains = list_to_text(remaininglinks) print(printremains) resumebuttonvisible = True cancelrun() break if listpart.startswith("https://mega.nz"): currentlink, currenttorename = splitrename(listpart) print('\n') print(currentlink) currentcondition = f'Downloading {currentlink}...' progress(round(steps/totalsteps, 3), desc='Downloading from ' + os.path.basename(currentlink).split('#')[0] + f' into {currenthashtag}...') transfare(currentlink, currentfolder, currenttorename) steps += 1 elif listpart.startswith(tuple(hfmethods)): # elif listpart.startswith("https://huggingface.co") or listpart.startswith("https://raw.githubusercontent.com") or listpart.startswith("https://cdn.discordapp.com/attachments"): currentlink, currenttorename = splitrename(listpart) print('\n') print(currentlink) currentcondition = f'Downloading {currentlink}...' progress(round(steps/totalsteps, 3), desc='Downloading ' + os.path.basename(currentlink) + f' into {currenthashtag}...') if everymethod == False: hfdown(currentlink, currentfolder, choosedowner, 'default', currenttorename) else: for xmethod in downmethod: if prockilled == False: hfdown(currentlink, currentfolder, xmethod, 'debugevery') steps += 1 elif listpart.startswith("https://www.dropbox.com/s"): #@note dropbox currentlink, currenttorename = splitrename(listpart) if currentlink.endswith("?dl=0"): currentlink = currentlink.split("?dl=")[0] + "?dl=1" realname = currentlink.split("/")[-1].split("?dl=")[0] if currenttorename == '': currenttorename = realname print('\n') print(currentlink) currentcondition = f'Downloading {currentlink}...' progress(round(steps/totalsteps, 3), desc='Downloading ' + currenttorename + f' into {currenthashtag}...') hfdown(currentlink, currentfolder, choosedowner, 'dropbox', currenttorename) steps += 1 elif listpart.startswith("https://drive.google.com"): currentlink, currenttorename = splitrename(listpart) print('\n') print(currentlink) currentcondition = f'Downloading {currentlink}...' match = re.search(r'\?id=([^\&]+)\&export|/d/([^\s/]+)/', currentlink) if match: if match.group(1): extracted_string = match.group(1) else: extracted_string = match.group(2) progress(round(steps/totalsteps, 3), desc='Downloading from ' + extracted_string + f' into {currenthashtag}...') mediadrivedown(currentlink, currentfolder, 'gdrive', currenttorename) steps += 1 elif listpart.startswith("https://pixeldrain.com"): #@note pixeldrain currentlink, currenttorename = splitrename(listpart) if not currentlink.startswith("https://pixeldrain.com/api/file/"): fileid = currentlink.split("/")[-1] currentlink = f"https://pixeldrain.com/api/file/{fileid}" currentcondition = f'Retrieving Pixeldrain link...' searcher = "findstr" if platform.system() == "Windows" else "grep" try: filename = subprocess.getoutput(f"curl -sI {currentlink} | {searcher} -i Content-Disposition").split('filename="', 1)[1].rsplit('"', 1)[0] except IndexError: print("Something wrong while retrieving the Pixeldrain link") continue if currenttorename == '': currenttorename = filename print('\n') print(currentlink) currentcondition = f'Downloading {currentlink}...' progress(round(steps/totalsteps, 3), desc='Downloading ' + currenttorename + f' into {currenthashtag}...') # if everymethod == False: hfdown(currentlink, currentfolder, choosedowner, 'pixeldrain', currenttorename) # else: # for xmethod in downmethod: # if prockilled == False: # hfdown(currentlink, currentfolder, xmethod, 'debugevery') steps += 1 elif listpart.startswith("https://www.mediafire.com/file"): #@note mediafire currentlink, currenttorename = splitrename(listpart) print('\n') print(currentlink) currentcondition = f'Downloading {currentlink}...' progress(round(steps/totalsteps, 3), desc='Downloading ' + currentlink.split("/")[-2] + f' into {currenthashtag}...') # if everymethod == False: mediadrivedown(currentlink, currentfolder, 'mediafire', currenttorename) # else: # for xmethod in downmethod: # if prockilled == False: # hfdown(currentlink, currentfolder, xmethod, 'debugevery') steps += 1 elif listpart.startswith("https://anonfiles.com"): #@note anonfiles currentlink, currenttorename = splitrename(listpart) print('\n') print(currentlink) currentcondition = f'Retrieving anonfiles link...' # print(filename) # Send HTTP request to the website and read the response response = urllib.request.urlopen(currentlink) html_content = response.read().decode('utf-8') # Find all links that contain "anonfiles" in them using regular expressions download_links = re.findall(r'href=["\'](https?:\/\/.*?anonfiles.*?)["\']', html_content) currentlink = max(download_links, key=len) currentcondition = f'Downloading {currentlink}...' progress(round(steps/totalsteps, 3), desc='Downloading ' + currentlink.split("/")[-1] + f' into {currenthashtag}...') if everymethod == False: hfdown(currentlink, currentfolder, choosedowner, 'default', currenttorename) else: for xmethod in downmethod: if prockilled == False: hfdown(currentlink, currentfolder, xmethod, 'debugevery') steps += 1 elif listpart.startswith("https://files.catbox.moe"): currentlink, currenttorename = splitrename(listpart) print('\n') print(currentlink) try: urllib.request.urlopen(currentlink) except http.client.RemoteDisconnected: print('Connection to ' + currentlink + ' failed.') print("This colab session's server might doesn't have access to catbox") print('') continue except urllib.error.URLError as e: print('Connection to ' + currentlink + ' failed.') print("This colab session's server might doesn't have access to catbox") print('') continue print(currentlink) currentcondition = f'Downloading {currentlink}...' progress(round(steps/totalsteps, 3), desc='Downloading ' + os.path.basename(currentlink) + f' into {currenthashtag}...') if everymethod == False: hfdown(currentlink, currentfolder, choosedowner, 'default', currenttorename) else: for xmethod in downmethod: if prockilled == False: hfdown(currentlink, currentfolder, xmethod, 'debugevery') steps += 1 elif listpart.startswith("https://civitai.com/api/download/models/"): #@note civit direct usenewmethod = True currentlink, currenttorename = splitrename(listpart) if currenttorename == '': currenttorename = getcivitname(listpart) if currenttorename == 'batchlinksold': usenewmethod = False elif currenttorename == 'batchlinksskip': continue # print("That CivitAI link no longer exist, or the server is currently down.") # continue print('\n') print(currentlink) currentcondition = f'Downloading {currentlink}...' if everymethod == False: if usenewmethod: progress(round(steps/totalsteps, 3), desc='Downloading ' + currenttorename + f' into {currenthashtag}...') hfdown(currentlink, currentfolder, choosedowner, 'civit0', currenttorename) else: progress(round(steps/totalsteps, 3), desc='Downloading model number ' + os.path.basename(currentlink) + '...') civitdown(currentlink, currentfolder, currenttorename) if not prevciviterror: configlink = checkcivitconfig(currentlink) if not configlink=='': namefile= os.path.splitext(currenttorename.split('?')[0])[0] currenttorename = namefile + '.yaml' hfdown(configlink, currentfolder, choosedowner, 'civit0', currenttorename) else: for xmethod in downmethod: if prockilled == False: hfdown(currentlink, currentfolder, xmethod, 'debugevery') # civitdown(currentlink, currentfolder, currenttorename) steps += 1 elif listpart.startswith("https://github.com"): currentlink, currenttorename = splitrename(listpart) print('\n') print(currentlink) if '/raw/' in listpart or '/releases/download/' in listpart: currentcondition = f'Downloading {currentlink}...' progress(round(steps/totalsteps, 3), desc='Downloading ' + os.path.basename(currentlink) + f' into {currenthashtag}...') if everymethod == False: hfdown(currentlink, currentfolder, choosedowner, 'default', currenttorename) else: for xmethod in downmethod: if prockilled == False: hfdown(currentlink, currentfolder, xmethod, 'debugevery') else: splits = listpart.split('#')[0].split("/") currentlink = "/".join(splits[:5]) foldername = listpart.split('#')[0].rsplit('/', 1)[-1] folderpath = os.path.join(extpath, foldername) currentcondition = f'Cloning {currentlink}...' progress(round(steps/totalsteps, 3), desc='Cloning from ' + currentlink.split('/', 3)[-1] + f' into #ext...') if platform.system() == "Windows": runwithsubprocess(f"git clone {currentlink} {shlex.quote(folderpath)}") else: runwithsubprocess(f"git clone {currentlink} {folderpath}") steps += 1 elif listpart.startswith("https://civitai.com/models/"): currentlink, currenttorename = splitrename(listpart) print('\n') print(currentlink) currentcondition = f'Downloading {currentlink}...' #customtypemains = list progress(round(steps/totalsteps, 3), desc='Downloading ' + os.path.basename(currentlink) + f'...') for tag in tuple((typemain[countofdefaulthashtags:])): if "#" + tag == currenthashtag and gradiostate: progress(round(steps/totalsteps, 3), desc='Downloading ' + os.path.basename(currentlink) + f' into {currenthashtag}...') pass break if everymethod == False: civitdown2(currentlink, currentfolder, choosedowner, currenttorename, False, civitdefault, civitpruned, civitvae) # else: # for xmethod in downmethod: # if prockilled == False: # civitdown2(currentlink, currentfolder, xmethod, True) steps += 1 elif listpart.startswith("!"): commandtorun = listpart[1:] currentcondition = f'Running command: {commandtorun}' progress(round(steps/totalsteps, 3), desc=currentcondition) runwithsubprocess(commandtorun, None, True) steps += 1 elif listpart.startswith("@debugeverymethod") and globaldebug == True and gradiostate == True: print('\n') everymethod = True print('DebugEveryMethod activated!') print('One link will be downloaded with every possible download method.') print('') elif listpart.startswith("@debugresetdownloads") and snapshot != {} and globaldebug == True: print('\n') currentcondition = f'Removing downloaded files...' removed_files = global_rewind() oldfilesdict = trackall() texttowrite = ["⬇️Removed files⬇️"] for item in removed_files: texttowrite.append(item) writefinal = list_to_text(texttowrite) printdebug(str(writefinal)) elif listpart.startswith("@extract"): #@note hashtagextract print('\n') currentcondition = 'Extracting every archive file in current directory...' print('Extracting every archive file in current directory...\n') extractcurdir(currentfolder) elif listpart.startswith("@new"): #@note hashtagcustom newcommand, newhashtag, newpath = shlex.split(listpart) if newcommand and newhashtag and newpath: if newhashtag.startswith("#"): printdebug("making custom hashtag") newtype = newhashtag[1:] #global typemain #moved to top #global typechecker #moved to top if not newtype[0].isdigit(): if not newtype in typemain and not newpath in typechecker: try: typemain.append(newtype) printdebug('typemain: ' + str(typemain)) typechecker.append(newtype) printdebug('typechecker: ' + str(typechecker)) newglobalpath = newtype + "path" printdebug('newglobalpath: ' + newtype + "path") newpath = os.path.abspath(os.path.normpath(newpath).rstrip(os.sep)) os.makedirs(newpath, exist_ok=True) globals()[newglobalpath] = newpath printdebug("modelpath = " + eval(newglobalpath)) oldfilesdict[newtype] = os.listdir(newpath) # global addedcustompath addedcustompath[newhashtag] = newpath printdebug("addedcustompath: " + str(addedcustompath)) print(f"New custom path added!\n{newhashtag} means {newpath}") except Exception as e: print("Adding custom path failed! Reason: " + str(e)) else: print("Adding custom path failed! Reason: Custom hashtag name cannot starts with a number") elif listpart.startswith("@aria") or listpart.startswith("@aria2"): #@note customaria # checkariacommand = ['##', '>'] printvardebug(listpart) if "##" in listpart: listpart = listpart.split('##')[0] if ":\\" in listpart: #windows path listpart = listpart.replace('\\', '\\\\').replace('"', '\\"') ariacmd = shlex.split(listpart) printvardebug(ariacmd) arialink, ariarename, ariapath = '', '', '' renamedalready = False custompath = False # startcomment = False if (ariacmd[0] == '@aria2' or ariacmd[0] == '@aria') and len(ariacmd) > 1: #two items. @aria and link if ariacmd[1].startswith('http'): arialink = ariacmd[1] if len(ariacmd) > 2: #three items. @aria, link, and path if ariacmd[2].startswith(">") and len(ariacmd) > 3: #four items. @aria, link, '>', and rename (no custom path) ariarename = shlex.quote(ariacmd[3]) renamedalready = True else: if ariacmd[2].startswith('#'): #three items. @aria, link, hashtagpath tobeariapath, _ = hashtagtopath(ariacmd[2]) if tobeariapath: ariapath = tobeariapath else: #three items. @aria, link, custompath # ariapath = shlex.quote(ariacmd[2]) ariapath = ariacmd[2] custompath = True #message for file downloaded to custom path will be added later # if len(cmd) > 3: # if cmd[3].startswith('>'): # renaming = True # else: # renaming = False if len(ariacmd) > 4 and ariacmd[3].startswith('>') and not renamedalready: #five items. @aria, link, path, '>', and rename ariarename = shlex.quote(ariacmd[4]) if not ariapath: ariapath = currentfolder if not ariarename: ariarename = arialink.rsplit('/', 1)[-1] #finalcommand = f"aria2c --summary-interval=1 --console-log-level=error -c -x 16 -s 16 -k 1M {arialink} -d {ariapath} -o {ariarename}" printvardebug(arialink) printvardebug(ariapath) printvardebug(ariarename) if arialink and not prockilled: try: currentcondition = f'Downloading from {arialink} into {ariapath}...' hfdown(arialink, ariapath, 'aria2', 'default', ariarename) except Exception as e: print(f"Custom aria download failed. Reason: {str(e)}") elif listpart.startswith("#") and listpart.endswith(tuple(typemain)): #tuple(typemain[countofdefaulthashtags:]) try: printdebug("one of typemain") currenthashtag = listpart currentfolder = eval(listpart[1:] + "path") os.makedirs(currentfolder, exist_ok=True) except Exception as e: print(f"Cannot use hashtag: {str(e)}") else: tobecurrentfolder, tobecurrenthashtag = hashtagtopath(listpart) if tobecurrentfolder: currentfolder = tobecurrentfolder if tobecurrentfolder: currenthashtag = tobecurrenthashtag else: currentcondition = 'Operation cancelled' return "Operation cancelled" currentcondition = 'Writing output...' downloadedfiles = writeall(oldfilesdict, isshell, addedcustompath) for tokill in everyprocessid: try: os.kill(tokill, signal.SIGTERM) except ProcessLookupError: pass except PermissionError: pass except OSError: pass print() print('BatchLinks Downloads finished!') print('') currentcondition = 'Done!' printdebug(f"this should be the output:\n" + str(downloadedfiles)) progress(1.00, desc='') if gradiostate == True: return [downloadedfiles, gr.Dataframe.update(value=buildarrayofhashtags('bottom'))] #@note dataframe #gr.Dataframe.update(value=buildarrayofhashtags('right')), was here else: return [downloadedfiles, gr.Dataframe.update(value=buildarrayofhashtags('bottom')), gr.Button.update(visible=resumebuttonvisible)] #gr.Dataframe.update(value=buildarrayofhashtags('right')), was here def hashtagtopath(thehashtag): hashtagcurrent, foldercurrent = '','' notbreaking = True typemainlocal = [] for x in typemain: typemainlocal.append(x) for y in typemainlocal: if thehashtag[1:] == y: printdebug("one of typemain, found on else") hashtagcurrent = thehashtag foldercurrent = eval(thehashtag[1:] + "path") os.makedirs(foldercurrent, exist_ok=True) notbreaking = False for prefix in typechecker: if thehashtag.startswith("#" + prefix) and notbreaking: if prefix in ["embedding", "embeddings", "embed", "embeds","textualinversion", "ti"]: hashtagcurrent = '#embed' elif prefix in ["model", "models", "checkpoint", "checkpoints"]: hashtagcurrent = '#model' elif prefix in ["vae", "vaes"]: hashtagcurrent = '#vae' elif prefix in ["lora", "loras"]: hashtagcurrent ='#lora' elif prefix in ["hypernetwork", "hypernetworks", "hypernet", "hypernets", "hynet", "hynets",]: hashtagcurrent = '#hynet' elif prefix in ["addnetlora", "loraaddnet", "additionalnetworks", "addnet"]: hashtagcurrent = '#addnetlora' elif prefix in ["controlnet", "cnet"]: hashtagcurrent = '#cnet' elif prefix in ["extension", "extensions", "ext"]: hashtagcurrent = '#ext' elif prefix in ["aestheticembedding", "aestheticembed"]: hashtagcurrent = '#aestheticembed' elif prefix in ["upscaler", "upscale"]: hashtagcurrent = '#upscaler' elif prefix in ["altmodel", "altmodels"]: hashtagcurrent = '#altmodel' elif prefix in ["lycoris", "locon", "loha"]: hashtagcurrent = '#lycoris' try: foldercurrent = eval(hashtagcurrent[1:] + 'path') except Exception as e: print(f"Cannot use hashtag: {e}") continue os.makedirs(foldercurrent, exist_ok=True) return foldercurrent, hashtagcurrent wildcardcommand = [ "@debugeverymethod", "@debugresetdownloads", "@extract", "@aria", "@aria2", "@new" ] def extract_links(string): links = [] lines = string.split('\n') for line in lines: line = line.split('##')[0].strip() if line.startswith(tuple(supportedlinks)): links.append(line) elif line.startswith(tuple(wildcardcommand)): links.append(line) elif line.startswith("!"): links.append(line.strip()) elif line.startswith("#"): links.append(line.strip()) # else: # for prefix in typechecker: # if line.startswith("#" + prefix): # links.append(line) #print(f"links: {links}") return links def list_to_text(lst): stripped_list = [item.strip(',').strip('\"') for item in lst] return '\n'.join(stripped_list) def uploaded(textpath): if not textpath is None: print(textpath) file_paths = textpath.name print(file_paths) links = [] with open(file_paths, 'r') as file: for line in file: if line.startswith(tuple(supportedlinks)): links.append(line.strip()) elif line.startswith("!"): links.append(line.strip()) elif line.startswith(tuple(wildcardcommand)): links.append(line.strip()) elif line.startswith("#"): links.append(line.strip()) # else: # for prefix in typechecker: # if line.startswith("#" + prefix): # links.append(line.strip()) text = list_to_text(links) return text count = 0 def keeplog(): global currentcondition global currentsuboutput global logging if logging == False: currentcondition = "Logging activated." currentsuboutput = '' logging = True return [currentcondition, gr.Button.update(visible=True), gr.Button.update(visible=False)] if currentsuboutput == '': return [currentcondition, gr.Button.update(visible=True), gr.Button.update(visible=False)] else: return [f"{currentcondition}\n{currentsuboutput}", gr.Button.update(visible=True), gr.Button.update(visible=False)] def offlog(): global currentcondition global currentsuboutput global logging if logging == True: currentcondition = "Logging deactivated." currentsuboutput = '' logging = False return [f"{currentcondition}", gr.Button.update(visible=False), gr.Button.update(visible=True)] def empty(): return '' def cancelrun(): global processid global prockilled printdebug("debug processid: " + str(processid)) if not processid == '': if platform.system() == "Windows": global process process.terminate() _ = subprocess.getoutput("taskkill /F /T /PID" + str(process.pid)) _ = subprocess.getoutput("taskkill /f /t /im MEGAcmdServer.exe") else: try: os.kill(processid, signal.SIGTERM) except ProcessLookupError: pass except PermissionError: pass except OSError: pass #os.killpg(os.getpgid(processid.pid), signal.SIGTERM) prockilled = True if prockilled == True and globaldebug == True: print() print("This should kill") print() return ["Operation Cancelled",gr.Dataframe.update(value=buildarrayofhashtags('bottom'))] def fillbox(): global remaininglinks if bool(remaininglinks): text = list_to_text(remaininglinks) remaininglinks = [] return [text, 'Links updated!\nClick Download All! to download the rest of the links', gr.Button.update(visible=False)] return ['', '', gr.Button.update(visible=False)] if gradiostate == True: storedstatedownloader = "gdown" else: storedstatedownloader = "aria2" storedstatemodeltype = "safetensors" storedstateprecision = "fp16" storedstatevae = True def grchangedown(downstate): global storedstatedownloader storedstatedownloader = downstate printvardebug(storedstatedownloader) def grchangetype(typestate): global storedstatemodeltype storedstatemodeltype = typestate printvardebug(storedstatemodeltype) def grchangefp(fpstate): global storedstateprecision storedstateprecision = fpstate printvardebug(storedstateprecision) def grchangevae(vaestate): global storedstatevae storedstatevae = vaestate printvardebug(storedstatevae) def stretchui(stretch): #outputs=[boxtohide, bottomlist] #outputs=[boxtohide, downbox, civitbox, choose_downloader, civit_default, civit_ispruned, civit_alsodownvae]) # if stretch: # return [gr.Box.update(visible=False), gr.Accordion.update(visible=False), gr.Accordion.update(visible=True)] # else: # return [gr.Box.update(visible=True), gr.Accordion.update(visible=True), gr.Accordion.update(visible=False)] global storedstatedownloader global storedstatemodeltype global storedstateprecision global storedstatevae if stretch: return [gr.Box.update(visible=False), gr.Row.update(visible=True), gr.Row.update(visible=True), gr.Radio.update(value=storedstatedownloader), gr.Radio.update(value=storedstatemodeltype), gr.Radio.update(value=storedstateprecision), gr.Checkbox.update(value=storedstatevae)] else: return [gr.Box.update(visible=True), gr.Row.update(visible=False), gr.Row.update(visible=False), gr.Radio.update(value=storedstatedownloader), gr.Radio.update(value=storedstatemodeltype), gr.Radio.update(value=storedstateprecision), gr.Checkbox.update(value=storedstatevae)] def hidehelp(hide): if hide: return [gr.Markdown.update(value=titletext), gr.Markdown.update(visible=False)] else: return [gr.Markdown.update(value=introductiontext), gr.Markdown.update(visible=True)] # countofdefaulthashtags = len(typemain) def buildarrayofhashtags(rightorbottom): printdebug(f"buildarray {rightorbottom} initiated!") # defaultpathtime = True def writingpath(j, path): if path == modelpath: return path + " (default path)" elif path == altmodelpath: return path else: if rightorbottom == 'right' and j < countofdefaulthashtags: return path.replace(script_path, "~") else: return path hashtagandpath = [] for i, x in enumerate(typemain): try: xpath = eval(x+"path") if cmd_opts.ckpt_dir: hashtagandpath.append(["#"+x, writingpath(i, xpath)]) elif x != "altmodel": hashtagandpath.append(["#"+x, writingpath(i, xpath)]) # defaultpathtime = False except Exception as e: print(str(e)) return hashtagandpath def copyfrompastebin(boxwithlink): pastebinlink = boxwithlink.strip() if pastebinlink.startswith('https://pastebin.com/'): if pastebinlink.count('\n') == 0: global currentcondition currentcondition = f'Gathering links from Pastebin..' if not '/raw/' in pastebinlink: pastebinlink = pastebinlink.replace("pastebin.com/", "pastebin.com/raw/") pbinresponse = requests.get(pastebinlink) if pbinresponse.status_code == 200: pbintextlist = pbinresponse.text.splitlines() links = [] for line in pbintextlist: if line.startswith(tuple(supportedlinks)): links.append(line.strip()) elif line.startswith("!"): links.append(line.strip()) elif line.startswith("@new"): links.append(line.strip()) elif line.startswith(tuple(wildcardcommand)): links.append(line.strip()) elif line.startswith("#"): links.append(line.strip()) currentcondition = f'Done.' finallinks = list_to_text(links) return [finallinks, "Pastebin links retrieved successfully."] else: currentcondition = f'Done.' print("Error retrieving data from pastebin") return [boxwithlink, "Error retrieving data from pastebin"] else: currentcondition = f'Done.' return [boxwithlink, "Pastebin link must be the only one link on the textbox! (And only one pastebin link supported)"] else: currentcondition = f'Done.' return [boxwithlink, "Pastebin link must be the first (and the only) link on the textbox!"] titletext = f"""

⬇️ Batchlinks Downloader ({currentversion}) {latestversiontext}

""" introductiontext = f""" {titletext}
(what's new?)

This tool will read the textbox and download every links from top to bottom one by one
Put your links down below. Supported link: Huggingface, CivitAI, MEGA, Discord, Github, Catbox, Google Drive, Pixeldrain, Mediafire, Anonfiles, Dropbox
Use hashtag to separate downloaded items based on their download location
Valid hashtags: #embed, #model, #hypernet, #lora, #vae, #addnetlora, etc.
(For colab that uses sd-webui-additional-networks extension to load LoRA, use #addnetlora instead)
Use double hashtag (##) after links for comment. Useful to mark which links downloads what.
Remember to always press the 🔄️ refresh button on the UI after downloading models etc. in order for them to show up on the list.

""" knowmoretext = f"""

Click these links for more:
Readme Page
Example
Syntax
Valid Hashtags
Here's how you can get the direct links
Report Bug

""" testboxplaceholder = f"""#model #vae #lora ##this is a comment, and these text is just an example """ def on_ui_tabs(): with gr.Blocks() as batchlinks: with gr.Row(): with gr.Column(scale=2): introduction = gr.Markdown(introductiontext) with gr.Column(scale=1): with gr.Row(): uistretcher = gr.Checkbox(value=False, label="Stretch UI", interactive=True) helphider = gr.Checkbox(value=False, label="Hide Help", interactive=True) knowmore = gr.Markdown(knowmoretext) with gr.Group(): command = gr.Textbox(label="Links", placeholder=testboxplaceholder, lines=5) if gradiostate == True: logbox = gr.Textbox(label="Log", interactive=False) else: if vladmandic: logbox = gr.Textbox("(remove the --disable-queue args on launch.py to enable optional logging)", label="Log", interactive=False) else: logbox = gr.Textbox("(use --gradio-queue args on launch.py to enable optional logging)", label="Log", interactive=False) with gr.Row(): with gr.Box(): if gradiostate == True: with gr.Column(): # with gr.Row(): # gr.Textbox(value=None, interactive=False, show_label=False) btn_onlog = gr.Button("Turn On Logging", variant="primary", visible=True) # with gr.Row(): btn_offlog = gr.Button("Turn Off Logging", visible=False) loggingon = btn_onlog.click(keeplog, outputs=[logbox, btn_offlog, btn_onlog], every=1) btn_offlog.click(offlog, outputs=[logbox, btn_offlog, btn_onlog], cancels=[loggingon]) # gr.Textbox(value=None, interactive=False, show_label=False) # logging = gr.Radio(["Turn On Logging"], show_label=False) # logging.change(keeplog, outputs=logbox, every=1) out_text = gr.Textbox(label="Output") else: if vladmandic: print("Batchlinks webui extension: (Optional) Remove the --disable-queue args to enable logging & cancel button on this extension") else: print("Batchlinks webui extension: (Optional) Use --gradio-queue args to enable logging & cancel button on this extension") out_text = gr.Textbox("(If this text disappear, that means a download session is in progress.)", label="Output") # if platform.system() == "Windows": # choose_downloader = gr.Radio(["gdown", "wget", "curl"], value="gdown", label="Download method") # else: with gr.Row(variant='panel', visible=False) as downbox: if gradiostate == True: with gr.Column(scale=1): choose_downloader2 = gr.Radio(["gdown", "wget", "curl", "aria2"], value="gdown", label="Download method") else: with gr.Column(scale=1): choose_downloader2 = gr.Radio(["aria2"], value="aria2", label="Downloader") with gr.Row(variant='panel', visible=False) as civitbox: # with gr.Box(): # with gr.Column(scale=1): # with gr.Row(): civit_default2 = gr.Radio(["ckpt", "safetensors"], value="safetensors", label="CivitAI Preferred Model Type", interactive=True) civit_ispruned2 = gr.Radio(["fp16", "fp32"], value="fp16", label="Model Precision", interactive=True) # with gr.Row(): # civit_ispruned = gr.Checkbox(True, label="Pruned", interactive=True) civit_alsodownvae2 = gr.Checkbox(True, label="Also Download VAE", interactive=True) # def passvaluebetweenradio(newval): # return gr.Radio.update(value=newval) # def passvaluebetweencheckbox(newval): # return gr.Checkbox.update(value=newval) # choose_downloader2.change(passvaluebetweenradio, inputs=choose_downloader2, outputs=choose_downloader) # civit_default2.change(passvaluebetweenradio, inputs=civit_default2, outputs=civit_default) # civit_ispruned2.change(passvaluebetweenradio, inputs=civit_ispruned2, outputs=civit_ispruned) # civit_alsodownvae2.change(passvaluebetweencheckbox, inputs=civit_alsodownvae2, outputs=civit_alsodownvae) with gr.Row(): if gradiostate == True: with gr.Column(scale=1, min_width=100): btn_run = gr.Button("Download All!", variant="primary") # btn_upload = gr.UploadButton("Upload .txt", file_types="text") # btn_upload.upload(uploaded, btn_upload, file_output) with gr.Column(scale=1, min_width=100): btn_cancel = gr.Button("Cancel") with gr.Row(): file_output = gr.UploadButton(file_types=['.txt'], label='Upload txt') btn_pastebin = gr.Button("Copy from Pastebin") else: btn_run = gr.Button("Download All!", variant="primary") btn_resume = gr.Button("Resume Download", visible=False) with gr.Column(scale=1, min_width=100): with gr.Row(): file_output = gr.UploadButton(file_types=['.txt'], label='Upload txt') btn_pastebin = gr.Button("Copy from Pastebin") # with gr.Column(scale=1, min_width=100): # file_output = gr.UploadButton(file_types=['.txt'], label='Upload txt') # copy_pastebin = gr.Button("Copy from Pastebin") if gradiostate == False: with gr.Row(): gr.Markdown( f"""


After clicking the Download All button, it's recommended to inspect the colab console, as every information about the download progress is there.

""") if gradiostate == False: btn_resume.click(fillbox, None, outputs=[command, out_text, btn_resume]) # file_output = gr.File(file_types=['.txt'], label="you can upload a .txt file containing links here") # file_output.change(uploaded, file_output, command) file_output.upload(uploaded, file_output, command) with gr.Box(visible=True) as boxtohide: with gr.Row(variant='panel'): if gradiostate == True: with gr.Column(scale=1): choose_downloader = gr.Radio(["gdown", "wget", "curl", "aria2"], value=storedstatedownloader, label="Download method") else: with gr.Column(scale=1): choose_downloader = gr.Radio(["aria2"], value=storedstatedownloader, label="Downloader") with gr.Row(variant='panel'): civit_default = gr.Radio(["ckpt", "safetensors"], value=storedstatemodeltype, label="CivitAI Preferred Model Type", interactive=True) civit_ispruned = gr.Radio(["fp16", "fp32"], value=storedstateprecision, label="Model Precision", interactive=True) # with gr.Row(): # civit_ispruned = gr.Checkbox(True, label="Pruned", interactive=True) civit_alsodownvae = gr.Checkbox(storedstatevae, label="Also Download VAE", interactive=True) choose_downloader.change(grchangedown, inputs=choose_downloader) civit_default.change(grchangetype, inputs=civit_default) civit_ispruned.change(grchangefp, inputs=civit_ispruned) civit_alsodownvae.change(grchangevae, inputs=civit_alsodownvae) # with gr.Box(visible=True) as boxtohide: # gr.Markdown(""" #
If you feel the UI is too cramped, click the Stretch UI button above.
# """) # with gr.Accordion("List of Every Hashtags and its Path", open=False, visible=True) as rightlist: # righttable = gr.DataFrame( # buildarrayofhashtags('right'), # headers=["hashtag", "path"], # datatype=["str", "str"], # interactive=False # ) # sidetext = gr.Markdown(knowmoretext, visible=True) def passvaluebetweenradio(newval): return gr.Radio.update(value=newval) def passvaluebetweencheckbox(newval): return gr.Checkbox.update(value=newval) choose_downloader2.change(passvaluebetweenradio, inputs=choose_downloader2, outputs=choose_downloader) civit_default2.change(passvaluebetweenradio, inputs=civit_default2, outputs=civit_default) civit_ispruned2.change(passvaluebetweenradio, inputs=civit_ispruned2, outputs=civit_ispruned) civit_alsodownvae2.change(passvaluebetweencheckbox, inputs=civit_alsodownvae2, outputs=civit_alsodownvae) finish_audio = gr.Audio(interactive=False, value=os.path.join(extension_dir, "notification.mp3"), elem_id="finish_audio", visible=False) with gr.Accordion("List of Every Hashtags and its Path", open=False, visible=True) as bottomlist: bottomtable = gr.DataFrame( buildarrayofhashtags('bottom'), headers=["hashtag", "path"], datatype=["str", "str"], interactive=False, ) gr.Markdown( f"""

Made with ❤️ by etherealxx

""") helphider.change(hidehelp, helphider, outputs=[introduction, knowmore]) # uistretcher.change(stretchui, uistretcher, outputs=[boxtohide, rightlist, bottomlist]) # uistretcher.change(stretchui, uistretcher, outputs=boxtohide) uistretcher.change(stretchui, uistretcher, outputs=[boxtohide, downbox, civitbox, choose_downloader2, civit_default2, civit_ispruned2, civit_alsodownvae2]) #batchlinks.load(debug, output=debug_txt, every=1) if gradiostate == True: run_event = btn_run.click(run, inputs=[command, choose_downloader, civit_default, civit_ispruned, civit_alsodownvae], outputs=[out_text, bottomtable]) #righttable was on output btn_cancel.click(cancelrun, None, outputs=[out_text, bottomtable], cancels=[run_event]) else: btn_run.click(run, inputs=[command, choose_downloader, civit_default, civit_ispruned, civit_alsodownvae], outputs=[out_text, bottomtable, btn_resume]) #righttable was on output btn_pastebin.click(copyfrompastebin, inputs=[command], outputs=[command, out_text]) if sdless: if platform.system() == "Windows": batchlinks.queue(64).launch(inbrowser=True) else: batchlinks.queue(64).launch(share=True) else: return (batchlinks, "Batchlinks Downloader", "batchlinks"), if not sdless: script_callbacks.on_ui_tabs(on_ui_tabs) else: on_ui_tabs()